blob: d21cfdb6f7fd0fd964f8f0a2ce0cdd43b3f890bb [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
khenaidoo92e62c52018-10-03 14:02:54 -040021 a "github.com/golang/protobuf/ptypes/any"
khenaidoob9203542018-09-17 22:56:37 -040022 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
khenaidoo79232702018-12-04 11:00:41 -050024 ic "github.com/opencord/voltha-go/protos/inter_container"
khenaidoo19d7b632018-10-30 10:49:50 -040025 "github.com/opencord/voltha-go/protos/openflow_13"
khenaidoob9203542018-09-17 22:56:37 -040026 "github.com/opencord/voltha-go/protos/voltha"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29)
30
31type AdapterProxy struct {
khenaidoo43c82122018-11-22 18:38:28 -050032 TestMode bool
33 kafkaICProxy *kafka.InterContainerProxy
khenaidoob9203542018-09-17 22:56:37 -040034}
35
khenaidoo43c82122018-11-22 18:38:28 -050036func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy) *AdapterProxy {
khenaidoob9203542018-09-17 22:56:37 -040037 var proxy AdapterProxy
khenaidoo43c82122018-11-22 18:38:28 -050038 proxy.kafkaICProxy = kafkaProxy
khenaidoob9203542018-09-17 22:56:37 -040039 return &proxy
40}
41
khenaidoo92e62c52018-10-03 14:02:54 -040042func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
43 if success {
44 return nil
45 } else {
khenaidoo79232702018-12-04 11:00:41 -050046 unpackResult := &ic.Error{}
khenaidoo92e62c52018-10-03 14:02:54 -040047 var err error
48 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
49 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
50 }
51 log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
52 // TODO: Need to get the real error code
53 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
54 }
55}
56
khenaidoo43c82122018-11-22 18:38:28 -050057//func kafka.CreateSubTopic(args ...string) kafka.Topic{
58// topic := ""
59// for index , arg := range args {
60// if index == 0 {
61// topic = arg
62// } else {
63// topic = fmt.Sprintf("%s_%s", topic, arg)
64// }
65// }
66// return kafka.Topic{Name:topic}
67//}
68
khenaidoob9203542018-09-17 22:56:37 -040069func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
70 log.Debugw("AdoptDevice", log.Fields{"device": device})
khenaidoo92e62c52018-10-03 14:02:54 -040071 rpc := "adopt_device"
khenaidoob9203542018-09-17 22:56:37 -040072 topic := kafka.Topic{Name: device.Type}
73 args := make([]*kafka.KVArg, 1)
74 args[0] = &kafka.KVArg{
75 Key: "device",
76 Value: device,
77 }
khenaidoo43c82122018-11-22 18:38:28 -050078 // Use a device topic for the response as we are the only core handling requests for this device
79 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
khenaidoo90847922018-12-03 14:47:51 -050080 if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
81 log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
82 return err
83 }
khenaidoo43c82122018-11-22 18:38:28 -050084 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
85 log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
khenaidoo90847922018-12-03 14:47:51 -050086 //if success {
87 // // From now on, any unsolicited requests from the adapters for this device will come over the device topic.
88 // // We should therefore include the replyToTopic as part of the target when unsolicited messages come in.
89 // if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
90 // log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
91 // return err
92 // }
93 //}
khenaidoo92e62c52018-10-03 14:02:54 -040094 return unPackResponse(rpc, device.Id, success, result)
95}
96
97func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
98 log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
99 rpc := "disable_device"
khenaidoo43c82122018-11-22 18:38:28 -0500100 // Use a device specific topic to send the request. The adapter handling the device creates a device
101 // specific topic
102 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
khenaidoo92e62c52018-10-03 14:02:54 -0400103 args := make([]*kafka.KVArg, 1)
104 args[0] = &kafka.KVArg{
105 Key: "device",
106 Value: device,
khenaidoob9203542018-09-17 22:56:37 -0400107 }
khenaidoo43c82122018-11-22 18:38:28 -0500108 // Use a device specific topic as we are the only core handling requests for this device
109 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
110 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo92e62c52018-10-03 14:02:54 -0400111 log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
112 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400113}
114
khenaidoo4d4802d2018-10-04 21:59:49 -0400115func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
116 log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
117 rpc := "reenable_device"
khenaidoo43c82122018-11-22 18:38:28 -0500118 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400119 args := make([]*kafka.KVArg, 1)
120 args[0] = &kafka.KVArg{
121 Key: "device",
122 Value: device,
123 }
khenaidoo43c82122018-11-22 18:38:28 -0500124 // Use a device specific topic as we are the only core handling requests for this device
125 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
126 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400127 log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
128 return unPackResponse(rpc, device.Id, success, result)
129}
130
131func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
132 log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
133 rpc := "reboot_device"
khenaidoo43c82122018-11-22 18:38:28 -0500134 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400135 args := make([]*kafka.KVArg, 1)
136 args[0] = &kafka.KVArg{
137 Key: "device",
138 Value: device,
139 }
khenaidoo43c82122018-11-22 18:38:28 -0500140 // Use a device specific topic as we are the only core handling requests for this device
141 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
142 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400143 log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
144 return unPackResponse(rpc, device.Id, success, result)
145}
146
147func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
148 log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
149 rpc := "delete_device"
khenaidoo43c82122018-11-22 18:38:28 -0500150 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400151 args := make([]*kafka.KVArg, 1)
152 args[0] = &kafka.KVArg{
153 Key: "device",
154 Value: device,
155 }
khenaidoo43c82122018-11-22 18:38:28 -0500156 // Use a device specific topic as we are the only core handling requests for this device
157 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
158 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400159 log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
khenaidoo43c82122018-11-22 18:38:28 -0500160
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500161 // We no longer need to have this device topic as we won't receive any unsolicited messages on it
162 if err := ap.kafkaICProxy.DeleteTopic(replyToTopic); err != nil {
163 log.Errorw("Unable-to-delete-topic", log.Fields{"topic": replyToTopic, "error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500164 return err
165 }
khenaidoo43c82122018-11-22 18:38:28 -0500166
khenaidoo4d4802d2018-10-04 21:59:49 -0400167 return unPackResponse(rpc, device.Id, success, result)
168}
169
khenaidoo79232702018-12-04 11:00:41 -0500170func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
khenaidoo4d4802d2018-10-04 21:59:49 -0400171 log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
khenaidoo43c82122018-11-22 18:38:28 -0500172 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400173 args := make([]*kafka.KVArg, 1)
174 args[0] = &kafka.KVArg{
175 Key: "device",
176 Value: device,
177 }
khenaidoo43c82122018-11-22 18:38:28 -0500178 // Use a device specific topic as we are the only core handling requests for this device
179 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
180 success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400181 log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
182 if success {
khenaidoo79232702018-12-04 11:00:41 -0500183 unpackResult := &ic.SwitchCapability{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400184 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
185 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
186 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
187 }
188 return unpackResult, nil
189 } else {
khenaidoo79232702018-12-04 11:00:41 -0500190 unpackResult := &ic.Error{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400191 var err error
192 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
193 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
194 }
195 log.Debugw("GetOfpDeviceInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
196 // TODO: Need to get the real error code
197 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
198 }
199}
200
khenaidoo79232702018-12-04 11:00:41 -0500201func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
khenaidoo4d4802d2018-10-04 21:59:49 -0400202 log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
khenaidoo43c82122018-11-22 18:38:28 -0500203 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400204 args := make([]*kafka.KVArg, 2)
205 args[0] = &kafka.KVArg{
206 Key: "device",
207 Value: device,
208 }
khenaidoo79232702018-12-04 11:00:41 -0500209 pNo := &ic.IntType{Val: int64(portNo)}
khenaidoo4d4802d2018-10-04 21:59:49 -0400210 args[1] = &kafka.KVArg{
211 Key: "port_no",
212 Value: pNo,
213 }
khenaidoo43c82122018-11-22 18:38:28 -0500214 // Use a device specific topic as we are the only core handling requests for this device
215 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
216 success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400217 log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
218 if success {
khenaidoo79232702018-12-04 11:00:41 -0500219 unpackResult := &ic.PortCapability{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400220 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
221 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
222 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
223 }
224 return unpackResult, nil
225 } else {
khenaidoo79232702018-12-04 11:00:41 -0500226 unpackResult := &ic.Error{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400227 var err error
228 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
229 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
230 }
231 log.Debugw("GetOfpPortInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
232 // TODO: Need to get the real error code
233 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
234 }
235}
236
237//TODO: Implement the functions below
238
khenaidoob9203542018-09-17 22:56:37 -0400239func (ap *AdapterProxy) AdapterDescriptor() (*voltha.Adapter, error) {
240 log.Debug("AdapterDescriptor")
241 return nil, nil
242}
243
244func (ap *AdapterProxy) DeviceTypes() (*voltha.DeviceType, error) {
245 log.Debug("DeviceTypes")
246 return nil, nil
247}
248
249func (ap *AdapterProxy) Health() (*voltha.HealthStatus, error) {
250 log.Debug("Health")
251 return nil, nil
252}
253
khenaidoo92e62c52018-10-03 14:02:54 -0400254func (ap *AdapterProxy) ReconcileDevice(device *voltha.Device) error {
khenaidoob9203542018-09-17 22:56:37 -0400255 log.Debug("ReconcileDevice")
256 return nil
257}
258
259func (ap *AdapterProxy) AbandonDevice(device voltha.Device) error {
260 log.Debug("AbandonDevice")
261 return nil
262}
263
khenaidoob9203542018-09-17 22:56:37 -0400264func (ap *AdapterProxy) GetDeviceDetails(device voltha.Device) (*voltha.Device, error) {
265 log.Debug("GetDeviceDetails")
266 return nil, nil
267}
268
khenaidoof5a5bfa2019-01-23 22:20:29 -0500269func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
270 log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
271 rpc := "download_image"
272 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
273 args := make([]*kafka.KVArg, 2)
274 args[0] = &kafka.KVArg{
275 Key: "device",
276 Value: device,
277 }
278 args[1] = &kafka.KVArg{
279 Key: "request",
280 Value: download,
281 }
282 // Use a device specific topic as we are the only core handling requests for this device
283 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
284 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
285 log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
286
287 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400288}
289
khenaidoof5a5bfa2019-01-23 22:20:29 -0500290func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
291 log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
292 rpc := "get_image_download_status"
293 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
294 args := make([]*kafka.KVArg, 2)
295 args[0] = &kafka.KVArg{
296 Key: "device",
297 Value: device,
298 }
299 args[1] = &kafka.KVArg{
300 Key: "request",
301 Value: download,
302 }
303 // Use a device specific topic as we are the only core handling requests for this device
304 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
305 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
306 log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
307
308 if success {
309 unpackResult := &voltha.ImageDownload{}
310 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
311 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
312 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
313 }
314 return unpackResult, nil
315 } else {
316 unpackResult := &ic.Error{}
317 var err error
318 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
319 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
320 return nil, err
321 }
322 log.Debugw("GetImageDownloadStatus-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
323 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
324 }
khenaidoob9203542018-09-17 22:56:37 -0400325}
326
khenaidoof5a5bfa2019-01-23 22:20:29 -0500327func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
328 log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
329 rpc := "cancel_image_download"
330 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
331 args := make([]*kafka.KVArg, 2)
332 args[0] = &kafka.KVArg{
333 Key: "device",
334 Value: device,
335 }
336 args[1] = &kafka.KVArg{
337 Key: "request",
338 Value: download,
339 }
340 // Use a device specific topic as we are the only core handling requests for this device
341 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
342 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
343 log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
344
345 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400346}
347
khenaidoof5a5bfa2019-01-23 22:20:29 -0500348func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
349 log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
350 rpc := "activate_image_update"
351 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
352 args := make([]*kafka.KVArg, 2)
353 args[0] = &kafka.KVArg{
354 Key: "device",
355 Value: device,
356 }
357 args[1] = &kafka.KVArg{
358 Key: "request",
359 Value: download,
360 }
361 // Use a device specific topic as we are the only core handling requests for this device
362 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
363 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
364 log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
365
366 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400367}
368
khenaidoof5a5bfa2019-01-23 22:20:29 -0500369func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
370 log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
371 rpc := "revert_image_update"
372 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
373 args := make([]*kafka.KVArg, 2)
374 args[0] = &kafka.KVArg{
375 Key: "device",
376 Value: device,
377 }
378 args[1] = &kafka.KVArg{
379 Key: "request",
380 Value: download,
381 }
382 // Use a device specific topic as we are the only core handling requests for this device
383 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
384 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
385 log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
386
387 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400388}
389
390func (ap *AdapterProxy) SelfTestDevice(device voltha.Device) (*voltha.SelfTestResponse, error) {
391 log.Debug("SelfTestDevice")
392 return nil, nil
393}
394
khenaidoofdbad6e2018-11-06 22:26:38 -0500395func (ap *AdapterProxy) packetOut(deviceType string, deviceId string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
396 log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
khenaidoo43c82122018-11-22 18:38:28 -0500397 toTopic := kafka.CreateSubTopic(deviceType, deviceId)
khenaidoofdbad6e2018-11-06 22:26:38 -0500398 rpc := "receive_packet_out"
khenaidoo79232702018-12-04 11:00:41 -0500399 dId := &ic.StrType{Val: deviceId}
khenaidoofdbad6e2018-11-06 22:26:38 -0500400 args := make([]*kafka.KVArg, 3)
401 args[0] = &kafka.KVArg{
402 Key: "deviceId",
403 Value: dId,
404 }
khenaidoo79232702018-12-04 11:00:41 -0500405 op := &ic.IntType{Val: int64(outPort)}
khenaidoofdbad6e2018-11-06 22:26:38 -0500406 args[1] = &kafka.KVArg{
407 Key: "outPort",
408 Value: op,
409 }
410 args[2] = &kafka.KVArg{
411 Key: "packet",
412 Value: packet,
413 }
414
415 // TODO: Do we need to wait for an ACK on a packet Out?
khenaidoo43c82122018-11-22 18:38:28 -0500416 // Use a device specific topic as we are the only core handling requests for this device
417 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, deviceId)
418 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, false, args...)
khenaidoofdbad6e2018-11-06 22:26:38 -0500419 log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
420 return unPackResponse(rpc, deviceId, success, result)
421}
422
khenaidoo19d7b632018-10-30 10:49:50 -0400423func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
424 log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
khenaidoo43c82122018-11-22 18:38:28 -0500425 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
khenaidoo19d7b632018-10-30 10:49:50 -0400426 rpc := "update_flows_bulk"
427 args := make([]*kafka.KVArg, 3)
428 args[0] = &kafka.KVArg{
429 Key: "device",
430 Value: device,
431 }
432 args[1] = &kafka.KVArg{
433 Key: "flows",
434 Value: flows,
435 }
436 args[2] = &kafka.KVArg{
437 Key: "groups",
438 Value: groups,
439 }
440
khenaidoo43c82122018-11-22 18:38:28 -0500441 // Use a device specific topic as we are the only core handling requests for this device
442 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
443 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo19d7b632018-10-30 10:49:50 -0400444 log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
445 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400446}
447
khenaidoo19d7b632018-10-30 10:49:50 -0400448func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
449 log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
khenaidoo43c82122018-11-22 18:38:28 -0500450 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
khenaidoo19d7b632018-10-30 10:49:50 -0400451 rpc := "update_flows_bulk"
452 args := make([]*kafka.KVArg, 3)
453 args[0] = &kafka.KVArg{
454 Key: "device",
455 Value: device,
456 }
457 args[1] = &kafka.KVArg{
458 Key: "flow_changes",
459 Value: flowChanges,
460 }
461 args[2] = &kafka.KVArg{
462 Key: "group_changes",
463 Value: groupChanges,
464 }
465
khenaidoo43c82122018-11-22 18:38:28 -0500466 // Use a device specific topic as we are the only core handling requests for this device
467 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
468 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo19d7b632018-10-30 10:49:50 -0400469 log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
470 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400471}
472
473func (ap *AdapterProxy) UpdatePmConfig(device voltha.Device, pmConfigs voltha.PmConfigs) error {
474 log.Debug("UpdatePmConfig")
475 return nil
476}
477
478func (ap *AdapterProxy) ReceivePacketOut(deviceId voltha.ID, egressPortNo int, msg interface{}) error {
479 log.Debug("ReceivePacketOut")
480 return nil
481}
482
483func (ap *AdapterProxy) SuppressAlarm(filter voltha.AlarmFilter) error {
484 log.Debug("SuppressAlarm")
485 return nil
486}
487
488func (ap *AdapterProxy) UnSuppressAlarm(filter voltha.AlarmFilter) error {
489 log.Debug("UnSuppressAlarm")
490 return nil
khenaidoo89b0e942018-10-21 21:11:33 -0400491}