blob: 41f71a653cb69ccde60df1563079f351c0ba7fcc [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
khenaidoo92e62c52018-10-03 14:02:54 -040021 a "github.com/golang/protobuf/ptypes/any"
khenaidoob9203542018-09-17 22:56:37 -040022 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
William Kurkiandaa6bb22019-03-07 12:26:28 -050024 ic "github.com/opencord/voltha-protos/go/inter_container"
25 "github.com/opencord/voltha-protos/go/openflow_13"
26 "github.com/opencord/voltha-protos/go/voltha"
khenaidoob9203542018-09-17 22:56:37 -040027 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29)
30
31type AdapterProxy struct {
serkant.uluderya334479d2019-04-10 08:26:15 -070032 TestMode bool
khenaidoo54e0ddf2019-02-27 16:21:33 -050033 deviceTopicRegistered bool
Kent Hagermana6d0c362019-07-30 12:50:21 -040034 corePairTopic string
serkant.uluderya334479d2019-04-10 08:26:15 -070035 kafkaICProxy *kafka.InterContainerProxy
khenaidoob9203542018-09-17 22:56:37 -040036}
37
Kent Hagermana6d0c362019-07-30 12:50:21 -040038func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy, corePairTopic string) *AdapterProxy {
39 return &AdapterProxy{
40 kafkaICProxy: kafkaProxy,
41 corePairTopic: corePairTopic,
42 deviceTopicRegistered: false,
43 }
khenaidoob9203542018-09-17 22:56:37 -040044}
45
khenaidoo92e62c52018-10-03 14:02:54 -040046func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
47 if success {
48 return nil
49 } else {
khenaidoo79232702018-12-04 11:00:41 -050050 unpackResult := &ic.Error{}
khenaidoo92e62c52018-10-03 14:02:54 -040051 var err error
52 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
53 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
khenaidoo6d055132019-02-12 16:51:19 -050054 return err
khenaidoo92e62c52018-10-03 14:02:54 -040055 }
56 log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
57 // TODO: Need to get the real error code
58 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
59 }
60}
61
serkant.uluderya334479d2019-04-10 08:26:15 -070062func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
Kent Hagermana6d0c362019-07-30 12:50:21 -040063 return kafka.Topic{Name: ap.corePairTopic}
khenaidoo54e0ddf2019-02-27 16:21:33 -050064}
65
serkant.uluderya334479d2019-04-10 08:26:15 -070066func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic {
khenaidoo54e0ddf2019-02-27 16:21:33 -050067 return kafka.Topic{Name: adapterName}
68}
69
khenaidoob9203542018-09-17 22:56:37 -040070func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
71 log.Debugw("AdoptDevice", log.Fields{"device": device})
khenaidoo92e62c52018-10-03 14:02:54 -040072 rpc := "adopt_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -050073 toTopic := ap.getAdapterTopic(device.Adapter)
74 //topic := kafka.Topic{Name: device.Adapter}
khenaidoob9203542018-09-17 22:56:37 -040075 args := make([]*kafka.KVArg, 1)
76 args[0] = &kafka.KVArg{
77 Key: "device",
78 Value: device,
79 }
khenaidoo43c82122018-11-22 18:38:28 -050080 // Use a device topic for the response as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -050081 replyToTopic := ap.getCoreTopic()
khenaidoo54e0ddf2019-02-27 16:21:33 -050082 ap.deviceTopicRegistered = true
khenaidoobdcb8e02019-03-06 16:28:56 -050083 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo43c82122018-11-22 18:38:28 -050084 log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
khenaidoo92e62c52018-10-03 14:02:54 -040085 return unPackResponse(rpc, device.Id, success, result)
86}
87
88func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
89 log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
90 rpc := "disable_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -050091 toTopic := ap.getAdapterTopic(device.Adapter)
92
khenaidoo43c82122018-11-22 18:38:28 -050093 // Use a device specific topic to send the request. The adapter handling the device creates a device
94 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -050095 //toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo92e62c52018-10-03 14:02:54 -040096 args := make([]*kafka.KVArg, 1)
97 args[0] = &kafka.KVArg{
98 Key: "device",
99 Value: device,
khenaidoob9203542018-09-17 22:56:37 -0400100 }
khenaidoo43c82122018-11-22 18:38:28 -0500101 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500102 //replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
103 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500104 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo92e62c52018-10-03 14:02:54 -0400105 log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
106 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400107}
108
khenaidoo4d4802d2018-10-04 21:59:49 -0400109func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
110 log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
111 rpc := "reenable_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500112 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo4d4802d2018-10-04 21:59:49 -0400113 args := make([]*kafka.KVArg, 1)
114 args[0] = &kafka.KVArg{
115 Key: "device",
116 Value: device,
117 }
khenaidoo43c82122018-11-22 18:38:28 -0500118 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500119 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500120 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400121 log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
122 return unPackResponse(rpc, device.Id, success, result)
123}
124
125func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
126 log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
127 rpc := "reboot_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500128 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo4d4802d2018-10-04 21:59:49 -0400129 args := make([]*kafka.KVArg, 1)
130 args[0] = &kafka.KVArg{
131 Key: "device",
132 Value: device,
133 }
khenaidoo43c82122018-11-22 18:38:28 -0500134 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500135 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500136 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400137 log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
138 return unPackResponse(rpc, device.Id, success, result)
139}
140
141func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
142 log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
143 rpc := "delete_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500144 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo4d4802d2018-10-04 21:59:49 -0400145 args := make([]*kafka.KVArg, 1)
146 args[0] = &kafka.KVArg{
147 Key: "device",
148 Value: device,
149 }
khenaidoo43c82122018-11-22 18:38:28 -0500150 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500151 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500152 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400153 log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
khenaidoo43c82122018-11-22 18:38:28 -0500154
khenaidoo4d4802d2018-10-04 21:59:49 -0400155 return unPackResponse(rpc, device.Id, success, result)
156}
157
khenaidoo79232702018-12-04 11:00:41 -0500158func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
khenaidoo4d4802d2018-10-04 21:59:49 -0400159 log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500160 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo4d4802d2018-10-04 21:59:49 -0400161 args := make([]*kafka.KVArg, 1)
162 args[0] = &kafka.KVArg{
163 Key: "device",
164 Value: device,
165 }
khenaidoo43c82122018-11-22 18:38:28 -0500166 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500167 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500168 success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400169 log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
170 if success {
khenaidoo79232702018-12-04 11:00:41 -0500171 unpackResult := &ic.SwitchCapability{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400172 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
173 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
174 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
175 }
176 return unpackResult, nil
177 } else {
khenaidoo79232702018-12-04 11:00:41 -0500178 unpackResult := &ic.Error{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400179 var err error
180 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
181 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
182 }
183 log.Debugw("GetOfpDeviceInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
184 // TODO: Need to get the real error code
185 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
186 }
187}
188
khenaidoo79232702018-12-04 11:00:41 -0500189func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
khenaidoo4d4802d2018-10-04 21:59:49 -0400190 log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500191 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo4d4802d2018-10-04 21:59:49 -0400192 args := make([]*kafka.KVArg, 2)
193 args[0] = &kafka.KVArg{
194 Key: "device",
195 Value: device,
196 }
khenaidoo79232702018-12-04 11:00:41 -0500197 pNo := &ic.IntType{Val: int64(portNo)}
khenaidoo4d4802d2018-10-04 21:59:49 -0400198 args[1] = &kafka.KVArg{
199 Key: "port_no",
200 Value: pNo,
201 }
khenaidoo43c82122018-11-22 18:38:28 -0500202 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500203 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500204 success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400205 log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
206 if success {
khenaidoo79232702018-12-04 11:00:41 -0500207 unpackResult := &ic.PortCapability{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400208 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
209 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
210 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
211 }
212 return unpackResult, nil
213 } else {
khenaidoo79232702018-12-04 11:00:41 -0500214 unpackResult := &ic.Error{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400215 var err error
216 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
217 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
218 }
219 log.Debugw("GetOfpPortInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
220 // TODO: Need to get the real error code
221 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
222 }
223}
224
// TODO: Several of the functions below are still stubs that only log the call and return nil; implement them.
226
khenaidoob9203542018-09-17 22:56:37 -0400227func (ap *AdapterProxy) AdapterDescriptor() (*voltha.Adapter, error) {
228 log.Debug("AdapterDescriptor")
229 return nil, nil
230}
231
232func (ap *AdapterProxy) DeviceTypes() (*voltha.DeviceType, error) {
233 log.Debug("DeviceTypes")
234 return nil, nil
235}
236
237func (ap *AdapterProxy) Health() (*voltha.HealthStatus, error) {
238 log.Debug("Health")
239 return nil, nil
240}
241
khenaidooba6b6c42019-08-02 09:11:56 -0400242func (ap *AdapterProxy) ReconcileDevice(ctx context.Context, device *voltha.Device) error {
243 log.Debugw("ReconcileDevice", log.Fields{"deviceId": device.Id})
244 rpc := "Reconcile_device"
245 toTopic := ap.getAdapterTopic(device.Adapter)
246 args := []*kafka.KVArg{
247 {Key: "device", Value: device},
248 }
249 // Use a device specific topic as we are the only core handling requests for this device
250 replyToTopic := ap.getCoreTopic()
251 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
252 log.Debugw("ReconcileDevice-response", log.Fields{"deviceid": device.Id, "success": success})
253
254 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400255}
256
257func (ap *AdapterProxy) AbandonDevice(device voltha.Device) error {
258 log.Debug("AbandonDevice")
259 return nil
260}
261
khenaidoob9203542018-09-17 22:56:37 -0400262func (ap *AdapterProxy) GetDeviceDetails(device voltha.Device) (*voltha.Device, error) {
263 log.Debug("GetDeviceDetails")
264 return nil, nil
265}
266
khenaidoof5a5bfa2019-01-23 22:20:29 -0500267func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
268 log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
269 rpc := "download_image"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500270 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500271 args := make([]*kafka.KVArg, 2)
272 args[0] = &kafka.KVArg{
273 Key: "device",
274 Value: device,
275 }
276 args[1] = &kafka.KVArg{
277 Key: "request",
278 Value: download,
279 }
280 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500281 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500282 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500283 log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
284
285 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400286}
287
khenaidoof5a5bfa2019-01-23 22:20:29 -0500288func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
289 log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
290 rpc := "get_image_download_status"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500291 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500292 args := make([]*kafka.KVArg, 2)
293 args[0] = &kafka.KVArg{
294 Key: "device",
295 Value: device,
296 }
297 args[1] = &kafka.KVArg{
298 Key: "request",
299 Value: download,
300 }
301 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500302 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500303 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500304 log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
305
306 if success {
307 unpackResult := &voltha.ImageDownload{}
308 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
309 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
310 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
311 }
312 return unpackResult, nil
313 } else {
314 unpackResult := &ic.Error{}
315 var err error
316 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
317 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
318 return nil, err
319 }
320 log.Debugw("GetImageDownloadStatus-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
321 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
322 }
khenaidoob9203542018-09-17 22:56:37 -0400323}
324
khenaidoof5a5bfa2019-01-23 22:20:29 -0500325func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
326 log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
327 rpc := "cancel_image_download"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500328 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500329 args := make([]*kafka.KVArg, 2)
330 args[0] = &kafka.KVArg{
331 Key: "device",
332 Value: device,
333 }
334 args[1] = &kafka.KVArg{
335 Key: "request",
336 Value: download,
337 }
338 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500339 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500340 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500341 log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
342
343 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400344}
345
khenaidoof5a5bfa2019-01-23 22:20:29 -0500346func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
347 log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
348 rpc := "activate_image_update"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500349 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500350 args := make([]*kafka.KVArg, 2)
351 args[0] = &kafka.KVArg{
352 Key: "device",
353 Value: device,
354 }
355 args[1] = &kafka.KVArg{
356 Key: "request",
357 Value: download,
358 }
359 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500360 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500361 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500362 log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
363
364 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400365}
366
khenaidoof5a5bfa2019-01-23 22:20:29 -0500367func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
368 log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
369 rpc := "revert_image_update"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500370 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500371 args := make([]*kafka.KVArg, 2)
372 args[0] = &kafka.KVArg{
373 Key: "device",
374 Value: device,
375 }
376 args[1] = &kafka.KVArg{
377 Key: "request",
378 Value: download,
379 }
380 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500381 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500382 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500383 log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
384
385 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400386}
387
388func (ap *AdapterProxy) SelfTestDevice(device voltha.Device) (*voltha.SelfTestResponse, error) {
389 log.Debug("SelfTestDevice")
390 return nil, nil
391}
392
khenaidoofdbad6e2018-11-06 22:26:38 -0500393func (ap *AdapterProxy) packetOut(deviceType string, deviceId string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
394 log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500395 toTopic := ap.getAdapterTopic(deviceType)
khenaidoofdbad6e2018-11-06 22:26:38 -0500396 rpc := "receive_packet_out"
khenaidoo79232702018-12-04 11:00:41 -0500397 dId := &ic.StrType{Val: deviceId}
khenaidoofdbad6e2018-11-06 22:26:38 -0500398 args := make([]*kafka.KVArg, 3)
399 args[0] = &kafka.KVArg{
400 Key: "deviceId",
401 Value: dId,
402 }
khenaidoo79232702018-12-04 11:00:41 -0500403 op := &ic.IntType{Val: int64(outPort)}
khenaidoofdbad6e2018-11-06 22:26:38 -0500404 args[1] = &kafka.KVArg{
405 Key: "outPort",
406 Value: op,
407 }
408 args[2] = &kafka.KVArg{
409 Key: "packet",
410 Value: packet,
411 }
412
413 // TODO: Do we need to wait for an ACK on a packet Out?
khenaidoo43c82122018-11-22 18:38:28 -0500414 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500415 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500416 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
khenaidoofdbad6e2018-11-06 22:26:38 -0500417 log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
418 return unPackResponse(rpc, deviceId, success, result)
419}
420
khenaidoo19d7b632018-10-30 10:49:50 -0400421func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
khenaidoo0458db62019-06-20 08:50:36 -0400422 log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id, "flowsInUpdate": len(flows.Items), "groupsToUpdate": len(groups.Items)})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500423 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo19d7b632018-10-30 10:49:50 -0400424 rpc := "update_flows_bulk"
425 args := make([]*kafka.KVArg, 3)
426 args[0] = &kafka.KVArg{
427 Key: "device",
428 Value: device,
429 }
430 args[1] = &kafka.KVArg{
431 Key: "flows",
432 Value: flows,
433 }
434 args[2] = &kafka.KVArg{
435 Key: "groups",
436 Value: groups,
437 }
438
khenaidoo43c82122018-11-22 18:38:28 -0500439 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500440 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500441 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo19d7b632018-10-30 10:49:50 -0400442 log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
443 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400444}
445
khenaidoo19d7b632018-10-30 10:49:50 -0400446func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
khenaidoo0458db62019-06-20 08:50:36 -0400447 log.Debugw("UpdateFlowsIncremental",
448 log.Fields{
449 "deviceId": device.Id,
450 "flowsToAdd": len(flowChanges.ToAdd.Items),
451 "flowsToDelete": len(flowChanges.ToRemove.Items),
452 "groupsToAdd": len(groupChanges.ToAdd.Items),
453 "groupsToDelete": len(groupChanges.ToRemove.Items),
454 "groupsToUpdate": len(groupChanges.ToUpdate.Items),
455 })
khenaidoo54e0ddf2019-02-27 16:21:33 -0500456 toTopic := ap.getAdapterTopic(device.Adapter)
Matt Jeanneretb0037422019-03-23 14:36:51 -0400457 rpc := "update_flows_incrementally"
khenaidoo19d7b632018-10-30 10:49:50 -0400458 args := make([]*kafka.KVArg, 3)
459 args[0] = &kafka.KVArg{
460 Key: "device",
461 Value: device,
462 }
463 args[1] = &kafka.KVArg{
464 Key: "flow_changes",
465 Value: flowChanges,
466 }
467 args[2] = &kafka.KVArg{
468 Key: "group_changes",
469 Value: groupChanges,
470 }
471
khenaidoo43c82122018-11-22 18:38:28 -0500472 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500473 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500474 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo19d7b632018-10-30 10:49:50 -0400475 log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
476 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400477}
478
khenaidoob3127472019-07-24 21:04:55 -0400479func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) error {
480 log.Debugw("UpdatePmConfigs", log.Fields{"deviceId": device.Id})
481 toTopic := ap.getAdapterTopic(device.Adapter)
482 rpc := "Update_pm_config"
483 args := make([]*kafka.KVArg, 2)
484 args[0] = &kafka.KVArg{
485 Key: "device",
486 Value: device,
487 }
488 args[1] = &kafka.KVArg{
489 Key: "pm_configs",
490 Value: pmConfigs,
491 }
492
493 replyToTopic := ap.getCoreTopic()
494 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
495 log.Debugw("UpdatePmConfigs-response", log.Fields{"deviceid": device.Id, "success": success})
496 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400497}
498
499func (ap *AdapterProxy) ReceivePacketOut(deviceId voltha.ID, egressPortNo int, msg interface{}) error {
500 log.Debug("ReceivePacketOut")
501 return nil
502}
503
504func (ap *AdapterProxy) SuppressAlarm(filter voltha.AlarmFilter) error {
505 log.Debug("SuppressAlarm")
506 return nil
507}
508
509func (ap *AdapterProxy) UnSuppressAlarm(filter voltha.AlarmFilter) error {
510 log.Debug("UnSuppressAlarm")
511 return nil
khenaidoo89b0e942018-10-21 21:11:33 -0400512}
serkant.uluderya334479d2019-04-10 08:26:15 -0700513
514func (ap *AdapterProxy) SimulateAlarm(ctx context.Context, device *voltha.Device, simulatereq *voltha.SimulateAlarmRequest) error {
515 log.Debugw("SimulateAlarm", log.Fields{"id": simulatereq.Id})
516 rpc := "simulate_alarm"
517 toTopic := ap.getAdapterTopic(device.Adapter)
518 args := make([]*kafka.KVArg, 2)
519 args[0] = &kafka.KVArg{
520 Key: "device",
521 Value: device,
522 }
523 args[1] = &kafka.KVArg{
524 Key: "request",
525 Value: simulatereq,
526 }
527
528 // Use a device topic for the response as we are the only core handling requests for this device
529 replyToTopic := ap.getCoreTopic()
530 ap.deviceTopicRegistered = true
531 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
532 log.Debugw("SimulateAlarm-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
533 return unPackResponse(rpc, device.Id, success, result)
534}