blob: 4cbb36338435b06dd16abe615c715086fd6ba72e [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
npujar1d86a522019-11-14 17:11:16 +053016
Kent Hagerman2b216042020-04-03 18:28:56 -040017package remote
khenaidoob9203542018-09-17 22:56:37 -040018
import (
	"context"
	"fmt"

	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
	"github.com/opencord/voltha-lib-go/v3/pkg/log"
	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
	"github.com/opencord/voltha-protos/v3/go/openflow_13"
	"github.com/opencord/voltha-protos/v3/go/voltha"
)
28
npujar1d86a522019-11-14 17:11:16 +053029// AdapterProxy represents adapter proxy attributes
khenaidoob9203542018-09-17 22:56:37 -040030type AdapterProxy struct {
Kent Hagerman2b216042020-04-03 18:28:56 -040031 kafka.EndpointManager
khenaidoo54e0ddf2019-02-27 16:21:33 -050032 deviceTopicRegistered bool
serkant.uluderya8ff291d2020-05-20 00:58:00 -070033 coreTopic string
npujar467fe752020-01-16 20:17:45 +053034 kafkaICProxy kafka.InterContainerProxy
khenaidoob9203542018-09-17 22:56:37 -040035}
36
npujar1d86a522019-11-14 17:11:16 +053037// NewAdapterProxy will return adapter proxy instance
serkant.uluderya8ff291d2020-05-20 00:58:00 -070038func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, coreTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
Kent Hagermana6d0c362019-07-30 12:50:21 -040039 return &AdapterProxy{
Kent Hagerman2b216042020-04-03 18:28:56 -040040 EndpointManager: endpointManager,
Kent Hagermana6d0c362019-07-30 12:50:21 -040041 kafkaICProxy: kafkaProxy,
serkant.uluderya8ff291d2020-05-20 00:58:00 -070042 coreTopic: coreTopic,
Kent Hagermana6d0c362019-07-30 12:50:21 -040043 deviceTopicRegistered: false,
44 }
khenaidoob9203542018-09-17 22:56:37 -040045}
46
serkant.uluderya334479d2019-04-10 08:26:15 -070047func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
serkant.uluderya8ff291d2020-05-20 00:58:00 -070048 return kafka.Topic{Name: ap.coreTopic}
khenaidoo54e0ddf2019-02-27 16:21:33 -050049}
50
Matteo Scandolod525ae32020-04-02 17:27:29 -070051func (ap *AdapterProxy) getAdapterTopic(deviceID string, adapterType string) (*kafka.Topic, error) {
52
Kent Hagerman2b216042020-04-03 18:28:56 -040053 endpoint, err := ap.GetEndpoint(deviceID, adapterType)
Matteo Scandolod525ae32020-04-02 17:27:29 -070054 if err != nil {
55 return nil, err
56 }
57
58 return &kafka.Topic{Name: string(endpoint)}, nil
khenaidoo54e0ddf2019-02-27 16:21:33 -050059}
60
khenaidoo442e7c72020-03-10 16:13:48 -040061func (ap *AdapterProxy) sendRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
62 waitForResponse bool, deviceID string, kvArgs ...*kafka.KVArg) (chan *kafka.RpcResponse, error) {
63
64 // Sent the request to kafka
65 respChnl := ap.kafkaICProxy.InvokeAsyncRPC(ctx, rpc, toTopic, replyToTopic, waitForResponse, deviceID, kvArgs...)
66
67 // Wait for first response which would indicate whether the request was successfully sent to kafka.
68 firstResponse, ok := <-respChnl
69 if !ok || firstResponse.MType != kafka.RpcSent {
Girish Kumarf56a4682020-03-20 20:07:46 +000070 logger.Errorw("failure to request to kafka", log.Fields{"rpc": rpc, "device-id": deviceID, "error": firstResponse.Err})
khenaidoo442e7c72020-03-10 16:13:48 -040071 return nil, firstResponse.Err
72 }
73 // return the kafka channel for the caller to wait for the response of the RPC call
74 return respChnl, nil
75}
76
Kent Hagerman2b216042020-04-03 18:28:56 -040077// AdoptDevice invokes adopt device rpc
78func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
79 logger.Debugw("AdoptDevice", log.Fields{"device-id": device.Id})
khenaidoo92e62c52018-10-03 14:02:54 -040080 rpc := "adopt_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -070081 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
82 if err != nil {
83 return nil, err
84 }
khenaidoo442e7c72020-03-10 16:13:48 -040085 args := []*kafka.KVArg{
86 {Key: "device", Value: device},
khenaidoob9203542018-09-17 22:56:37 -040087 }
khenaidoo54e0ddf2019-02-27 16:21:33 -050088 replyToTopic := ap.getCoreTopic()
khenaidoo54e0ddf2019-02-27 16:21:33 -050089 ap.deviceTopicRegistered = true
Matteo Scandolod525ae32020-04-02 17:27:29 -070090 logger.Debugw("adoptDevice-send-request", log.Fields{"device-id": device.Id, "deviceType": device.Type, "serialNumber": device.SerialNumber})
91 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo92e62c52018-10-03 14:02:54 -040092}
93
Kent Hagerman2b216042020-04-03 18:28:56 -040094// DisableDevice invokes disable device rpc
95func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
96 logger.Debugw("DisableDevice", log.Fields{"device-id": device.Id})
khenaidoo92e62c52018-10-03 14:02:54 -040097 rpc := "disable_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -070098 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
99 if err != nil {
100 return nil, err
101 }
khenaidoo442e7c72020-03-10 16:13:48 -0400102 args := []*kafka.KVArg{
103 {Key: "device", Value: device},
khenaidoob9203542018-09-17 22:56:37 -0400104 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500105 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700106 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400107}
108
Kent Hagerman2b216042020-04-03 18:28:56 -0400109// ReEnableDevice invokes reenable device rpc
110func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
111 logger.Debugw("ReEnableDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400112 rpc := "reenable_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700113 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
114 if err != nil {
115 return nil, err
116 }
khenaidoo442e7c72020-03-10 16:13:48 -0400117 args := []*kafka.KVArg{
118 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400119 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500120 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700121 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400122}
123
Kent Hagerman2b216042020-04-03 18:28:56 -0400124// RebootDevice invokes reboot device rpc
125func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
126 logger.Debugw("RebootDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400127 rpc := "reboot_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700128 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
129 if err != nil {
130 return nil, err
131 }
khenaidoo442e7c72020-03-10 16:13:48 -0400132 args := []*kafka.KVArg{
133 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400134 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500135 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700136 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400137}
138
Kent Hagerman2b216042020-04-03 18:28:56 -0400139// DeleteDevice invokes delete device rpc
140func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
141 logger.Debugw("DeleteDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400142 rpc := "delete_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700143 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
144 if err != nil {
145 return nil, err
146 }
khenaidoo442e7c72020-03-10 16:13:48 -0400147 args := []*kafka.KVArg{
148 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400149 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500150 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700151 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400152}
153
Kent Hagerman2b216042020-04-03 18:28:56 -0400154// GetOfpDeviceInfo invokes get ofp device info rpc
155func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
156 logger.Debugw("GetOfpDeviceInfo", log.Fields{"device-id": device.Id})
khenaidoo442e7c72020-03-10 16:13:48 -0400157 rpc := "get_ofp_device_info"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700158 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
159 if err != nil {
160 return nil, err
161 }
khenaidoo442e7c72020-03-10 16:13:48 -0400162 args := []*kafka.KVArg{
163 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400164 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500165 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700166 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400167}
168
Kent Hagerman2b216042020-04-03 18:28:56 -0400169// GetOfpPortInfo invokes get ofp port info rpc
170func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (chan *kafka.RpcResponse, error) {
171 logger.Debugw("GetOfpPortInfo", log.Fields{"device-id": device.Id, "port-no": portNo})
Matteo Scandolod525ae32020-04-02 17:27:29 -0700172 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
173 if err != nil {
174 return nil, err
175 }
khenaidoo442e7c72020-03-10 16:13:48 -0400176 args := []*kafka.KVArg{
177 {Key: "device", Value: device},
178 {Key: "port_no", Value: &ic.IntType{Val: int64(portNo)}},
khenaidoo4d4802d2018-10-04 21:59:49 -0400179 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500180 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700181 return ap.sendRPC(ctx, "get_ofp_port_info", toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400182}
183
Kent Hagerman2b216042020-04-03 18:28:56 -0400184// ReconcileDevice invokes reconcile device rpc
185func (ap *AdapterProxy) ReconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
186 logger.Debugw("ReconcileDevice", log.Fields{"device-id": device.Id})
Matt Jeanneret7cf8e0b2020-01-09 11:57:51 -0500187 rpc := "reconcile_device"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700188 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
189 if err != nil {
190 return nil, err
191 }
khenaidooba6b6c42019-08-02 09:11:56 -0400192 args := []*kafka.KVArg{
193 {Key: "device", Value: device},
194 }
khenaidooba6b6c42019-08-02 09:11:56 -0400195 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700196 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400197}
198
Kent Hagerman2b216042020-04-03 18:28:56 -0400199// DownloadImage invokes download image rpc
200func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
201 logger.Debugw("DownloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500202 rpc := "download_image"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700203 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
204 if err != nil {
205 return nil, err
206 }
khenaidoo442e7c72020-03-10 16:13:48 -0400207 args := []*kafka.KVArg{
208 {Key: "device", Value: device},
209 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500210 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500211 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700212 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400213}
214
Kent Hagerman2b216042020-04-03 18:28:56 -0400215// GetImageDownloadStatus invokes get image download status rpc
216func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
217 logger.Debugw("GetImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500218 rpc := "get_image_download_status"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700219 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
220 if err != nil {
221 return nil, err
222 }
khenaidoo442e7c72020-03-10 16:13:48 -0400223 args := []*kafka.KVArg{
224 {Key: "device", Value: device},
225 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500226 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500227 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700228 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400229}
230
Kent Hagerman2b216042020-04-03 18:28:56 -0400231// CancelImageDownload invokes cancel image download rpc
232func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
233 logger.Debugw("CancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500234 rpc := "cancel_image_download"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700235 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
236 if err != nil {
237 return nil, err
238 }
khenaidoo442e7c72020-03-10 16:13:48 -0400239 args := []*kafka.KVArg{
240 {Key: "device", Value: device},
241 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500242 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500243 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700244 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400245}
246
Kent Hagerman2b216042020-04-03 18:28:56 -0400247// ActivateImageUpdate invokes activate image update rpc
248func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
249 logger.Debugw("ActivateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500250 rpc := "activate_image_update"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700251 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
252 if err != nil {
253 return nil, err
254 }
khenaidoo442e7c72020-03-10 16:13:48 -0400255 args := []*kafka.KVArg{
256 {Key: "device", Value: device},
257 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500258 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500259 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700260 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400261}
262
Kent Hagerman2b216042020-04-03 18:28:56 -0400263// RevertImageUpdate invokes revert image update rpc
264func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
265 logger.Debugw("RevertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500266 rpc := "revert_image_update"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700267 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
268 if err != nil {
269 return nil, err
270 }
khenaidoo442e7c72020-03-10 16:13:48 -0400271 args := []*kafka.KVArg{
272 {Key: "device", Value: device},
273 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500274 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500275 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700276 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400277}
278
Kent Hagerman2b216042020-04-03 18:28:56 -0400279func (ap *AdapterProxy) PacketOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) (chan *kafka.RpcResponse, error) {
280 logger.Debugw("PacketOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
Matteo Scandolod525ae32020-04-02 17:27:29 -0700281 toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
282 if err != nil {
283 return nil, err
284 }
khenaidoofdbad6e2018-11-06 22:26:38 -0500285 rpc := "receive_packet_out"
khenaidoo442e7c72020-03-10 16:13:48 -0400286 args := []*kafka.KVArg{
287 {Key: "deviceId", Value: &ic.StrType{Val: deviceID}},
288 {Key: "outPort", Value: &ic.IntType{Val: int64(outPort)}},
289 {Key: "packet", Value: packet},
khenaidoofdbad6e2018-11-06 22:26:38 -0500290 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500291 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700292 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
khenaidoofdbad6e2018-11-06 22:26:38 -0500293}
294
Kent Hagerman2b216042020-04-03 18:28:56 -0400295// UpdateFlowsBulk invokes update flows bulk rpc
296func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
297 logger.Debugw("UpdateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows.Items), "group-count": len(groups.Items), "flow-metadata": flowMetadata})
Matteo Scandolod525ae32020-04-02 17:27:29 -0700298 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
299 if err != nil {
300 return nil, err
301 }
khenaidoo19d7b632018-10-30 10:49:50 -0400302 rpc := "update_flows_bulk"
khenaidoo442e7c72020-03-10 16:13:48 -0400303 args := []*kafka.KVArg{
304 {Key: "device", Value: device},
305 {Key: "flows", Value: flows},
306 {Key: "groups", Value: groups},
307 {Key: "flow_metadata", Value: flowMetadata},
khenaidoo19d7b632018-10-30 10:49:50 -0400308 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500309 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700310 return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400311}
312
Kent Hagerman2b216042020-04-03 18:28:56 -0400313// UpdateFlowsIncremental invokes update flows incremental rpc
314func (ap *AdapterProxy) UpdateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
315 logger.Debugw("UpdateFlowsIncremental",
khenaidoo0458db62019-06-20 08:50:36 -0400316 log.Fields{
khenaidoo442e7c72020-03-10 16:13:48 -0400317 "device-id": device.Id,
318 "flow-to-add-count": len(flowChanges.ToAdd.Items),
319 "flow-to-delete-count": len(flowChanges.ToRemove.Items),
320 "group-to-add-count": len(groupChanges.ToAdd.Items),
321 "group-to-delete-count": len(groupChanges.ToRemove.Items),
322 "group-to-update-count": len(groupChanges.ToUpdate.Items),
khenaidoo0458db62019-06-20 08:50:36 -0400323 })
Matteo Scandolod525ae32020-04-02 17:27:29 -0700324 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
325 if err != nil {
326 return nil, err
327 }
Matt Jeanneretb0037422019-03-23 14:36:51 -0400328 rpc := "update_flows_incrementally"
khenaidoo442e7c72020-03-10 16:13:48 -0400329 args := []*kafka.KVArg{
330 {Key: "device", Value: device},
331 {Key: "flow_changes", Value: flowChanges},
332 {Key: "group_changes", Value: groupChanges},
333 {Key: "flow_metadata", Value: flowMetadata},
khenaidoo19d7b632018-10-30 10:49:50 -0400334 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500335 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700336 return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400337}
338
Kent Hagerman2b216042020-04-03 18:28:56 -0400339// UpdatePmConfigs invokes update pm configs rpc
340func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
341 logger.Debugw("UpdatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
Matteo Scandolod525ae32020-04-02 17:27:29 -0700342 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
343 if err != nil {
344 return nil, err
345 }
khenaidoob3127472019-07-24 21:04:55 -0400346 rpc := "Update_pm_config"
khenaidoo442e7c72020-03-10 16:13:48 -0400347 args := []*kafka.KVArg{
348 {Key: "device", Value: device},
349 {Key: "pm_configs", Value: pmConfigs},
khenaidoob3127472019-07-24 21:04:55 -0400350 }
khenaidoob3127472019-07-24 21:04:55 -0400351 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700352 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400353}
354
Kent Hagerman2b216042020-04-03 18:28:56 -0400355// SimulateAlarm invokes simulate alarm rpc
356func (ap *AdapterProxy) SimulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
357 logger.Debugw("SimulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
serkant.uluderya334479d2019-04-10 08:26:15 -0700358 rpc := "simulate_alarm"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700359 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
360 if err != nil {
361 return nil, err
362 }
khenaidoo442e7c72020-03-10 16:13:48 -0400363 args := []*kafka.KVArg{
364 {Key: "device", Value: device},
365 {Key: "request", Value: simulateReq},
serkant.uluderya334479d2019-04-10 08:26:15 -0700366 }
serkant.uluderya334479d2019-04-10 08:26:15 -0700367 replyToTopic := ap.getCoreTopic()
368 ap.deviceTopicRegistered = true
Matteo Scandolod525ae32020-04-02 17:27:29 -0700369 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
serkant.uluderya334479d2019-04-10 08:26:15 -0700370}
kesavandbc2d1622020-01-21 00:42:01 -0500371
Kent Hagerman2b216042020-04-03 18:28:56 -0400372func (ap *AdapterProxy) DisablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
373 logger.Debugw("DisablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
kesavandbc2d1622020-01-21 00:42:01 -0500374 rpc := "disable_port"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700375 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
376 if err != nil {
377 return nil, err
378 }
khenaidoo442e7c72020-03-10 16:13:48 -0400379 args := []*kafka.KVArg{
380 {Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
381 {Key: "port", Value: port},
kesavandbc2d1622020-01-21 00:42:01 -0500382 }
kesavandbc2d1622020-01-21 00:42:01 -0500383 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700384 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
kesavandbc2d1622020-01-21 00:42:01 -0500385}
386
Kent Hagerman2b216042020-04-03 18:28:56 -0400387func (ap *AdapterProxy) EnablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
388 logger.Debugw("EnablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
kesavandbc2d1622020-01-21 00:42:01 -0500389 rpc := "enable_port"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700390 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
391 if err != nil {
392 return nil, err
393 }
khenaidoo442e7c72020-03-10 16:13:48 -0400394 args := []*kafka.KVArg{
395 {Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
396 {Key: "port", Value: port},
kesavandbc2d1622020-01-21 00:42:01 -0500397 }
kesavandbc2d1622020-01-21 00:42:01 -0500398 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700399 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
kesavandbc2d1622020-01-21 00:42:01 -0500400}
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500401
Kent Hagerman2b216042020-04-03 18:28:56 -0400402// ChildDeviceLost invokes child device_lost rpc
403func (ap *AdapterProxy) ChildDeviceLost(ctx context.Context, deviceType string, deviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
404 logger.Debugw("ChildDeviceLost", log.Fields{"device-id": deviceID, "parent-port-no": pPortNo, "onu-id": onuID})
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500405 rpc := "child_device_lost"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700406 toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
407 if err != nil {
408 return nil, err
409 }
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500410 args := []*kafka.KVArg{
Matteo Scandolod525ae32020-04-02 17:27:29 -0700411 {Key: "pDeviceId", Value: &ic.StrType{Val: deviceID}},
khenaidoo442e7c72020-03-10 16:13:48 -0400412 {Key: "pPortNo", Value: &ic.IntType{Val: int64(pPortNo)}},
413 {Key: "onuID", Value: &ic.IntType{Val: int64(onuID)}},
414 }
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500415 replyToTopic := ap.getCoreTopic()
Matteo Scandolod525ae32020-04-02 17:27:29 -0700416 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500417}
onkarkundargi87285252020-01-27 11:34:52 +0530418
Kent Hagerman2b216042020-04-03 18:28:56 -0400419func (ap *AdapterProxy) StartOmciTest(ctx context.Context, device *voltha.Device, omcitestrequest *voltha.OmciTestRequest) (chan *kafka.RpcResponse, error) {
Girish Kumarf56a4682020-03-20 20:07:46 +0000420 logger.Debugw("Omci_test_Request_adapter_proxy", log.Fields{"device": device, "omciTestRequest": omcitestrequest})
onkarkundargi87285252020-01-27 11:34:52 +0530421 rpc := "start_omci_test"
Matteo Scandolod525ae32020-04-02 17:27:29 -0700422 toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
423 if err != nil {
424 return nil, err
425 }
onkarkundargi87285252020-01-27 11:34:52 +0530426 // Use a device specific topic as we are the only core handling requests for this device
427 replyToTopic := ap.getCoreTopic()
Scott Baker432f9be2020-03-26 11:56:30 -0700428 // TODO: Perhaps this should have used omcitestrequest.uuid as the second argument rather
429 // than including the whole request, which is (deviceid, uuid)
Matteo Scandolod525ae32020-04-02 17:27:29 -0700430 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id,
onkarkundargi87285252020-01-27 11:34:52 +0530431 &kafka.KVArg{Key: "device", Value: device},
432 &kafka.KVArg{Key: "omcitestrequest", Value: omcitestrequest})
433}
Dinesh Belwalkarc1129f12020-02-27 10:41:33 -0800434
435func (ap *AdapterProxy) GetExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, id string, valuetype voltha.ValueType_Type) (chan *kafka.RpcResponse, error) {
436 log.Debugw("GetExtValue", log.Fields{"device-id": pdevice.Id, "onuid": id})
437 rpc := "get_ext_value"
438 toTopic, err := ap.getAdapterTopic(pdevice.Id, pdevice.Adapter)
439 if err != nil {
440 return nil, err
441 }
442 // Use a device specific topic to send the request. The adapter handling the device creates a device
443 // specific topic
444 args := []*kafka.KVArg{
445 {
446 Key: "pDeviceId",
447 Value: &ic.StrType{Val: pdevice.Id},
448 },
449 {
450 Key: "device",
451 Value: cdevice,
452 },
453 {
454 Key: "valuetype",
455 Value: &ic.IntType{Val: int64(valuetype)},
456 }}
457
458 replyToTopic := ap.getCoreTopic()
459 return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, pdevice.Id, args...)
460}