blob: 4b04ee57ba8c405a93ab911e8f7e44f8c8915b7a [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
npujar1d86a522019-11-14 17:11:16 +053016
Kent Hagerman2b216042020-04-03 18:28:56 -040017package remote
khenaidoob9203542018-09-17 22:56:37 -040018
import (
	"context"
	"fmt"

	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
	"github.com/opencord/voltha-lib-go/v3/pkg/log"
	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
	"github.com/opencord/voltha-protos/v3/go/openflow_13"
	"github.com/opencord/voltha-protos/v3/go/voltha"
)
27
npujar1d86a522019-11-14 17:11:16 +053028// AdapterProxy represents adapter proxy attributes
khenaidoob9203542018-09-17 22:56:37 -040029type AdapterProxy struct {
Kent Hagerman2b216042020-04-03 18:28:56 -040030 kafka.EndpointManager
khenaidoo54e0ddf2019-02-27 16:21:33 -050031 deviceTopicRegistered bool
Kent Hagermana6d0c362019-07-30 12:50:21 -040032 corePairTopic string
npujar467fe752020-01-16 20:17:45 +053033 kafkaICProxy kafka.InterContainerProxy
khenaidoob9203542018-09-17 22:56:37 -040034}
35
npujar1d86a522019-11-14 17:11:16 +053036// NewAdapterProxy will return adapter proxy instance
Matteo Scandolod525ae32020-04-02 17:27:29 -070037func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, corePairTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
Kent Hagermana6d0c362019-07-30 12:50:21 -040038 return &AdapterProxy{
Kent Hagerman2b216042020-04-03 18:28:56 -040039 EndpointManager: endpointManager,
Kent Hagermana6d0c362019-07-30 12:50:21 -040040 kafkaICProxy: kafkaProxy,
41 corePairTopic: corePairTopic,
42 deviceTopicRegistered: false,
43 }
khenaidoob9203542018-09-17 22:56:37 -040044}
45
serkant.uluderya334479d2019-04-10 08:26:15 -070046func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
Kent Hagermana6d0c362019-07-30 12:50:21 -040047 return kafka.Topic{Name: ap.corePairTopic}
khenaidoo54e0ddf2019-02-27 16:21:33 -050048}
49
Matteo Scandolod525ae32020-04-02 17:27:29 -070050func (ap *AdapterProxy) getAdapterTopic(deviceID string, adapterType string) (*kafka.Topic, error) {
51
Kent Hagerman2b216042020-04-03 18:28:56 -040052 endpoint, err := ap.GetEndpoint(deviceID, adapterType)
Matteo Scandolod525ae32020-04-02 17:27:29 -070053 if err != nil {
54 return nil, err
55 }
56
57 return &kafka.Topic{Name: string(endpoint)}, nil
khenaidoo54e0ddf2019-02-27 16:21:33 -050058}
59
khenaidoo442e7c72020-03-10 16:13:48 -040060func (ap *AdapterProxy) sendRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
61 waitForResponse bool, deviceID string, kvArgs ...*kafka.KVArg) (chan *kafka.RpcResponse, error) {
62
63 // Sent the request to kafka
64 respChnl := ap.kafkaICProxy.InvokeAsyncRPC(ctx, rpc, toTopic, replyToTopic, waitForResponse, deviceID, kvArgs...)
65
66 // Wait for first response which would indicate whether the request was successfully sent to kafka.
67 firstResponse, ok := <-respChnl
68 if !ok || firstResponse.MType != kafka.RpcSent {
Girish Kumarf56a4682020-03-20 20:07:46 +000069 logger.Errorw("failure to request to kafka", log.Fields{"rpc": rpc, "device-id": deviceID, "error": firstResponse.Err})
khenaidoo442e7c72020-03-10 16:13:48 -040070 return nil, firstResponse.Err
71 }
72 // return the kafka channel for the caller to wait for the response of the RPC call
73 return respChnl, nil
74}
75
Kent Hagerman2b216042020-04-03 18:28:56 -040076// AdoptDevice invokes adopt device rpc
77func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
78 logger.Debugw("AdoptDevice", log.Fields{"device-id": device.Id})
khenaidoo92e62c52018-10-03 14:02:54 -040079 rpc := "adopt_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -070080 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
81 if err != nil {
82 return nil, err
83 }
khenaidoo442e7c72020-03-10 16:13:48 -040084 args := []*kafka.KVArg{
85 {Key: "device", Value: device},
khenaidoob9203542018-09-17 22:56:37 -040086 }
khenaidoo54e0ddf2019-02-27 16:21:33 -050087 replyToTopic := ap.getCoreTopic()
khenaidoo54e0ddf2019-02-27 16:21:33 -050088 ap.deviceTopicRegistered = true
Matteo Scandolod525ae32020-04-02 17:27:29 -070089 logger.Debugw("adoptDevice-send-request", log.Fields{"device-id": device.Id, "deviceType": device.Type, "serialNumber": device.SerialNumber})
90 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo92e62c52018-10-03 14:02:54 -040091}
92
Kent Hagerman2b216042020-04-03 18:28:56 -040093// DisableDevice invokes disable device rpc
94func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
95 logger.Debugw("DisableDevice", log.Fields{"device-id": device.Id})
khenaidoo92e62c52018-10-03 14:02:54 -040096 rpc := "disable_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -070097 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
98 if err != nil {
99 return nil, err
100 }
khenaidoo442e7c72020-03-10 16:13:48 -0400101 args := []*kafka.KVArg{
102 {Key: "device", Value: device},
khenaidoob9203542018-09-17 22:56:37 -0400103 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500104 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700105 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400106}
107
Kent Hagerman2b216042020-04-03 18:28:56 -0400108// ReEnableDevice invokes reenable device rpc
109func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
110 logger.Debugw("ReEnableDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400111 rpc := "reenable_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700112 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
113 if err != nil {
114 return nil, err
115 }
khenaidoo442e7c72020-03-10 16:13:48 -0400116 args := []*kafka.KVArg{
117 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400118 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500119 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700120 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400121}
122
Kent Hagerman2b216042020-04-03 18:28:56 -0400123// RebootDevice invokes reboot device rpc
124func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
125 logger.Debugw("RebootDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400126 rpc := "reboot_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700127 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
128 if err != nil {
129 return nil, err
130 }
khenaidoo442e7c72020-03-10 16:13:48 -0400131 args := []*kafka.KVArg{
132 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400133 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500134 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700135 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400136}
137
Kent Hagerman2b216042020-04-03 18:28:56 -0400138// DeleteDevice invokes delete device rpc
139func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
140 logger.Debugw("DeleteDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400141 rpc := "delete_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700142 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
143 if err != nil {
144 return nil, err
145 }
khenaidoo442e7c72020-03-10 16:13:48 -0400146 args := []*kafka.KVArg{
147 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400148 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500149 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700150 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400151}
152
Kent Hagerman2b216042020-04-03 18:28:56 -0400153// GetOfpDeviceInfo invokes get ofp device info rpc
154func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
155 logger.Debugw("GetOfpDeviceInfo", log.Fields{"device-id": device.Id})
khenaidoo442e7c72020-03-10 16:13:48 -0400156 rpc := "get_ofp_device_info"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700157 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
158 if err != nil {
159 return nil, err
160 }
khenaidoo442e7c72020-03-10 16:13:48 -0400161 args := []*kafka.KVArg{
162 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400163 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500164 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700165 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400166}
167
Kent Hagerman2b216042020-04-03 18:28:56 -0400168// GetOfpPortInfo invokes get ofp port info rpc
169func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (chan *kafka.RpcResponse, error) {
170 logger.Debugw("GetOfpPortInfo", log.Fields{"device-id": device.Id, "port-no": portNo})
Matteo Scandolod525ae32020-04-02 17:27:29 -0700171 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
172 if err != nil {
173 return nil, err
174 }
khenaidoo442e7c72020-03-10 16:13:48 -0400175 args := []*kafka.KVArg{
176 {Key: "device", Value: device},
177 {Key: "port_no", Value: &ic.IntType{Val: int64(portNo)}},
khenaidoo4d4802d2018-10-04 21:59:49 -0400178 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500179 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700180 return ap.sendRPC(ctx, "get_ofp_port_info", toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400181}
182
Kent Hagerman2b216042020-04-03 18:28:56 -0400183// ReconcileDevice invokes reconcile device rpc
184func (ap *AdapterProxy) ReconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
185 logger.Debugw("ReconcileDevice", log.Fields{"device-id": device.Id})
Matt Jeanneret7cf8e0b2020-01-09 11:57:51 -0500186 rpc := "reconcile_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700187 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
188 if err != nil {
189 return nil, err
190 }
khenaidooba6b6c42019-08-02 09:11:56 -0400191 args := []*kafka.KVArg{
192 {Key: "device", Value: device},
193 }
khenaidooba6b6c42019-08-02 09:11:56 -0400194 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700195 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400196}
197
Kent Hagerman2b216042020-04-03 18:28:56 -0400198// DownloadImage invokes download image rpc
199func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
200 logger.Debugw("DownloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500201 rpc := "download_image"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700202 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
203 if err != nil {
204 return nil, err
205 }
khenaidoo442e7c72020-03-10 16:13:48 -0400206 args := []*kafka.KVArg{
207 {Key: "device", Value: device},
208 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500209 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500210 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700211 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400212}
213
Kent Hagerman2b216042020-04-03 18:28:56 -0400214// GetImageDownloadStatus invokes get image download status rpc
215func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
216 logger.Debugw("GetImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500217 rpc := "get_image_download_status"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700218 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
219 if err != nil {
220 return nil, err
221 }
khenaidoo442e7c72020-03-10 16:13:48 -0400222 args := []*kafka.KVArg{
223 {Key: "device", Value: device},
224 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500225 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500226 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700227 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400228}
229
Kent Hagerman2b216042020-04-03 18:28:56 -0400230// CancelImageDownload invokes cancel image download rpc
231func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
232 logger.Debugw("CancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500233 rpc := "cancel_image_download"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700234 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
235 if err != nil {
236 return nil, err
237 }
khenaidoo442e7c72020-03-10 16:13:48 -0400238 args := []*kafka.KVArg{
239 {Key: "device", Value: device},
240 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500241 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500242 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700243 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400244}
245
Kent Hagerman2b216042020-04-03 18:28:56 -0400246// ActivateImageUpdate invokes activate image update rpc
247func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
248 logger.Debugw("ActivateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500249 rpc := "activate_image_update"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700250 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
251 if err != nil {
252 return nil, err
253 }
khenaidoo442e7c72020-03-10 16:13:48 -0400254 args := []*kafka.KVArg{
255 {Key: "device", Value: device},
256 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500257 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500258 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700259 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400260}
261
Kent Hagerman2b216042020-04-03 18:28:56 -0400262// RevertImageUpdate invokes revert image update rpc
263func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
264 logger.Debugw("RevertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500265 rpc := "revert_image_update"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700266 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
267 if err != nil {
268 return nil, err
269 }
khenaidoo442e7c72020-03-10 16:13:48 -0400270 args := []*kafka.KVArg{
271 {Key: "device", Value: device},
272 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500273 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500274 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700275 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400276}
277
Kent Hagerman2b216042020-04-03 18:28:56 -0400278func (ap *AdapterProxy) PacketOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) (chan *kafka.RpcResponse, error) {
279 logger.Debugw("PacketOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
Matteo Scandolod525ae32020-04-02 17:27:29 -0700280 toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
281 if err != nil {
282 return nil, err
283 }
khenaidoofdbad6e2018-11-06 22:26:38 -0500284 rpc := "receive_packet_out"
khenaidoo442e7c72020-03-10 16:13:48 -0400285 args := []*kafka.KVArg{
286 {Key: "deviceId", Value: &ic.StrType{Val: deviceID}},
287 {Key: "outPort", Value: &ic.IntType{Val: int64(outPort)}},
288 {Key: "packet", Value: packet},
khenaidoofdbad6e2018-11-06 22:26:38 -0500289 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500290 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700291 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
khenaidoofdbad6e2018-11-06 22:26:38 -0500292}
293
Kent Hagerman2b216042020-04-03 18:28:56 -0400294// UpdateFlowsBulk invokes update flows bulk rpc
295func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
296 logger.Debugw("UpdateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows.Items), "group-count": len(groups.Items), "flow-metadata": flowMetadata})
Matteo Scandolod525ae32020-04-02 17:27:29 -0700297 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
298 if err != nil {
299 return nil, err
300 }
khenaidoo19d7b632018-10-30 10:49:50 -0400301 rpc := "update_flows_bulk"
khenaidoo442e7c72020-03-10 16:13:48 -0400302 args := []*kafka.KVArg{
303 {Key: "device", Value: device},
304 {Key: "flows", Value: flows},
305 {Key: "groups", Value: groups},
306 {Key: "flow_metadata", Value: flowMetadata},
khenaidoo19d7b632018-10-30 10:49:50 -0400307 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500308 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700309 return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400310}
311
Kent Hagerman2b216042020-04-03 18:28:56 -0400312// UpdateFlowsIncremental invokes update flows incremental rpc
313func (ap *AdapterProxy) UpdateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
314 logger.Debugw("UpdateFlowsIncremental",
khenaidoo0458db62019-06-20 08:50:36 -0400315 log.Fields{
khenaidoo442e7c72020-03-10 16:13:48 -0400316 "device-id": device.Id,
317 "flow-to-add-count": len(flowChanges.ToAdd.Items),
318 "flow-to-delete-count": len(flowChanges.ToRemove.Items),
319 "group-to-add-count": len(groupChanges.ToAdd.Items),
320 "group-to-delete-count": len(groupChanges.ToRemove.Items),
321 "group-to-update-count": len(groupChanges.ToUpdate.Items),
khenaidoo0458db62019-06-20 08:50:36 -0400322 })
Matteo Scandolod525ae32020-04-02 17:27:29 -0700323 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
324 if err != nil {
325 return nil, err
326 }
Matt Jeanneretb0037422019-03-23 14:36:51 -0400327 rpc := "update_flows_incrementally"
khenaidoo442e7c72020-03-10 16:13:48 -0400328 args := []*kafka.KVArg{
329 {Key: "device", Value: device},
330 {Key: "flow_changes", Value: flowChanges},
331 {Key: "group_changes", Value: groupChanges},
332 {Key: "flow_metadata", Value: flowMetadata},
khenaidoo19d7b632018-10-30 10:49:50 -0400333 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500334 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700335 return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400336}
337
Kent Hagerman2b216042020-04-03 18:28:56 -0400338// UpdatePmConfigs invokes update pm configs rpc
339func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
340 logger.Debugw("UpdatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
Matteo Scandolod525ae32020-04-02 17:27:29 -0700341 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
342 if err != nil {
343 return nil, err
344 }
khenaidoob3127472019-07-24 21:04:55 -0400345 rpc := "Update_pm_config"
khenaidoo442e7c72020-03-10 16:13:48 -0400346 args := []*kafka.KVArg{
347 {Key: "device", Value: device},
348 {Key: "pm_configs", Value: pmConfigs},
khenaidoob3127472019-07-24 21:04:55 -0400349 }
khenaidoob3127472019-07-24 21:04:55 -0400350 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700351 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400352}
353
Kent Hagerman2b216042020-04-03 18:28:56 -0400354// SimulateAlarm invokes simulate alarm rpc
355func (ap *AdapterProxy) SimulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
356 logger.Debugw("SimulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
serkant.uluderya334479d2019-04-10 08:26:15 -0700357 rpc := "simulate_alarm"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700358 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
359 if err != nil {
360 return nil, err
361 }
khenaidoo442e7c72020-03-10 16:13:48 -0400362 args := []*kafka.KVArg{
363 {Key: "device", Value: device},
364 {Key: "request", Value: simulateReq},
serkant.uluderya334479d2019-04-10 08:26:15 -0700365 }
serkant.uluderya334479d2019-04-10 08:26:15 -0700366 replyToTopic := ap.getCoreTopic()
367 ap.deviceTopicRegistered = true
Matteo Scandolod525ae32020-04-02 17:27:29 -0700368 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
serkant.uluderya334479d2019-04-10 08:26:15 -0700369}
kesavandbc2d1622020-01-21 00:42:01 -0500370
Kent Hagerman2b216042020-04-03 18:28:56 -0400371func (ap *AdapterProxy) DisablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
372 logger.Debugw("DisablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
kesavandbc2d1622020-01-21 00:42:01 -0500373 rpc := "disable_port"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700374 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
375 if err != nil {
376 return nil, err
377 }
khenaidoo442e7c72020-03-10 16:13:48 -0400378 args := []*kafka.KVArg{
379 {Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
380 {Key: "port", Value: port},
kesavandbc2d1622020-01-21 00:42:01 -0500381 }
kesavandbc2d1622020-01-21 00:42:01 -0500382 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700383 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
kesavandbc2d1622020-01-21 00:42:01 -0500384}
385
Kent Hagerman2b216042020-04-03 18:28:56 -0400386func (ap *AdapterProxy) EnablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
387 logger.Debugw("EnablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
kesavandbc2d1622020-01-21 00:42:01 -0500388 rpc := "enable_port"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700389 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
390 if err != nil {
391 return nil, err
392 }
khenaidoo442e7c72020-03-10 16:13:48 -0400393 args := []*kafka.KVArg{
394 {Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
395 {Key: "port", Value: port},
kesavandbc2d1622020-01-21 00:42:01 -0500396 }
kesavandbc2d1622020-01-21 00:42:01 -0500397 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700398 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
kesavandbc2d1622020-01-21 00:42:01 -0500399}
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500400
Kent Hagerman2b216042020-04-03 18:28:56 -0400401// ChildDeviceLost invokes child device_lost rpc
402func (ap *AdapterProxy) ChildDeviceLost(ctx context.Context, deviceType string, deviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
403 logger.Debugw("ChildDeviceLost", log.Fields{"device-id": deviceID, "parent-port-no": pPortNo, "onu-id": onuID})
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500404 rpc := "child_device_lost"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700405 toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
406 if err != nil {
407 return nil, err
408 }
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500409 args := []*kafka.KVArg{
Matteo Scandolod525ae32020-04-02 17:27:29 -0700410 {Key: "pDeviceId", Value: &ic.StrType{Val: deviceID}},
khenaidoo442e7c72020-03-10 16:13:48 -0400411 {Key: "pPortNo", Value: &ic.IntType{Val: int64(pPortNo)}},
412 {Key: "onuID", Value: &ic.IntType{Val: int64(onuID)}},
413 }
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500414 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700415 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500416}
onkarkundargi87285252020-01-27 11:34:52 +0530417
Kent Hagerman2b216042020-04-03 18:28:56 -0400418func (ap *AdapterProxy) StartOmciTest(ctx context.Context, device *voltha.Device, omcitestrequest *voltha.OmciTestRequest) (chan *kafka.RpcResponse, error) {
Girish Kumarf56a4682020-03-20 20:07:46 +0000419 logger.Debugw("Omci_test_Request_adapter_proxy", log.Fields{"device": device, "omciTestRequest": omcitestrequest})
onkarkundargi87285252020-01-27 11:34:52 +0530420 rpc := "start_omci_test"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700421 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
422 if err != nil {
423 return nil, err
424 }
onkarkundargi87285252020-01-27 11:34:52 +0530425 // Use a device specific topic as we are the only core handling requests for this device
426 replyToTopic := ap.getCoreTopic()
Scott Baker432f9be2020-03-26 11:56:30 -0700427 // TODO: Perhaps this should have used omcitestrequest.uuid as the second argument rather
428 // than including the whole request, which is (deviceid, uuid)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700429 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id,
onkarkundargi87285252020-01-27 11:34:52 +0530430 &kafka.KVArg{Key: "device", Value: device},
431 &kafka.KVArg{Key: "omcitestrequest", Value: omcitestrequest})
432}