blob: 9ddda854809460e059c0b9784f4086d6c2764d42 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
khenaidoo92e62c52018-10-03 14:02:54 -040021 a "github.com/golang/protobuf/ptypes/any"
khenaidoob9203542018-09-17 22:56:37 -040022 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
khenaidoo79232702018-12-04 11:00:41 -050024 ic "github.com/opencord/voltha-go/protos/inter_container"
khenaidoo19d7b632018-10-30 10:49:50 -040025 "github.com/opencord/voltha-go/protos/openflow_13"
khenaidoob9203542018-09-17 22:56:37 -040026 "github.com/opencord/voltha-go/protos/voltha"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29)
30
31type AdapterProxy struct {
khenaidoo43c82122018-11-22 18:38:28 -050032 TestMode bool
33 kafkaICProxy *kafka.InterContainerProxy
khenaidoob9203542018-09-17 22:56:37 -040034}
35
khenaidoo43c82122018-11-22 18:38:28 -050036func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy) *AdapterProxy {
khenaidoob9203542018-09-17 22:56:37 -040037 var proxy AdapterProxy
khenaidoo43c82122018-11-22 18:38:28 -050038 proxy.kafkaICProxy = kafkaProxy
khenaidoob9203542018-09-17 22:56:37 -040039 return &proxy
40}
41
khenaidoo92e62c52018-10-03 14:02:54 -040042func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
43 if success {
44 return nil
45 } else {
khenaidoo79232702018-12-04 11:00:41 -050046 unpackResult := &ic.Error{}
khenaidoo92e62c52018-10-03 14:02:54 -040047 var err error
48 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
49 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
50 }
51 log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
52 // TODO: Need to get the real error code
53 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
54 }
55}
56
khenaidoob9203542018-09-17 22:56:37 -040057func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
58 log.Debugw("AdoptDevice", log.Fields{"device": device})
khenaidoo92e62c52018-10-03 14:02:54 -040059 rpc := "adopt_device"
khenaidoo21d51152019-02-01 13:48:37 -050060 topic := kafka.Topic{Name: device.Adapter}
khenaidoob9203542018-09-17 22:56:37 -040061 args := make([]*kafka.KVArg, 1)
62 args[0] = &kafka.KVArg{
63 Key: "device",
64 Value: device,
65 }
khenaidoo43c82122018-11-22 18:38:28 -050066 // Use a device topic for the response as we are the only core handling requests for this device
67 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
khenaidoo731697e2019-01-29 16:03:29 -050068 if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic, kafka.OffsetOldest); err != nil {
khenaidoo90847922018-12-03 14:47:51 -050069 log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
70 return err
71 }
khenaidoo43c82122018-11-22 18:38:28 -050072 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
73 log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
khenaidoo92e62c52018-10-03 14:02:54 -040074 return unPackResponse(rpc, device.Id, success, result)
75}
76
77func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
78 log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
79 rpc := "disable_device"
khenaidoo43c82122018-11-22 18:38:28 -050080 // Use a device specific topic to send the request. The adapter handling the device creates a device
81 // specific topic
khenaidoo21d51152019-02-01 13:48:37 -050082 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo92e62c52018-10-03 14:02:54 -040083 args := make([]*kafka.KVArg, 1)
84 args[0] = &kafka.KVArg{
85 Key: "device",
86 Value: device,
khenaidoob9203542018-09-17 22:56:37 -040087 }
khenaidoo43c82122018-11-22 18:38:28 -050088 // Use a device specific topic as we are the only core handling requests for this device
89 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
90 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo92e62c52018-10-03 14:02:54 -040091 log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
92 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -040093}
94
khenaidoo4d4802d2018-10-04 21:59:49 -040095func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
96 log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
97 rpc := "reenable_device"
khenaidoo21d51152019-02-01 13:48:37 -050098 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -040099 args := make([]*kafka.KVArg, 1)
100 args[0] = &kafka.KVArg{
101 Key: "device",
102 Value: device,
103 }
khenaidoo43c82122018-11-22 18:38:28 -0500104 // Use a device specific topic as we are the only core handling requests for this device
105 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
106 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400107 log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
108 return unPackResponse(rpc, device.Id, success, result)
109}
110
111func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
112 log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
113 rpc := "reboot_device"
khenaidoo21d51152019-02-01 13:48:37 -0500114 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400115 args := make([]*kafka.KVArg, 1)
116 args[0] = &kafka.KVArg{
117 Key: "device",
118 Value: device,
119 }
khenaidoo43c82122018-11-22 18:38:28 -0500120 // Use a device specific topic as we are the only core handling requests for this device
121 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
122 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400123 log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
124 return unPackResponse(rpc, device.Id, success, result)
125}
126
127func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
128 log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
129 rpc := "delete_device"
khenaidoo21d51152019-02-01 13:48:37 -0500130 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400131 args := make([]*kafka.KVArg, 1)
132 args[0] = &kafka.KVArg{
133 Key: "device",
134 Value: device,
135 }
khenaidoo43c82122018-11-22 18:38:28 -0500136 // Use a device specific topic as we are the only core handling requests for this device
137 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
138 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400139 log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
khenaidoo43c82122018-11-22 18:38:28 -0500140
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500141 // We no longer need to have this device topic as we won't receive any unsolicited messages on it
142 if err := ap.kafkaICProxy.DeleteTopic(replyToTopic); err != nil {
143 log.Errorw("Unable-to-delete-topic", log.Fields{"topic": replyToTopic, "error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500144 return err
145 }
khenaidoo43c82122018-11-22 18:38:28 -0500146
khenaidoo4d4802d2018-10-04 21:59:49 -0400147 return unPackResponse(rpc, device.Id, success, result)
148}
149
khenaidoo79232702018-12-04 11:00:41 -0500150func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
khenaidoo4d4802d2018-10-04 21:59:49 -0400151 log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
khenaidoo21d51152019-02-01 13:48:37 -0500152 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400153 args := make([]*kafka.KVArg, 1)
154 args[0] = &kafka.KVArg{
155 Key: "device",
156 Value: device,
157 }
khenaidoo43c82122018-11-22 18:38:28 -0500158 // Use a device specific topic as we are the only core handling requests for this device
159 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
160 success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400161 log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
162 if success {
khenaidoo79232702018-12-04 11:00:41 -0500163 unpackResult := &ic.SwitchCapability{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400164 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
165 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
166 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
167 }
168 return unpackResult, nil
169 } else {
khenaidoo79232702018-12-04 11:00:41 -0500170 unpackResult := &ic.Error{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400171 var err error
172 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
173 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
174 }
175 log.Debugw("GetOfpDeviceInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
176 // TODO: Need to get the real error code
177 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
178 }
179}
180
khenaidoo79232702018-12-04 11:00:41 -0500181func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
khenaidoo4d4802d2018-10-04 21:59:49 -0400182 log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
khenaidoo21d51152019-02-01 13:48:37 -0500183 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400184 args := make([]*kafka.KVArg, 2)
185 args[0] = &kafka.KVArg{
186 Key: "device",
187 Value: device,
188 }
khenaidoo79232702018-12-04 11:00:41 -0500189 pNo := &ic.IntType{Val: int64(portNo)}
khenaidoo4d4802d2018-10-04 21:59:49 -0400190 args[1] = &kafka.KVArg{
191 Key: "port_no",
192 Value: pNo,
193 }
khenaidoo43c82122018-11-22 18:38:28 -0500194 // Use a device specific topic as we are the only core handling requests for this device
195 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
196 success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400197 log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
198 if success {
khenaidoo79232702018-12-04 11:00:41 -0500199 unpackResult := &ic.PortCapability{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400200 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
201 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
202 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
203 }
204 return unpackResult, nil
205 } else {
khenaidoo79232702018-12-04 11:00:41 -0500206 unpackResult := &ic.Error{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400207 var err error
208 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
209 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
210 }
211 log.Debugw("GetOfpPortInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
212 // TODO: Need to get the real error code
213 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
214 }
215}
216
217//TODO: Implement the functions below
218
khenaidoob9203542018-09-17 22:56:37 -0400219func (ap *AdapterProxy) AdapterDescriptor() (*voltha.Adapter, error) {
220 log.Debug("AdapterDescriptor")
221 return nil, nil
222}
223
224func (ap *AdapterProxy) DeviceTypes() (*voltha.DeviceType, error) {
225 log.Debug("DeviceTypes")
226 return nil, nil
227}
228
229func (ap *AdapterProxy) Health() (*voltha.HealthStatus, error) {
230 log.Debug("Health")
231 return nil, nil
232}
233
khenaidoo92e62c52018-10-03 14:02:54 -0400234func (ap *AdapterProxy) ReconcileDevice(device *voltha.Device) error {
khenaidoob9203542018-09-17 22:56:37 -0400235 log.Debug("ReconcileDevice")
236 return nil
237}
238
239func (ap *AdapterProxy) AbandonDevice(device voltha.Device) error {
240 log.Debug("AbandonDevice")
241 return nil
242}
243
khenaidoob9203542018-09-17 22:56:37 -0400244func (ap *AdapterProxy) GetDeviceDetails(device voltha.Device) (*voltha.Device, error) {
245 log.Debug("GetDeviceDetails")
246 return nil, nil
247}
248
khenaidoof5a5bfa2019-01-23 22:20:29 -0500249func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
250 log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
251 rpc := "download_image"
khenaidoo21d51152019-02-01 13:48:37 -0500252 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500253 args := make([]*kafka.KVArg, 2)
254 args[0] = &kafka.KVArg{
255 Key: "device",
256 Value: device,
257 }
258 args[1] = &kafka.KVArg{
259 Key: "request",
260 Value: download,
261 }
262 // Use a device specific topic as we are the only core handling requests for this device
263 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
264 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
265 log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
266
267 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400268}
269
khenaidoof5a5bfa2019-01-23 22:20:29 -0500270func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
271 log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
272 rpc := "get_image_download_status"
khenaidoo21d51152019-02-01 13:48:37 -0500273 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500274 args := make([]*kafka.KVArg, 2)
275 args[0] = &kafka.KVArg{
276 Key: "device",
277 Value: device,
278 }
279 args[1] = &kafka.KVArg{
280 Key: "request",
281 Value: download,
282 }
283 // Use a device specific topic as we are the only core handling requests for this device
284 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
285 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
286 log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
287
288 if success {
289 unpackResult := &voltha.ImageDownload{}
290 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
291 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
292 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
293 }
294 return unpackResult, nil
295 } else {
296 unpackResult := &ic.Error{}
297 var err error
298 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
299 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
300 return nil, err
301 }
302 log.Debugw("GetImageDownloadStatus-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
303 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
304 }
khenaidoob9203542018-09-17 22:56:37 -0400305}
306
khenaidoof5a5bfa2019-01-23 22:20:29 -0500307func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
308 log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
309 rpc := "cancel_image_download"
khenaidoo21d51152019-02-01 13:48:37 -0500310 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500311 args := make([]*kafka.KVArg, 2)
312 args[0] = &kafka.KVArg{
313 Key: "device",
314 Value: device,
315 }
316 args[1] = &kafka.KVArg{
317 Key: "request",
318 Value: download,
319 }
320 // Use a device specific topic as we are the only core handling requests for this device
321 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
322 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
323 log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
324
325 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400326}
327
khenaidoof5a5bfa2019-01-23 22:20:29 -0500328func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
329 log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
330 rpc := "activate_image_update"
khenaidoo21d51152019-02-01 13:48:37 -0500331 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500332 args := make([]*kafka.KVArg, 2)
333 args[0] = &kafka.KVArg{
334 Key: "device",
335 Value: device,
336 }
337 args[1] = &kafka.KVArg{
338 Key: "request",
339 Value: download,
340 }
341 // Use a device specific topic as we are the only core handling requests for this device
342 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
343 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
344 log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
345
346 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400347}
348
khenaidoof5a5bfa2019-01-23 22:20:29 -0500349func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
350 log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
351 rpc := "revert_image_update"
khenaidoo21d51152019-02-01 13:48:37 -0500352 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500353 args := make([]*kafka.KVArg, 2)
354 args[0] = &kafka.KVArg{
355 Key: "device",
356 Value: device,
357 }
358 args[1] = &kafka.KVArg{
359 Key: "request",
360 Value: download,
361 }
362 // Use a device specific topic as we are the only core handling requests for this device
363 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
364 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
365 log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
366
367 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400368}
369
370func (ap *AdapterProxy) SelfTestDevice(device voltha.Device) (*voltha.SelfTestResponse, error) {
371 log.Debug("SelfTestDevice")
372 return nil, nil
373}
374
khenaidoofdbad6e2018-11-06 22:26:38 -0500375func (ap *AdapterProxy) packetOut(deviceType string, deviceId string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
376 log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
khenaidoo43c82122018-11-22 18:38:28 -0500377 toTopic := kafka.CreateSubTopic(deviceType, deviceId)
khenaidoofdbad6e2018-11-06 22:26:38 -0500378 rpc := "receive_packet_out"
khenaidoo79232702018-12-04 11:00:41 -0500379 dId := &ic.StrType{Val: deviceId}
khenaidoofdbad6e2018-11-06 22:26:38 -0500380 args := make([]*kafka.KVArg, 3)
381 args[0] = &kafka.KVArg{
382 Key: "deviceId",
383 Value: dId,
384 }
khenaidoo79232702018-12-04 11:00:41 -0500385 op := &ic.IntType{Val: int64(outPort)}
khenaidoofdbad6e2018-11-06 22:26:38 -0500386 args[1] = &kafka.KVArg{
387 Key: "outPort",
388 Value: op,
389 }
390 args[2] = &kafka.KVArg{
391 Key: "packet",
392 Value: packet,
393 }
394
395 // TODO: Do we need to wait for an ACK on a packet Out?
khenaidoo43c82122018-11-22 18:38:28 -0500396 // Use a device specific topic as we are the only core handling requests for this device
397 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, deviceId)
398 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, false, args...)
khenaidoofdbad6e2018-11-06 22:26:38 -0500399 log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
400 return unPackResponse(rpc, deviceId, success, result)
401}
402
khenaidoo19d7b632018-10-30 10:49:50 -0400403func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
404 log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
khenaidoo21d51152019-02-01 13:48:37 -0500405 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo19d7b632018-10-30 10:49:50 -0400406 rpc := "update_flows_bulk"
407 args := make([]*kafka.KVArg, 3)
408 args[0] = &kafka.KVArg{
409 Key: "device",
410 Value: device,
411 }
412 args[1] = &kafka.KVArg{
413 Key: "flows",
414 Value: flows,
415 }
416 args[2] = &kafka.KVArg{
417 Key: "groups",
418 Value: groups,
419 }
420
khenaidoo43c82122018-11-22 18:38:28 -0500421 // Use a device specific topic as we are the only core handling requests for this device
422 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
423 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo19d7b632018-10-30 10:49:50 -0400424 log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
425 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400426}
427
khenaidoo19d7b632018-10-30 10:49:50 -0400428func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
429 log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
khenaidoo21d51152019-02-01 13:48:37 -0500430 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo19d7b632018-10-30 10:49:50 -0400431 rpc := "update_flows_bulk"
432 args := make([]*kafka.KVArg, 3)
433 args[0] = &kafka.KVArg{
434 Key: "device",
435 Value: device,
436 }
437 args[1] = &kafka.KVArg{
438 Key: "flow_changes",
439 Value: flowChanges,
440 }
441 args[2] = &kafka.KVArg{
442 Key: "group_changes",
443 Value: groupChanges,
444 }
445
khenaidoo43c82122018-11-22 18:38:28 -0500446 // Use a device specific topic as we are the only core handling requests for this device
447 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
448 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo19d7b632018-10-30 10:49:50 -0400449 log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
450 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400451}
452
453func (ap *AdapterProxy) UpdatePmConfig(device voltha.Device, pmConfigs voltha.PmConfigs) error {
454 log.Debug("UpdatePmConfig")
455 return nil
456}
457
458func (ap *AdapterProxy) ReceivePacketOut(deviceId voltha.ID, egressPortNo int, msg interface{}) error {
459 log.Debug("ReceivePacketOut")
460 return nil
461}
462
463func (ap *AdapterProxy) SuppressAlarm(filter voltha.AlarmFilter) error {
464 log.Debug("SuppressAlarm")
465 return nil
466}
467
468func (ap *AdapterProxy) UnSuppressAlarm(filter voltha.AlarmFilter) error {
469 log.Debug("UnSuppressAlarm")
470 return nil
khenaidoo89b0e942018-10-21 21:11:33 -0400471}