blob: e3c362edfc781c0874d60ea471dc6c134d2897a2 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
npujar1d86a522019-11-14 17:11:16 +053016
khenaidoob9203542018-09-17 22:56:37 -040017package core
18
19import (
20 "context"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080021 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
22 "github.com/opencord/voltha-lib-go/v3/pkg/log"
23 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
24 "github.com/opencord/voltha-protos/v3/go/openflow_13"
25 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidoob9203542018-09-17 22:56:37 -040026)
27
npujar1d86a522019-11-14 17:11:16 +053028// AdapterProxy represents adapter proxy attributes
khenaidoob9203542018-09-17 22:56:37 -040029type AdapterProxy struct {
khenaidoo54e0ddf2019-02-27 16:21:33 -050030 deviceTopicRegistered bool
Kent Hagermana6d0c362019-07-30 12:50:21 -040031 corePairTopic string
npujar467fe752020-01-16 20:17:45 +053032 kafkaICProxy kafka.InterContainerProxy
khenaidoob9203542018-09-17 22:56:37 -040033}
34
npujar1d86a522019-11-14 17:11:16 +053035// NewAdapterProxy will return adapter proxy instance
npujar467fe752020-01-16 20:17:45 +053036func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, corePairTopic string) *AdapterProxy {
Kent Hagermana6d0c362019-07-30 12:50:21 -040037 return &AdapterProxy{
38 kafkaICProxy: kafkaProxy,
39 corePairTopic: corePairTopic,
40 deviceTopicRegistered: false,
41 }
khenaidoob9203542018-09-17 22:56:37 -040042}
43
serkant.uluderya334479d2019-04-10 08:26:15 -070044func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
Kent Hagermana6d0c362019-07-30 12:50:21 -040045 return kafka.Topic{Name: ap.corePairTopic}
khenaidoo54e0ddf2019-02-27 16:21:33 -050046}
47
serkant.uluderya334479d2019-04-10 08:26:15 -070048func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic {
khenaidoo54e0ddf2019-02-27 16:21:33 -050049 return kafka.Topic{Name: adapterName}
50}
51
khenaidoo442e7c72020-03-10 16:13:48 -040052func (ap *AdapterProxy) sendRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
53 waitForResponse bool, deviceID string, kvArgs ...*kafka.KVArg) (chan *kafka.RpcResponse, error) {
54
55 // Sent the request to kafka
56 respChnl := ap.kafkaICProxy.InvokeAsyncRPC(ctx, rpc, toTopic, replyToTopic, waitForResponse, deviceID, kvArgs...)
57
58 // Wait for first response which would indicate whether the request was successfully sent to kafka.
59 firstResponse, ok := <-respChnl
60 if !ok || firstResponse.MType != kafka.RpcSent {
61 log.Errorw("failure to request to kafka", log.Fields{"rpc": rpc, "device-id": deviceID, "error": firstResponse.Err})
62 return nil, firstResponse.Err
63 }
64 // return the kafka channel for the caller to wait for the response of the RPC call
65 return respChnl, nil
66}
67
68// adoptDevice invokes adopt device rpc
69func (ap *AdapterProxy) adoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
70 log.Debugw("adoptDevice", log.Fields{"device-id": device.Id})
khenaidoo92e62c52018-10-03 14:02:54 -040071 rpc := "adopt_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -050072 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo442e7c72020-03-10 16:13:48 -040073 args := []*kafka.KVArg{
74 {Key: "device", Value: device},
khenaidoob9203542018-09-17 22:56:37 -040075 }
khenaidoo54e0ddf2019-02-27 16:21:33 -050076 replyToTopic := ap.getCoreTopic()
khenaidoo54e0ddf2019-02-27 16:21:33 -050077 ap.deviceTopicRegistered = true
khenaidoo442e7c72020-03-10 16:13:48 -040078 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo92e62c52018-10-03 14:02:54 -040079}
80
khenaidoo442e7c72020-03-10 16:13:48 -040081// disableDevice invokes disable device rpc
82func (ap *AdapterProxy) disableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
83 log.Debugw("disableDevice", log.Fields{"device-id": device.Id})
khenaidoo92e62c52018-10-03 14:02:54 -040084 rpc := "disable_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -050085 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo442e7c72020-03-10 16:13:48 -040086 args := []*kafka.KVArg{
87 {Key: "device", Value: device},
khenaidoob9203542018-09-17 22:56:37 -040088 }
khenaidoo54e0ddf2019-02-27 16:21:33 -050089 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -040090 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -040091}
92
khenaidoo442e7c72020-03-10 16:13:48 -040093// reEnableDevice invokes reenable device rpc
94func (ap *AdapterProxy) reEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
95 log.Debugw("reEnableDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -040096 rpc := "reenable_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -050097 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo442e7c72020-03-10 16:13:48 -040098 args := []*kafka.KVArg{
99 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400100 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500101 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400102 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400103}
104
khenaidoo442e7c72020-03-10 16:13:48 -0400105// rebootDevice invokes reboot device rpc
106func (ap *AdapterProxy) rebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
107 log.Debugw("rebootDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400108 rpc := "reboot_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500109 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo442e7c72020-03-10 16:13:48 -0400110 args := []*kafka.KVArg{
111 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400112 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500113 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400114 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400115}
116
khenaidoo442e7c72020-03-10 16:13:48 -0400117// deleteDevice invokes delete device rpc
118func (ap *AdapterProxy) deleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
119 log.Debugw("deleteDevice", log.Fields{"device-id": device.Id})
khenaidoo4d4802d2018-10-04 21:59:49 -0400120 rpc := "delete_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500121 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo442e7c72020-03-10 16:13:48 -0400122 args := []*kafka.KVArg{
123 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400124 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500125 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400126 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400127}
128
khenaidoo442e7c72020-03-10 16:13:48 -0400129// getOfpDeviceInfo invokes get ofp device info rpc
130func (ap *AdapterProxy) getOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
131 log.Debugw("getOfpDeviceInfo", log.Fields{"device-id": device.Id})
132 rpc := "get_ofp_device_info"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500133 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo442e7c72020-03-10 16:13:48 -0400134 args := []*kafka.KVArg{
135 {Key: "device", Value: device},
khenaidoo4d4802d2018-10-04 21:59:49 -0400136 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500137 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400138 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400139}
140
khenaidoo442e7c72020-03-10 16:13:48 -0400141// getOfpPortInfo invokes get ofp port info rpc
142func (ap *AdapterProxy) getOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (chan *kafka.RpcResponse, error) {
143 log.Debugw("getOfpPortInfo", log.Fields{"device-id": device.Id, "port-no": portNo})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500144 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo442e7c72020-03-10 16:13:48 -0400145 args := []*kafka.KVArg{
146 {Key: "device", Value: device},
147 {Key: "port_no", Value: &ic.IntType{Val: int64(portNo)}},
khenaidoo4d4802d2018-10-04 21:59:49 -0400148 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500149 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400150 return ap.sendRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400151}
152
khenaidoo442e7c72020-03-10 16:13:48 -0400153// reconcileDevice invokes reconcile device rpc
154func (ap *AdapterProxy) reconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
155 log.Debugw("reconcileDevice", log.Fields{"device-id": device.Id})
Matt Jeanneret7cf8e0b2020-01-09 11:57:51 -0500156 rpc := "reconcile_device"
khenaidooba6b6c42019-08-02 09:11:56 -0400157 toTopic := ap.getAdapterTopic(device.Adapter)
158 args := []*kafka.KVArg{
159 {Key: "device", Value: device},
160 }
khenaidooba6b6c42019-08-02 09:11:56 -0400161 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400162 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400163}
164
khenaidoo442e7c72020-03-10 16:13:48 -0400165// downloadImage invokes download image rpc
166func (ap *AdapterProxy) downloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
167 log.Debugw("downloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500168 rpc := "download_image"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500169 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo442e7c72020-03-10 16:13:48 -0400170 args := []*kafka.KVArg{
171 {Key: "device", Value: device},
172 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500173 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500174 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400175 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400176}
177
khenaidoo442e7c72020-03-10 16:13:48 -0400178// getImageDownloadStatus invokes get image download status rpc
179func (ap *AdapterProxy) getImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
180 log.Debugw("getImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500181 rpc := "get_image_download_status"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500182 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo442e7c72020-03-10 16:13:48 -0400183 args := []*kafka.KVArg{
184 {Key: "device", Value: device},
185 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500186 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500187 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400188 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400189}
190
khenaidoo442e7c72020-03-10 16:13:48 -0400191// cancelImageDownload invokes cancel image download rpc
192func (ap *AdapterProxy) cancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
193 log.Debugw("cancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500194 rpc := "cancel_image_download"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500195 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo442e7c72020-03-10 16:13:48 -0400196 args := []*kafka.KVArg{
197 {Key: "device", Value: device},
198 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500199 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500200 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400201 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400202}
203
khenaidoo442e7c72020-03-10 16:13:48 -0400204// activateImageUpdate invokes activate image update rpc
205func (ap *AdapterProxy) activateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
206 log.Debugw("activateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500207 rpc := "activate_image_update"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500208 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo442e7c72020-03-10 16:13:48 -0400209 args := []*kafka.KVArg{
210 {Key: "device", Value: device},
211 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500212 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500213 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400214 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400215}
216
khenaidoo442e7c72020-03-10 16:13:48 -0400217// revertImageUpdate invokes revert image update rpc
218func (ap *AdapterProxy) revertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
219 log.Debugw("revertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
khenaidoof5a5bfa2019-01-23 22:20:29 -0500220 rpc := "revert_image_update"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500221 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo442e7c72020-03-10 16:13:48 -0400222 args := []*kafka.KVArg{
223 {Key: "device", Value: device},
224 {Key: "request", Value: download},
khenaidoof5a5bfa2019-01-23 22:20:29 -0500225 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500226 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400227 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400228}
229
khenaidoo442e7c72020-03-10 16:13:48 -0400230func (ap *AdapterProxy) packetOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) (chan *kafka.RpcResponse, error) {
231 log.Debugw("packetOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500232 toTopic := ap.getAdapterTopic(deviceType)
khenaidoofdbad6e2018-11-06 22:26:38 -0500233 rpc := "receive_packet_out"
khenaidoo442e7c72020-03-10 16:13:48 -0400234 args := []*kafka.KVArg{
235 {Key: "deviceId", Value: &ic.StrType{Val: deviceID}},
236 {Key: "outPort", Value: &ic.IntType{Val: int64(outPort)}},
237 {Key: "packet", Value: packet},
khenaidoofdbad6e2018-11-06 22:26:38 -0500238 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500239 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400240 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, deviceID, args...)
khenaidoofdbad6e2018-11-06 22:26:38 -0500241}
242
khenaidoo442e7c72020-03-10 16:13:48 -0400243// updateFlowsBulk invokes update flows bulk rpc
244func (ap *AdapterProxy) updateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
245 log.Debugw("updateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows.Items), "group-count": len(groups.Items), "flow-metadata": flowMetadata})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500246 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo19d7b632018-10-30 10:49:50 -0400247 rpc := "update_flows_bulk"
khenaidoo442e7c72020-03-10 16:13:48 -0400248 args := []*kafka.KVArg{
249 {Key: "device", Value: device},
250 {Key: "flows", Value: flows},
251 {Key: "groups", Value: groups},
252 {Key: "flow_metadata", Value: flowMetadata},
khenaidoo19d7b632018-10-30 10:49:50 -0400253 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500254 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400255 return ap.sendRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400256}
257
khenaidoo442e7c72020-03-10 16:13:48 -0400258// updateFlowsIncremental invokes update flows incremental rpc
259func (ap *AdapterProxy) updateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
260 log.Debugw("updateFlowsIncremental",
khenaidoo0458db62019-06-20 08:50:36 -0400261 log.Fields{
khenaidoo442e7c72020-03-10 16:13:48 -0400262 "device-id": device.Id,
263 "flow-to-add-count": len(flowChanges.ToAdd.Items),
264 "flow-to-delete-count": len(flowChanges.ToRemove.Items),
265 "group-to-add-count": len(groupChanges.ToAdd.Items),
266 "group-to-delete-count": len(groupChanges.ToRemove.Items),
267 "group-to-update-count": len(groupChanges.ToUpdate.Items),
khenaidoo0458db62019-06-20 08:50:36 -0400268 })
khenaidoo54e0ddf2019-02-27 16:21:33 -0500269 toTopic := ap.getAdapterTopic(device.Adapter)
Matt Jeanneretb0037422019-03-23 14:36:51 -0400270 rpc := "update_flows_incrementally"
khenaidoo442e7c72020-03-10 16:13:48 -0400271 args := []*kafka.KVArg{
272 {Key: "device", Value: device},
273 {Key: "flow_changes", Value: flowChanges},
274 {Key: "group_changes", Value: groupChanges},
275 {Key: "flow_metadata", Value: flowMetadata},
khenaidoo19d7b632018-10-30 10:49:50 -0400276 }
khenaidoo54e0ddf2019-02-27 16:21:33 -0500277 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400278 return ap.sendRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400279}
280
khenaidoo442e7c72020-03-10 16:13:48 -0400281// updatePmConfigs invokes update pm configs rpc
282func (ap *AdapterProxy) updatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
283 log.Debugw("updatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
khenaidoob3127472019-07-24 21:04:55 -0400284 toTopic := ap.getAdapterTopic(device.Adapter)
285 rpc := "Update_pm_config"
khenaidoo442e7c72020-03-10 16:13:48 -0400286 args := []*kafka.KVArg{
287 {Key: "device", Value: device},
288 {Key: "pm_configs", Value: pmConfigs},
khenaidoob3127472019-07-24 21:04:55 -0400289 }
khenaidoob3127472019-07-24 21:04:55 -0400290 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400291 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob9203542018-09-17 22:56:37 -0400292}
293
khenaidoo442e7c72020-03-10 16:13:48 -0400294// simulateAlarm invokes simulate alarm rpc
295func (ap *AdapterProxy) simulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
296 log.Debugw("simulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
serkant.uluderya334479d2019-04-10 08:26:15 -0700297 rpc := "simulate_alarm"
298 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo442e7c72020-03-10 16:13:48 -0400299 args := []*kafka.KVArg{
300 {Key: "device", Value: device},
301 {Key: "request", Value: simulateReq},
serkant.uluderya334479d2019-04-10 08:26:15 -0700302 }
serkant.uluderya334479d2019-04-10 08:26:15 -0700303 replyToTopic := ap.getCoreTopic()
304 ap.deviceTopicRegistered = true
khenaidoo442e7c72020-03-10 16:13:48 -0400305 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
serkant.uluderya334479d2019-04-10 08:26:15 -0700306}
kesavandbc2d1622020-01-21 00:42:01 -0500307
khenaidoo442e7c72020-03-10 16:13:48 -0400308func (ap *AdapterProxy) disablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
kesavandbc2d1622020-01-21 00:42:01 -0500309 log.Debugw("disablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
310 rpc := "disable_port"
kesavandbc2d1622020-01-21 00:42:01 -0500311 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo442e7c72020-03-10 16:13:48 -0400312 args := []*kafka.KVArg{
313 {Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
314 {Key: "port", Value: port},
kesavandbc2d1622020-01-21 00:42:01 -0500315 }
kesavandbc2d1622020-01-21 00:42:01 -0500316 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400317 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
kesavandbc2d1622020-01-21 00:42:01 -0500318}
319
khenaidoo442e7c72020-03-10 16:13:48 -0400320func (ap *AdapterProxy) enablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
kesavandbc2d1622020-01-21 00:42:01 -0500321 log.Debugw("enablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
322 rpc := "enable_port"
kesavandbc2d1622020-01-21 00:42:01 -0500323 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo442e7c72020-03-10 16:13:48 -0400324 args := []*kafka.KVArg{
325 {Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
326 {Key: "port", Value: port},
kesavandbc2d1622020-01-21 00:42:01 -0500327 }
kesavandbc2d1622020-01-21 00:42:01 -0500328 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400329 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
kesavandbc2d1622020-01-21 00:42:01 -0500330}
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500331
khenaidoo442e7c72020-03-10 16:13:48 -0400332// childDeviceLost invokes child device_lost rpc
333func (ap *AdapterProxy) childDeviceLost(ctx context.Context, deviceType string, pDeviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
334 log.Debugw("childDeviceLost", log.Fields{"parent-device-id": pDeviceID, "parent-port-no": pPortNo, "onu-id": onuID})
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500335 rpc := "child_device_lost"
336 toTopic := ap.getAdapterTopic(deviceType)
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500337 args := []*kafka.KVArg{
khenaidoo442e7c72020-03-10 16:13:48 -0400338 {Key: "pDeviceId", Value: &ic.StrType{Val: pDeviceID}},
339 {Key: "pPortNo", Value: &ic.IntType{Val: int64(pPortNo)}},
340 {Key: "onuID", Value: &ic.IntType{Val: int64(onuID)}},
341 }
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500342 replyToTopic := ap.getCoreTopic()
khenaidoo442e7c72020-03-10 16:13:48 -0400343 return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, pDeviceID, args...)
Chaitrashree G S543df3e2020-02-24 22:36:54 -0500344}