blob: b7798ad6cf63cd39173385ca07b46026372b01e5 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
npujar1d86a522019-11-14 17:11:16 +053016
khenaidoob9203542018-09-17 22:56:37 -040017package core
18
19import (
20 "context"
npujar1d86a522019-11-14 17:11:16 +053021
khenaidoob9203542018-09-17 22:56:37 -040022 "github.com/golang/protobuf/ptypes"
khenaidoo92e62c52018-10-03 14:02:54 -040023 a "github.com/golang/protobuf/ptypes/any"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080024 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
25 "github.com/opencord/voltha-lib-go/v3/pkg/log"
26 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
27 "github.com/opencord/voltha-protos/v3/go/openflow_13"
28 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidoob9203542018-09-17 22:56:37 -040029 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/status"
31)
32
npujar1d86a522019-11-14 17:11:16 +053033// AdapterProxy represents adapter proxy attributes
khenaidoob9203542018-09-17 22:56:37 -040034type AdapterProxy struct {
serkant.uluderya334479d2019-04-10 08:26:15 -070035 TestMode bool
khenaidoo54e0ddf2019-02-27 16:21:33 -050036 deviceTopicRegistered bool
Kent Hagermana6d0c362019-07-30 12:50:21 -040037 corePairTopic string
npujar467fe752020-01-16 20:17:45 +053038 kafkaICProxy kafka.InterContainerProxy
khenaidoob9203542018-09-17 22:56:37 -040039}
40
npujar1d86a522019-11-14 17:11:16 +053041// NewAdapterProxy will return adapter proxy instance
npujar467fe752020-01-16 20:17:45 +053042func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, corePairTopic string) *AdapterProxy {
Kent Hagermana6d0c362019-07-30 12:50:21 -040043 return &AdapterProxy{
44 kafkaICProxy: kafkaProxy,
45 corePairTopic: corePairTopic,
46 deviceTopicRegistered: false,
47 }
khenaidoob9203542018-09-17 22:56:37 -040048}
49
npujar1d86a522019-11-14 17:11:16 +053050func unPackResponse(rpc string, deviceID string, success bool, response *a.Any) error {
khenaidoo92e62c52018-10-03 14:02:54 -040051 if success {
52 return nil
khenaidoo92e62c52018-10-03 14:02:54 -040053 }
npujar1d86a522019-11-14 17:11:16 +053054 unpackResult := &ic.Error{}
55 var err error
56 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
57 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
58 return err
59 }
60 log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceID, "success": success, "error": err})
61 // TODO: Need to get the real error code
62 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
khenaidoo92e62c52018-10-03 14:02:54 -040063}
64
serkant.uluderya334479d2019-04-10 08:26:15 -070065func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
Kent Hagermana6d0c362019-07-30 12:50:21 -040066 return kafka.Topic{Name: ap.corePairTopic}
khenaidoo54e0ddf2019-02-27 16:21:33 -050067}
68
serkant.uluderya334479d2019-04-10 08:26:15 -070069func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic {
khenaidoo54e0ddf2019-02-27 16:21:33 -050070 return kafka.Topic{Name: adapterName}
71}
72
npujar1d86a522019-11-14 17:11:16 +053073// AdoptDevice invokes adopt device rpc
khenaidoob9203542018-09-17 22:56:37 -040074func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
75 log.Debugw("AdoptDevice", log.Fields{"device": device})
khenaidoo92e62c52018-10-03 14:02:54 -040076 rpc := "adopt_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -050077 toTopic := ap.getAdapterTopic(device.Adapter)
78 //topic := kafka.Topic{Name: device.Adapter}
khenaidoob9203542018-09-17 22:56:37 -040079 args := make([]*kafka.KVArg, 1)
80 args[0] = &kafka.KVArg{
81 Key: "device",
82 Value: device,
83 }
khenaidoo43c82122018-11-22 18:38:28 -050084 // Use a device topic for the response as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -050085 replyToTopic := ap.getCoreTopic()
khenaidoo54e0ddf2019-02-27 16:21:33 -050086 ap.deviceTopicRegistered = true
khenaidoobdcb8e02019-03-06 16:28:56 -050087 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo43c82122018-11-22 18:38:28 -050088 log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
khenaidoo92e62c52018-10-03 14:02:54 -040089 return unPackResponse(rpc, device.Id, success, result)
90}
91
npujar1d86a522019-11-14 17:11:16 +053092// DisableDevice invokes disable device rpc
khenaidoo92e62c52018-10-03 14:02:54 -040093func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
94 log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
95 rpc := "disable_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -050096 toTopic := ap.getAdapterTopic(device.Adapter)
97
khenaidoo43c82122018-11-22 18:38:28 -050098 // Use a device specific topic to send the request. The adapter handling the device creates a device
99 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -0500100 //toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo92e62c52018-10-03 14:02:54 -0400101 args := make([]*kafka.KVArg, 1)
102 args[0] = &kafka.KVArg{
103 Key: "device",
104 Value: device,
khenaidoob9203542018-09-17 22:56:37 -0400105 }
khenaidoo43c82122018-11-22 18:38:28 -0500106 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500107 //replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
108 replyToTopic := ap.getCoreTopic()
khenaidooab1f7bd2019-11-14 14:00:27 -0500109 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo92e62c52018-10-03 14:02:54 -0400110 log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
111 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400112}
113
npujar1d86a522019-11-14 17:11:16 +0530114// ReEnableDevice invokes reenable device rpc
khenaidoo4d4802d2018-10-04 21:59:49 -0400115func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
116 log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
117 rpc := "reenable_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500118 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo4d4802d2018-10-04 21:59:49 -0400119 args := make([]*kafka.KVArg, 1)
120 args[0] = &kafka.KVArg{
121 Key: "device",
122 Value: device,
123 }
khenaidoo43c82122018-11-22 18:38:28 -0500124 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500125 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500126 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400127 log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
128 return unPackResponse(rpc, device.Id, success, result)
129}
130
npujar1d86a522019-11-14 17:11:16 +0530131// RebootDevice invokes reboot device rpc
khenaidoo4d4802d2018-10-04 21:59:49 -0400132func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
133 log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
134 rpc := "reboot_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500135 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo4d4802d2018-10-04 21:59:49 -0400136 args := make([]*kafka.KVArg, 1)
137 args[0] = &kafka.KVArg{
138 Key: "device",
139 Value: device,
140 }
khenaidoo43c82122018-11-22 18:38:28 -0500141 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500142 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500143 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400144 log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
145 return unPackResponse(rpc, device.Id, success, result)
146}
147
npujar1d86a522019-11-14 17:11:16 +0530148// DeleteDevice invokes delete device rpc
khenaidoo4d4802d2018-10-04 21:59:49 -0400149func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
150 log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
151 rpc := "delete_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500152 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo4d4802d2018-10-04 21:59:49 -0400153 args := make([]*kafka.KVArg, 1)
154 args[0] = &kafka.KVArg{
155 Key: "device",
156 Value: device,
157 }
khenaidoo43c82122018-11-22 18:38:28 -0500158 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500159 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500160 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400161 log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
khenaidoo43c82122018-11-22 18:38:28 -0500162
khenaidoo4d4802d2018-10-04 21:59:49 -0400163 return unPackResponse(rpc, device.Id, success, result)
164}
165
npujar1d86a522019-11-14 17:11:16 +0530166// GetOfpDeviceInfo invokes get ofp device info rpc
khenaidoo79232702018-12-04 11:00:41 -0500167func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
khenaidoo4d4802d2018-10-04 21:59:49 -0400168 log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500169 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo4d4802d2018-10-04 21:59:49 -0400170 args := make([]*kafka.KVArg, 1)
171 args[0] = &kafka.KVArg{
172 Key: "device",
173 Value: device,
174 }
khenaidoo43c82122018-11-22 18:38:28 -0500175 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500176 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500177 success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400178 log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
179 if success {
khenaidoo79232702018-12-04 11:00:41 -0500180 unpackResult := &ic.SwitchCapability{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400181 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
182 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
183 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
184 }
185 return unpackResult, nil
khenaidoo4d4802d2018-10-04 21:59:49 -0400186 }
npujar1d86a522019-11-14 17:11:16 +0530187 unpackResult := &ic.Error{}
188 var err error
189 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
190 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
191 }
192 log.Debugw("GetOfpDeviceInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
193 // TODO: Need to get the real error code
194 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
khenaidoo4d4802d2018-10-04 21:59:49 -0400195}
196
npujar1d86a522019-11-14 17:11:16 +0530197// GetOfpPortInfo invokes get ofp port info rpc
khenaidoo79232702018-12-04 11:00:41 -0500198func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
khenaidoo4d4802d2018-10-04 21:59:49 -0400199 log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500200 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo4d4802d2018-10-04 21:59:49 -0400201 args := make([]*kafka.KVArg, 2)
202 args[0] = &kafka.KVArg{
203 Key: "device",
204 Value: device,
205 }
khenaidoo79232702018-12-04 11:00:41 -0500206 pNo := &ic.IntType{Val: int64(portNo)}
khenaidoo4d4802d2018-10-04 21:59:49 -0400207 args[1] = &kafka.KVArg{
208 Key: "port_no",
209 Value: pNo,
210 }
khenaidoo43c82122018-11-22 18:38:28 -0500211 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500212 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500213 success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400214 log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
215 if success {
khenaidoo79232702018-12-04 11:00:41 -0500216 unpackResult := &ic.PortCapability{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400217 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
218 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
219 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
220 }
221 return unpackResult, nil
khenaidoo4d4802d2018-10-04 21:59:49 -0400222 }
npujar1d86a522019-11-14 17:11:16 +0530223 unpackResult := &ic.Error{}
224 var err error
225 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
226 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
227 }
228 log.Debugw("GetOfpPortInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
229 // TODO: Need to get the real error code
230 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
khenaidoo4d4802d2018-10-04 21:59:49 -0400231}
232
233//TODO: Implement the functions below
234
npujar1d86a522019-11-14 17:11:16 +0530235// AdapterDescriptor - TODO
khenaidoob9203542018-09-17 22:56:37 -0400236func (ap *AdapterProxy) AdapterDescriptor() (*voltha.Adapter, error) {
237 log.Debug("AdapterDescriptor")
238 return nil, nil
239}
240
npujar1d86a522019-11-14 17:11:16 +0530241// DeviceTypes - TODO
khenaidoob9203542018-09-17 22:56:37 -0400242func (ap *AdapterProxy) DeviceTypes() (*voltha.DeviceType, error) {
243 log.Debug("DeviceTypes")
244 return nil, nil
245}
246
npujar1d86a522019-11-14 17:11:16 +0530247// Health - TODO
khenaidoob9203542018-09-17 22:56:37 -0400248func (ap *AdapterProxy) Health() (*voltha.HealthStatus, error) {
249 log.Debug("Health")
250 return nil, nil
251}
252
npujar1d86a522019-11-14 17:11:16 +0530253// ReconcileDevice invokes reconcile device rpc
khenaidooba6b6c42019-08-02 09:11:56 -0400254func (ap *AdapterProxy) ReconcileDevice(ctx context.Context, device *voltha.Device) error {
255 log.Debugw("ReconcileDevice", log.Fields{"deviceId": device.Id})
Matt Jeanneret7cf8e0b2020-01-09 11:57:51 -0500256 rpc := "reconcile_device"
khenaidooba6b6c42019-08-02 09:11:56 -0400257 toTopic := ap.getAdapterTopic(device.Adapter)
258 args := []*kafka.KVArg{
259 {Key: "device", Value: device},
260 }
261 // Use a device specific topic as we are the only core handling requests for this device
262 replyToTopic := ap.getCoreTopic()
263 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
264 log.Debugw("ReconcileDevice-response", log.Fields{"deviceid": device.Id, "success": success})
265
266 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400267}
268
npujar1d86a522019-11-14 17:11:16 +0530269// AbandonDevice - TODO
khenaidoob9203542018-09-17 22:56:37 -0400270func (ap *AdapterProxy) AbandonDevice(device voltha.Device) error {
271 log.Debug("AbandonDevice")
272 return nil
273}
274
npujar1d86a522019-11-14 17:11:16 +0530275// GetDeviceDetails - TODO
khenaidoob9203542018-09-17 22:56:37 -0400276func (ap *AdapterProxy) GetDeviceDetails(device voltha.Device) (*voltha.Device, error) {
277 log.Debug("GetDeviceDetails")
278 return nil, nil
279}
280
npujar1d86a522019-11-14 17:11:16 +0530281// DownloadImage invokes download image rpc
khenaidoof5a5bfa2019-01-23 22:20:29 -0500282func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
283 log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
284 rpc := "download_image"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500285 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500286 args := make([]*kafka.KVArg, 2)
287 args[0] = &kafka.KVArg{
288 Key: "device",
289 Value: device,
290 }
291 args[1] = &kafka.KVArg{
292 Key: "request",
293 Value: download,
294 }
295 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500296 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500297 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500298 log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
299
300 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400301}
302
npujar1d86a522019-11-14 17:11:16 +0530303// GetImageDownloadStatus invokes get image download status rpc
khenaidoof5a5bfa2019-01-23 22:20:29 -0500304func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
305 log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
306 rpc := "get_image_download_status"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500307 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500308 args := make([]*kafka.KVArg, 2)
309 args[0] = &kafka.KVArg{
310 Key: "device",
311 Value: device,
312 }
313 args[1] = &kafka.KVArg{
314 Key: "request",
315 Value: download,
316 }
317 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500318 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500319 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500320 log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
321
322 if success {
323 unpackResult := &voltha.ImageDownload{}
324 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
325 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
326 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
327 }
328 return unpackResult, nil
khenaidoof5a5bfa2019-01-23 22:20:29 -0500329 }
npujar1d86a522019-11-14 17:11:16 +0530330 unpackResult := &ic.Error{}
331 var err error
332 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
333 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
334 return nil, err
335 }
336 log.Debugw("GetImageDownloadStatus-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
337 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
khenaidoob9203542018-09-17 22:56:37 -0400338}
339
npujar1d86a522019-11-14 17:11:16 +0530340// CancelImageDownload invokes cancel image download rpc
khenaidoof5a5bfa2019-01-23 22:20:29 -0500341func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
342 log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
343 rpc := "cancel_image_download"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500344 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500345 args := make([]*kafka.KVArg, 2)
346 args[0] = &kafka.KVArg{
347 Key: "device",
348 Value: device,
349 }
350 args[1] = &kafka.KVArg{
351 Key: "request",
352 Value: download,
353 }
354 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500355 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500356 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500357 log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
358
359 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400360}
361
npujar1d86a522019-11-14 17:11:16 +0530362// ActivateImageUpdate invokes activate image update rpc
khenaidoof5a5bfa2019-01-23 22:20:29 -0500363func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
364 log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
365 rpc := "activate_image_update"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500366 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500367 args := make([]*kafka.KVArg, 2)
368 args[0] = &kafka.KVArg{
369 Key: "device",
370 Value: device,
371 }
372 args[1] = &kafka.KVArg{
373 Key: "request",
374 Value: download,
375 }
376 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500377 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500378 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500379 log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
380
381 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400382}
383
npujar1d86a522019-11-14 17:11:16 +0530384// RevertImageUpdate invokes revert image update rpc
khenaidoof5a5bfa2019-01-23 22:20:29 -0500385func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
386 log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
387 rpc := "revert_image_update"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500388 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500389 args := make([]*kafka.KVArg, 2)
390 args[0] = &kafka.KVArg{
391 Key: "device",
392 Value: device,
393 }
394 args[1] = &kafka.KVArg{
395 Key: "request",
396 Value: download,
397 }
398 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500399 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500400 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500401 log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
402
403 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400404}
405
npujar1d86a522019-11-14 17:11:16 +0530406// SelfTestDevice - TODO
khenaidoob9203542018-09-17 22:56:37 -0400407func (ap *AdapterProxy) SelfTestDevice(device voltha.Device) (*voltha.SelfTestResponse, error) {
408 log.Debug("SelfTestDevice")
409 return nil, nil
410}
411
npujar467fe752020-01-16 20:17:45 +0530412func (ap *AdapterProxy) packetOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
npujar1d86a522019-11-14 17:11:16 +0530413 log.Debugw("packetOut", log.Fields{"deviceId": deviceID})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500414 toTopic := ap.getAdapterTopic(deviceType)
khenaidoofdbad6e2018-11-06 22:26:38 -0500415 rpc := "receive_packet_out"
npujar1d86a522019-11-14 17:11:16 +0530416 dID := &ic.StrType{Val: deviceID}
khenaidoofdbad6e2018-11-06 22:26:38 -0500417 args := make([]*kafka.KVArg, 3)
418 args[0] = &kafka.KVArg{
419 Key: "deviceId",
npujar1d86a522019-11-14 17:11:16 +0530420 Value: dID,
khenaidoofdbad6e2018-11-06 22:26:38 -0500421 }
khenaidoo79232702018-12-04 11:00:41 -0500422 op := &ic.IntType{Val: int64(outPort)}
khenaidoofdbad6e2018-11-06 22:26:38 -0500423 args[1] = &kafka.KVArg{
424 Key: "outPort",
425 Value: op,
426 }
427 args[2] = &kafka.KVArg{
428 Key: "packet",
429 Value: packet,
430 }
431
432 // TODO: Do we need to wait for an ACK on a packet Out?
khenaidoo43c82122018-11-22 18:38:28 -0500433 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500434 replyToTopic := ap.getCoreTopic()
npujar467fe752020-01-16 20:17:45 +0530435 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, deviceID, args...)
npujar1d86a522019-11-14 17:11:16 +0530436 log.Debugw("packetOut", log.Fields{"deviceid": deviceID, "success": success})
437 return unPackResponse(rpc, deviceID, success, result)
khenaidoofdbad6e2018-11-06 22:26:38 -0500438}
439
npujar1d86a522019-11-14 17:11:16 +0530440// UpdateFlowsBulk invokes update flows bulk rpc
npujar467fe752020-01-16 20:17:45 +0530441func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) error {
khenaidoo0458db62019-06-20 08:50:36 -0400442 log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id, "flowsInUpdate": len(flows.Items), "groupsToUpdate": len(groups.Items)})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500443 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo19d7b632018-10-30 10:49:50 -0400444 rpc := "update_flows_bulk"
Manikkaraj kb1a10922019-07-29 12:10:34 -0400445 args := make([]*kafka.KVArg, 4)
khenaidoo19d7b632018-10-30 10:49:50 -0400446 args[0] = &kafka.KVArg{
447 Key: "device",
448 Value: device,
449 }
450 args[1] = &kafka.KVArg{
451 Key: "flows",
452 Value: flows,
453 }
454 args[2] = &kafka.KVArg{
455 Key: "groups",
456 Value: groups,
457 }
Manikkaraj kb1a10922019-07-29 12:10:34 -0400458 args[3] = &kafka.KVArg{
459 Key: "flow_metadata",
460 Value: flowMetadata,
461 }
khenaidoo19d7b632018-10-30 10:49:50 -0400462
khenaidoo43c82122018-11-22 18:38:28 -0500463 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500464 replyToTopic := ap.getCoreTopic()
npujar467fe752020-01-16 20:17:45 +0530465 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo19d7b632018-10-30 10:49:50 -0400466 log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
467 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400468}
469
npujar1d86a522019-11-14 17:11:16 +0530470// UpdateFlowsIncremental invokes update flows incremental rpc
npujar467fe752020-01-16 20:17:45 +0530471func (ap *AdapterProxy) UpdateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
khenaidoo0458db62019-06-20 08:50:36 -0400472 log.Debugw("UpdateFlowsIncremental",
473 log.Fields{
474 "deviceId": device.Id,
475 "flowsToAdd": len(flowChanges.ToAdd.Items),
476 "flowsToDelete": len(flowChanges.ToRemove.Items),
477 "groupsToAdd": len(groupChanges.ToAdd.Items),
478 "groupsToDelete": len(groupChanges.ToRemove.Items),
479 "groupsToUpdate": len(groupChanges.ToUpdate.Items),
480 })
khenaidoo54e0ddf2019-02-27 16:21:33 -0500481 toTopic := ap.getAdapterTopic(device.Adapter)
Matt Jeanneretb0037422019-03-23 14:36:51 -0400482 rpc := "update_flows_incrementally"
Manikkaraj kb1a10922019-07-29 12:10:34 -0400483 args := make([]*kafka.KVArg, 4)
khenaidoo19d7b632018-10-30 10:49:50 -0400484 args[0] = &kafka.KVArg{
485 Key: "device",
486 Value: device,
487 }
488 args[1] = &kafka.KVArg{
489 Key: "flow_changes",
490 Value: flowChanges,
491 }
492 args[2] = &kafka.KVArg{
493 Key: "group_changes",
494 Value: groupChanges,
495 }
496
Manikkaraj kb1a10922019-07-29 12:10:34 -0400497 args[3] = &kafka.KVArg{
498 Key: "flow_metadata",
499 Value: flowMetadata,
500 }
khenaidoo43c82122018-11-22 18:38:28 -0500501 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500502 replyToTopic := ap.getCoreTopic()
npujar467fe752020-01-16 20:17:45 +0530503 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo19d7b632018-10-30 10:49:50 -0400504 log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
505 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400506}
507
npujar1d86a522019-11-14 17:11:16 +0530508// UpdatePmConfigs invokes update pm configs rpc
khenaidoob3127472019-07-24 21:04:55 -0400509func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) error {
510 log.Debugw("UpdatePmConfigs", log.Fields{"deviceId": device.Id})
511 toTopic := ap.getAdapterTopic(device.Adapter)
512 rpc := "Update_pm_config"
513 args := make([]*kafka.KVArg, 2)
514 args[0] = &kafka.KVArg{
515 Key: "device",
516 Value: device,
517 }
518 args[1] = &kafka.KVArg{
519 Key: "pm_configs",
520 Value: pmConfigs,
521 }
522
523 replyToTopic := ap.getCoreTopic()
npujar467fe752020-01-16 20:17:45 +0530524 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoob3127472019-07-24 21:04:55 -0400525 log.Debugw("UpdatePmConfigs-response", log.Fields{"deviceid": device.Id, "success": success})
526 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400527}
528
npujar1d86a522019-11-14 17:11:16 +0530529// ReceivePacketOut - TODO
530func (ap *AdapterProxy) ReceivePacketOut(deviceID voltha.ID, egressPortNo int, msg interface{}) error {
khenaidoob9203542018-09-17 22:56:37 -0400531 log.Debug("ReceivePacketOut")
532 return nil
533}
534
Devmalya Paulc594bb32019-11-06 07:34:27 +0000535func (ap *AdapterProxy) SuppressEvent(filter *voltha.EventFilter) error {
536 log.Debug("SuppressEvent")
khenaidoob9203542018-09-17 22:56:37 -0400537 return nil
538}
539
Devmalya Paulc594bb32019-11-06 07:34:27 +0000540func (ap *AdapterProxy) UnSuppressEvent(filter *voltha.EventFilter) error {
541 log.Debug("UnSuppressEvent")
khenaidoob9203542018-09-17 22:56:37 -0400542 return nil
khenaidoo89b0e942018-10-21 21:11:33 -0400543}
serkant.uluderya334479d2019-04-10 08:26:15 -0700544
npujar1d86a522019-11-14 17:11:16 +0530545// SimulateAlarm invokes simulate alarm rpc
serkant.uluderya334479d2019-04-10 08:26:15 -0700546func (ap *AdapterProxy) SimulateAlarm(ctx context.Context, device *voltha.Device, simulatereq *voltha.SimulateAlarmRequest) error {
547 log.Debugw("SimulateAlarm", log.Fields{"id": simulatereq.Id})
548 rpc := "simulate_alarm"
549 toTopic := ap.getAdapterTopic(device.Adapter)
550 args := make([]*kafka.KVArg, 2)
551 args[0] = &kafka.KVArg{
552 Key: "device",
553 Value: device,
554 }
555 args[1] = &kafka.KVArg{
556 Key: "request",
557 Value: simulatereq,
558 }
559
560 // Use a device topic for the response as we are the only core handling requests for this device
561 replyToTopic := ap.getCoreTopic()
562 ap.deviceTopicRegistered = true
563 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
564 log.Debugw("SimulateAlarm-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
565 return unPackResponse(rpc, device.Id, success, result)
566}
kesavandbc2d1622020-01-21 00:42:01 -0500567
568func (ap *AdapterProxy) disablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
569 log.Debugw("disablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
570 rpc := "disable_port"
571 deviceID := &ic.StrType{Val: device.Id}
572 toTopic := ap.getAdapterTopic(device.Adapter)
573 // Use a device specific topic to send the request. The adapter handling the device creates a device
574 // specific topic
575 args := make([]*kafka.KVArg, 2)
576 args[0] = &kafka.KVArg{
577 Key: "deviceId",
578 Value: deviceID,
579 }
580
581 args[1] = &kafka.KVArg{
582 Key: "port",
583 Value: port,
584 }
585
586 replyToTopic := ap.getCoreTopic()
587 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
588 log.Debugw("disablePort-response", log.Fields{"device-id": device.Id, "port-no": port.PortNo, "success": success})
589 return unPackResponse(rpc, device.Id, success, result)
590}
591
592func (ap *AdapterProxy) enablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
593 log.Debugw("enablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
594 rpc := "enable_port"
595 deviceID := &ic.StrType{Val: device.Id}
596 toTopic := ap.getAdapterTopic(device.Adapter)
597 // Use a device specific topic to send the request. The adapter handling the device creates a device
598 // specific topic
599 args := make([]*kafka.KVArg, 2)
600 args[0] = &kafka.KVArg{
601 Key: "deviceId",
602 Value: deviceID,
603 }
604
605 args[1] = &kafka.KVArg{
606 Key: "port",
607 Value: port,
608 }
609
610 replyToTopic := ap.getCoreTopic()
611 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
612 log.Debugw("enablePort-response", log.Fields{"device-id": device.Id, "port-no": port.PortNo, "success": success})
613 return unPackResponse(rpc, device.Id, success, result)
614}