blob: 816c179bbc282d99f88d670ed58ddb422d97b60e [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
khenaidoo92e62c52018-10-03 14:02:54 -040021 a "github.com/golang/protobuf/ptypes/any"
khenaidoob9203542018-09-17 22:56:37 -040022 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
William Kurkiandaa6bb22019-03-07 12:26:28 -050024 ic "github.com/opencord/voltha-protos/go/inter_container"
25 "github.com/opencord/voltha-protos/go/openflow_13"
26 "github.com/opencord/voltha-protos/go/voltha"
khenaidoob9203542018-09-17 22:56:37 -040027 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29)
30
31type AdapterProxy struct {
serkant.uluderya334479d2019-04-10 08:26:15 -070032 TestMode bool
khenaidoo54e0ddf2019-02-27 16:21:33 -050033 deviceTopicRegistered bool
Kent Hagermana6d0c362019-07-30 12:50:21 -040034 corePairTopic string
serkant.uluderya334479d2019-04-10 08:26:15 -070035 kafkaICProxy *kafka.InterContainerProxy
khenaidoob9203542018-09-17 22:56:37 -040036}
37
Kent Hagermana6d0c362019-07-30 12:50:21 -040038func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy, corePairTopic string) *AdapterProxy {
39 return &AdapterProxy{
40 kafkaICProxy: kafkaProxy,
41 corePairTopic: corePairTopic,
42 deviceTopicRegistered: false,
43 }
khenaidoob9203542018-09-17 22:56:37 -040044}
45
khenaidoo92e62c52018-10-03 14:02:54 -040046func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
47 if success {
48 return nil
49 } else {
khenaidoo79232702018-12-04 11:00:41 -050050 unpackResult := &ic.Error{}
khenaidoo92e62c52018-10-03 14:02:54 -040051 var err error
52 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
53 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
khenaidoo6d055132019-02-12 16:51:19 -050054 return err
khenaidoo92e62c52018-10-03 14:02:54 -040055 }
56 log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
57 // TODO: Need to get the real error code
58 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
59 }
60}
61
serkant.uluderya334479d2019-04-10 08:26:15 -070062func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
Kent Hagermana6d0c362019-07-30 12:50:21 -040063 return kafka.Topic{Name: ap.corePairTopic}
khenaidoo54e0ddf2019-02-27 16:21:33 -050064}
65
serkant.uluderya334479d2019-04-10 08:26:15 -070066func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic {
khenaidoo54e0ddf2019-02-27 16:21:33 -050067 return kafka.Topic{Name: adapterName}
68}
69
khenaidoob9203542018-09-17 22:56:37 -040070func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
71 log.Debugw("AdoptDevice", log.Fields{"device": device})
khenaidoo92e62c52018-10-03 14:02:54 -040072 rpc := "adopt_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -050073 toTopic := ap.getAdapterTopic(device.Adapter)
74 //topic := kafka.Topic{Name: device.Adapter}
khenaidoob9203542018-09-17 22:56:37 -040075 args := make([]*kafka.KVArg, 1)
76 args[0] = &kafka.KVArg{
77 Key: "device",
78 Value: device,
79 }
khenaidoo43c82122018-11-22 18:38:28 -050080 // Use a device topic for the response as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -050081 replyToTopic := ap.getCoreTopic()
khenaidoo54e0ddf2019-02-27 16:21:33 -050082 ap.deviceTopicRegistered = true
khenaidoobdcb8e02019-03-06 16:28:56 -050083 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo43c82122018-11-22 18:38:28 -050084 log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
khenaidoo92e62c52018-10-03 14:02:54 -040085 return unPackResponse(rpc, device.Id, success, result)
86}
87
88func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
89 log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
90 rpc := "disable_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -050091 toTopic := ap.getAdapterTopic(device.Adapter)
92
khenaidoo43c82122018-11-22 18:38:28 -050093 // Use a device specific topic to send the request. The adapter handling the device creates a device
94 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -050095 //toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo92e62c52018-10-03 14:02:54 -040096 args := make([]*kafka.KVArg, 1)
97 args[0] = &kafka.KVArg{
98 Key: "device",
99 Value: device,
khenaidoob9203542018-09-17 22:56:37 -0400100 }
khenaidoo43c82122018-11-22 18:38:28 -0500101 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500102 //replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
103 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500104 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo92e62c52018-10-03 14:02:54 -0400105 log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
106 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400107}
108
khenaidoo4d4802d2018-10-04 21:59:49 -0400109func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
110 log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
111 rpc := "reenable_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500112 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo4d4802d2018-10-04 21:59:49 -0400113 args := make([]*kafka.KVArg, 1)
114 args[0] = &kafka.KVArg{
115 Key: "device",
116 Value: device,
117 }
khenaidoo43c82122018-11-22 18:38:28 -0500118 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500119 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500120 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400121 log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
122 return unPackResponse(rpc, device.Id, success, result)
123}
124
125func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
126 log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
127 rpc := "reboot_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500128 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo4d4802d2018-10-04 21:59:49 -0400129 args := make([]*kafka.KVArg, 1)
130 args[0] = &kafka.KVArg{
131 Key: "device",
132 Value: device,
133 }
khenaidoo43c82122018-11-22 18:38:28 -0500134 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500135 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500136 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400137 log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
138 return unPackResponse(rpc, device.Id, success, result)
139}
140
141func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
142 log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
143 rpc := "delete_device"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500144 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo4d4802d2018-10-04 21:59:49 -0400145 args := make([]*kafka.KVArg, 1)
146 args[0] = &kafka.KVArg{
147 Key: "device",
148 Value: device,
149 }
khenaidoo43c82122018-11-22 18:38:28 -0500150 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500151 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500152 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400153 log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
khenaidoo43c82122018-11-22 18:38:28 -0500154
khenaidoo4d4802d2018-10-04 21:59:49 -0400155 return unPackResponse(rpc, device.Id, success, result)
156}
157
khenaidoo79232702018-12-04 11:00:41 -0500158func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
khenaidoo4d4802d2018-10-04 21:59:49 -0400159 log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500160 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo4d4802d2018-10-04 21:59:49 -0400161 args := make([]*kafka.KVArg, 1)
162 args[0] = &kafka.KVArg{
163 Key: "device",
164 Value: device,
165 }
khenaidoo43c82122018-11-22 18:38:28 -0500166 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500167 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500168 success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400169 log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
170 if success {
khenaidoo79232702018-12-04 11:00:41 -0500171 unpackResult := &ic.SwitchCapability{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400172 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
173 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
174 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
175 }
176 return unpackResult, nil
177 } else {
khenaidoo79232702018-12-04 11:00:41 -0500178 unpackResult := &ic.Error{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400179 var err error
180 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
181 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
182 }
183 log.Debugw("GetOfpDeviceInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
184 // TODO: Need to get the real error code
185 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
186 }
187}
188
khenaidoo79232702018-12-04 11:00:41 -0500189func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
khenaidoo4d4802d2018-10-04 21:59:49 -0400190 log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500191 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo4d4802d2018-10-04 21:59:49 -0400192 args := make([]*kafka.KVArg, 2)
193 args[0] = &kafka.KVArg{
194 Key: "device",
195 Value: device,
196 }
khenaidoo79232702018-12-04 11:00:41 -0500197 pNo := &ic.IntType{Val: int64(portNo)}
khenaidoo4d4802d2018-10-04 21:59:49 -0400198 args[1] = &kafka.KVArg{
199 Key: "port_no",
200 Value: pNo,
201 }
khenaidoo43c82122018-11-22 18:38:28 -0500202 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500203 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500204 success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400205 log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
206 if success {
khenaidoo79232702018-12-04 11:00:41 -0500207 unpackResult := &ic.PortCapability{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400208 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
209 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
210 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
211 }
212 return unpackResult, nil
213 } else {
khenaidoo79232702018-12-04 11:00:41 -0500214 unpackResult := &ic.Error{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400215 var err error
216 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
217 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
218 }
219 log.Debugw("GetOfpPortInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
220 // TODO: Need to get the real error code
221 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
222 }
223}
224
225//TODO: Implement the functions below
226
khenaidoob9203542018-09-17 22:56:37 -0400227func (ap *AdapterProxy) AdapterDescriptor() (*voltha.Adapter, error) {
228 log.Debug("AdapterDescriptor")
229 return nil, nil
230}
231
232func (ap *AdapterProxy) DeviceTypes() (*voltha.DeviceType, error) {
233 log.Debug("DeviceTypes")
234 return nil, nil
235}
236
237func (ap *AdapterProxy) Health() (*voltha.HealthStatus, error) {
238 log.Debug("Health")
239 return nil, nil
240}
241
khenaidoo92e62c52018-10-03 14:02:54 -0400242func (ap *AdapterProxy) ReconcileDevice(device *voltha.Device) error {
khenaidoob9203542018-09-17 22:56:37 -0400243 log.Debug("ReconcileDevice")
244 return nil
245}
246
247func (ap *AdapterProxy) AbandonDevice(device voltha.Device) error {
248 log.Debug("AbandonDevice")
249 return nil
250}
251
khenaidoob9203542018-09-17 22:56:37 -0400252func (ap *AdapterProxy) GetDeviceDetails(device voltha.Device) (*voltha.Device, error) {
253 log.Debug("GetDeviceDetails")
254 return nil, nil
255}
256
khenaidoof5a5bfa2019-01-23 22:20:29 -0500257func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
258 log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
259 rpc := "download_image"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500260 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500261 args := make([]*kafka.KVArg, 2)
262 args[0] = &kafka.KVArg{
263 Key: "device",
264 Value: device,
265 }
266 args[1] = &kafka.KVArg{
267 Key: "request",
268 Value: download,
269 }
270 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500271 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500272 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500273 log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
274
275 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400276}
277
khenaidoof5a5bfa2019-01-23 22:20:29 -0500278func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
279 log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
280 rpc := "get_image_download_status"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500281 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500282 args := make([]*kafka.KVArg, 2)
283 args[0] = &kafka.KVArg{
284 Key: "device",
285 Value: device,
286 }
287 args[1] = &kafka.KVArg{
288 Key: "request",
289 Value: download,
290 }
291 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500292 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500293 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500294 log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
295
296 if success {
297 unpackResult := &voltha.ImageDownload{}
298 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
299 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
300 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
301 }
302 return unpackResult, nil
303 } else {
304 unpackResult := &ic.Error{}
305 var err error
306 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
307 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
308 return nil, err
309 }
310 log.Debugw("GetImageDownloadStatus-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
311 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
312 }
khenaidoob9203542018-09-17 22:56:37 -0400313}
314
khenaidoof5a5bfa2019-01-23 22:20:29 -0500315func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
316 log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
317 rpc := "cancel_image_download"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500318 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500319 args := make([]*kafka.KVArg, 2)
320 args[0] = &kafka.KVArg{
321 Key: "device",
322 Value: device,
323 }
324 args[1] = &kafka.KVArg{
325 Key: "request",
326 Value: download,
327 }
328 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500329 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500330 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500331 log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
332
333 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400334}
335
khenaidoof5a5bfa2019-01-23 22:20:29 -0500336func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
337 log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
338 rpc := "activate_image_update"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500339 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500340 args := make([]*kafka.KVArg, 2)
341 args[0] = &kafka.KVArg{
342 Key: "device",
343 Value: device,
344 }
345 args[1] = &kafka.KVArg{
346 Key: "request",
347 Value: download,
348 }
349 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500350 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500351 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500352 log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
353
354 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400355}
356
khenaidoof5a5bfa2019-01-23 22:20:29 -0500357func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
358 log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
359 rpc := "revert_image_update"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500360 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500361 args := make([]*kafka.KVArg, 2)
362 args[0] = &kafka.KVArg{
363 Key: "device",
364 Value: device,
365 }
366 args[1] = &kafka.KVArg{
367 Key: "request",
368 Value: download,
369 }
370 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500371 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500372 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500373 log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
374
375 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400376}
377
378func (ap *AdapterProxy) SelfTestDevice(device voltha.Device) (*voltha.SelfTestResponse, error) {
379 log.Debug("SelfTestDevice")
380 return nil, nil
381}
382
khenaidoofdbad6e2018-11-06 22:26:38 -0500383func (ap *AdapterProxy) packetOut(deviceType string, deviceId string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
384 log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500385 toTopic := ap.getAdapterTopic(deviceType)
khenaidoofdbad6e2018-11-06 22:26:38 -0500386 rpc := "receive_packet_out"
khenaidoo79232702018-12-04 11:00:41 -0500387 dId := &ic.StrType{Val: deviceId}
khenaidoofdbad6e2018-11-06 22:26:38 -0500388 args := make([]*kafka.KVArg, 3)
389 args[0] = &kafka.KVArg{
390 Key: "deviceId",
391 Value: dId,
392 }
khenaidoo79232702018-12-04 11:00:41 -0500393 op := &ic.IntType{Val: int64(outPort)}
khenaidoofdbad6e2018-11-06 22:26:38 -0500394 args[1] = &kafka.KVArg{
395 Key: "outPort",
396 Value: op,
397 }
398 args[2] = &kafka.KVArg{
399 Key: "packet",
400 Value: packet,
401 }
402
403 // TODO: Do we need to wait for an ACK on a packet Out?
khenaidoo43c82122018-11-22 18:38:28 -0500404 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500405 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500406 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
khenaidoofdbad6e2018-11-06 22:26:38 -0500407 log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
408 return unPackResponse(rpc, deviceId, success, result)
409}
410
khenaidoo19d7b632018-10-30 10:49:50 -0400411func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
khenaidoo0458db62019-06-20 08:50:36 -0400412 log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id, "flowsInUpdate": len(flows.Items), "groupsToUpdate": len(groups.Items)})
khenaidoo54e0ddf2019-02-27 16:21:33 -0500413 toTopic := ap.getAdapterTopic(device.Adapter)
khenaidoo19d7b632018-10-30 10:49:50 -0400414 rpc := "update_flows_bulk"
415 args := make([]*kafka.KVArg, 3)
416 args[0] = &kafka.KVArg{
417 Key: "device",
418 Value: device,
419 }
420 args[1] = &kafka.KVArg{
421 Key: "flows",
422 Value: flows,
423 }
424 args[2] = &kafka.KVArg{
425 Key: "groups",
426 Value: groups,
427 }
428
khenaidoo43c82122018-11-22 18:38:28 -0500429 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500430 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500431 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo19d7b632018-10-30 10:49:50 -0400432 log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
433 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400434}
435
khenaidoo19d7b632018-10-30 10:49:50 -0400436func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
khenaidoo0458db62019-06-20 08:50:36 -0400437 log.Debugw("UpdateFlowsIncremental",
438 log.Fields{
439 "deviceId": device.Id,
440 "flowsToAdd": len(flowChanges.ToAdd.Items),
441 "flowsToDelete": len(flowChanges.ToRemove.Items),
442 "groupsToAdd": len(groupChanges.ToAdd.Items),
443 "groupsToDelete": len(groupChanges.ToRemove.Items),
444 "groupsToUpdate": len(groupChanges.ToUpdate.Items),
445 })
khenaidoo54e0ddf2019-02-27 16:21:33 -0500446 toTopic := ap.getAdapterTopic(device.Adapter)
Matt Jeanneretb0037422019-03-23 14:36:51 -0400447 rpc := "update_flows_incrementally"
khenaidoo19d7b632018-10-30 10:49:50 -0400448 args := make([]*kafka.KVArg, 3)
449 args[0] = &kafka.KVArg{
450 Key: "device",
451 Value: device,
452 }
453 args[1] = &kafka.KVArg{
454 Key: "flow_changes",
455 Value: flowChanges,
456 }
457 args[2] = &kafka.KVArg{
458 Key: "group_changes",
459 Value: groupChanges,
460 }
461
khenaidoo43c82122018-11-22 18:38:28 -0500462 // Use a device specific topic as we are the only core handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500463 replyToTopic := ap.getCoreTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500464 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidoo19d7b632018-10-30 10:49:50 -0400465 log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
466 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400467}
468
khenaidoob3127472019-07-24 21:04:55 -0400469func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) error {
470 log.Debugw("UpdatePmConfigs", log.Fields{"deviceId": device.Id})
471 toTopic := ap.getAdapterTopic(device.Adapter)
472 rpc := "Update_pm_config"
473 args := make([]*kafka.KVArg, 2)
474 args[0] = &kafka.KVArg{
475 Key: "device",
476 Value: device,
477 }
478 args[1] = &kafka.KVArg{
479 Key: "pm_configs",
480 Value: pmConfigs,
481 }
482
483 replyToTopic := ap.getCoreTopic()
484 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
485 log.Debugw("UpdatePmConfigs-response", log.Fields{"deviceid": device.Id, "success": success})
486 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400487}
488
489func (ap *AdapterProxy) ReceivePacketOut(deviceId voltha.ID, egressPortNo int, msg interface{}) error {
490 log.Debug("ReceivePacketOut")
491 return nil
492}
493
494func (ap *AdapterProxy) SuppressAlarm(filter voltha.AlarmFilter) error {
495 log.Debug("SuppressAlarm")
496 return nil
497}
498
499func (ap *AdapterProxy) UnSuppressAlarm(filter voltha.AlarmFilter) error {
500 log.Debug("UnSuppressAlarm")
501 return nil
khenaidoo89b0e942018-10-21 21:11:33 -0400502}
serkant.uluderya334479d2019-04-10 08:26:15 -0700503
504func (ap *AdapterProxy) SimulateAlarm(ctx context.Context, device *voltha.Device, simulatereq *voltha.SimulateAlarmRequest) error {
505 log.Debugw("SimulateAlarm", log.Fields{"id": simulatereq.Id})
506 rpc := "simulate_alarm"
507 toTopic := ap.getAdapterTopic(device.Adapter)
508 args := make([]*kafka.KVArg, 2)
509 args[0] = &kafka.KVArg{
510 Key: "device",
511 Value: device,
512 }
513 args[1] = &kafka.KVArg{
514 Key: "request",
515 Value: simulatereq,
516 }
517
518 // Use a device topic for the response as we are the only core handling requests for this device
519 replyToTopic := ap.getCoreTopic()
520 ap.deviceTopicRegistered = true
521 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
522 log.Debugw("SimulateAlarm-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
523 return unPackResponse(rpc, device.Id, success, result)
524}