blob: ed09c9d20235925a6dcc544b212f23499af20ffd [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
khenaidoo92e62c52018-10-03 14:02:54 -040021 a "github.com/golang/protobuf/ptypes/any"
khenaidoob9203542018-09-17 22:56:37 -040022 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
khenaidoo79232702018-12-04 11:00:41 -050024 ic "github.com/opencord/voltha-go/protos/inter_container"
khenaidoo19d7b632018-10-30 10:49:50 -040025 "github.com/opencord/voltha-go/protos/openflow_13"
khenaidoob9203542018-09-17 22:56:37 -040026 "github.com/opencord/voltha-go/protos/voltha"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29)
30
31type AdapterProxy struct {
khenaidoo43c82122018-11-22 18:38:28 -050032 TestMode bool
33 kafkaICProxy *kafka.InterContainerProxy
khenaidoob9203542018-09-17 22:56:37 -040034}
35
khenaidoo43c82122018-11-22 18:38:28 -050036func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy) *AdapterProxy {
khenaidoob9203542018-09-17 22:56:37 -040037 var proxy AdapterProxy
khenaidoo43c82122018-11-22 18:38:28 -050038 proxy.kafkaICProxy = kafkaProxy
khenaidoob9203542018-09-17 22:56:37 -040039 return &proxy
40}
41
khenaidoo92e62c52018-10-03 14:02:54 -040042func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
43 if success {
44 return nil
45 } else {
khenaidoo79232702018-12-04 11:00:41 -050046 unpackResult := &ic.Error{}
khenaidoo92e62c52018-10-03 14:02:54 -040047 var err error
48 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
49 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
khenaidoo6d055132019-02-12 16:51:19 -050050 return err
khenaidoo92e62c52018-10-03 14:02:54 -040051 }
52 log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
53 // TODO: Need to get the real error code
54 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
55 }
56}
57
khenaidoob9203542018-09-17 22:56:37 -040058func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
59 log.Debugw("AdoptDevice", log.Fields{"device": device})
khenaidoo92e62c52018-10-03 14:02:54 -040060 rpc := "adopt_device"
khenaidoo21d51152019-02-01 13:48:37 -050061 topic := kafka.Topic{Name: device.Adapter}
khenaidoob9203542018-09-17 22:56:37 -040062 args := make([]*kafka.KVArg, 1)
63 args[0] = &kafka.KVArg{
64 Key: "device",
65 Value: device,
66 }
khenaidoo43c82122018-11-22 18:38:28 -050067 // Use a device topic for the response as we are the only core handling requests for this device
68 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
khenaidoo731697e2019-01-29 16:03:29 -050069 if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic, kafka.OffsetOldest); err != nil {
khenaidoo90847922018-12-03 14:47:51 -050070 log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
71 return err
72 }
khenaidoo43c82122018-11-22 18:38:28 -050073 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
74 log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
khenaidoo92e62c52018-10-03 14:02:54 -040075 return unPackResponse(rpc, device.Id, success, result)
76}
77
78func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
79 log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
80 rpc := "disable_device"
khenaidoo43c82122018-11-22 18:38:28 -050081 // Use a device specific topic to send the request. The adapter handling the device creates a device
82 // specific topic
khenaidoo21d51152019-02-01 13:48:37 -050083 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo92e62c52018-10-03 14:02:54 -040084 args := make([]*kafka.KVArg, 1)
85 args[0] = &kafka.KVArg{
86 Key: "device",
87 Value: device,
khenaidoob9203542018-09-17 22:56:37 -040088 }
khenaidoo43c82122018-11-22 18:38:28 -050089 // Use a device specific topic as we are the only core handling requests for this device
90 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
91 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo92e62c52018-10-03 14:02:54 -040092 log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
93 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -040094}
95
khenaidoo4d4802d2018-10-04 21:59:49 -040096func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
97 log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
98 rpc := "reenable_device"
khenaidoo21d51152019-02-01 13:48:37 -050099 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400100 args := make([]*kafka.KVArg, 1)
101 args[0] = &kafka.KVArg{
102 Key: "device",
103 Value: device,
104 }
khenaidoo43c82122018-11-22 18:38:28 -0500105 // Use a device specific topic as we are the only core handling requests for this device
106 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
107 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400108 log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
109 return unPackResponse(rpc, device.Id, success, result)
110}
111
112func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
113 log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
114 rpc := "reboot_device"
khenaidoo21d51152019-02-01 13:48:37 -0500115 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400116 args := make([]*kafka.KVArg, 1)
117 args[0] = &kafka.KVArg{
118 Key: "device",
119 Value: device,
120 }
khenaidoo43c82122018-11-22 18:38:28 -0500121 // Use a device specific topic as we are the only core handling requests for this device
122 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
123 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400124 log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
125 return unPackResponse(rpc, device.Id, success, result)
126}
127
128func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
129 log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
130 rpc := "delete_device"
khenaidoo21d51152019-02-01 13:48:37 -0500131 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400132 args := make([]*kafka.KVArg, 1)
133 args[0] = &kafka.KVArg{
134 Key: "device",
135 Value: device,
136 }
khenaidoo43c82122018-11-22 18:38:28 -0500137 // Use a device specific topic as we are the only core handling requests for this device
138 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
139 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400140 log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
khenaidoo43c82122018-11-22 18:38:28 -0500141
khenaidoo3dfc8bc2019-01-10 16:48:25 -0500142 // We no longer need to have this device topic as we won't receive any unsolicited messages on it
143 if err := ap.kafkaICProxy.DeleteTopic(replyToTopic); err != nil {
144 log.Errorw("Unable-to-delete-topic", log.Fields{"topic": replyToTopic, "error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500145 return err
146 }
khenaidoo43c82122018-11-22 18:38:28 -0500147
khenaidoo4d4802d2018-10-04 21:59:49 -0400148 return unPackResponse(rpc, device.Id, success, result)
149}
150
khenaidoo79232702018-12-04 11:00:41 -0500151func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
khenaidoo4d4802d2018-10-04 21:59:49 -0400152 log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
khenaidoo21d51152019-02-01 13:48:37 -0500153 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400154 args := make([]*kafka.KVArg, 1)
155 args[0] = &kafka.KVArg{
156 Key: "device",
157 Value: device,
158 }
khenaidoo43c82122018-11-22 18:38:28 -0500159 // Use a device specific topic as we are the only core handling requests for this device
160 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
161 success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400162 log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
163 if success {
khenaidoo79232702018-12-04 11:00:41 -0500164 unpackResult := &ic.SwitchCapability{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400165 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
166 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
167 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
168 }
169 return unpackResult, nil
170 } else {
khenaidoo79232702018-12-04 11:00:41 -0500171 unpackResult := &ic.Error{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400172 var err error
173 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
174 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
175 }
176 log.Debugw("GetOfpDeviceInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
177 // TODO: Need to get the real error code
178 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
179 }
180}
181
khenaidoo79232702018-12-04 11:00:41 -0500182func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
khenaidoo4d4802d2018-10-04 21:59:49 -0400183 log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
khenaidoo21d51152019-02-01 13:48:37 -0500184 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400185 args := make([]*kafka.KVArg, 2)
186 args[0] = &kafka.KVArg{
187 Key: "device",
188 Value: device,
189 }
khenaidoo79232702018-12-04 11:00:41 -0500190 pNo := &ic.IntType{Val: int64(portNo)}
khenaidoo4d4802d2018-10-04 21:59:49 -0400191 args[1] = &kafka.KVArg{
192 Key: "port_no",
193 Value: pNo,
194 }
khenaidoo43c82122018-11-22 18:38:28 -0500195 // Use a device specific topic as we are the only core handling requests for this device
196 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
197 success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400198 log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
199 if success {
khenaidoo79232702018-12-04 11:00:41 -0500200 unpackResult := &ic.PortCapability{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400201 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
202 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
203 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
204 }
205 return unpackResult, nil
206 } else {
khenaidoo79232702018-12-04 11:00:41 -0500207 unpackResult := &ic.Error{}
khenaidoo4d4802d2018-10-04 21:59:49 -0400208 var err error
209 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
210 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
211 }
212 log.Debugw("GetOfpPortInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
213 // TODO: Need to get the real error code
214 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
215 }
216}
217
218//TODO: Implement the functions below
219
khenaidoob9203542018-09-17 22:56:37 -0400220func (ap *AdapterProxy) AdapterDescriptor() (*voltha.Adapter, error) {
221 log.Debug("AdapterDescriptor")
222 return nil, nil
223}
224
225func (ap *AdapterProxy) DeviceTypes() (*voltha.DeviceType, error) {
226 log.Debug("DeviceTypes")
227 return nil, nil
228}
229
230func (ap *AdapterProxy) Health() (*voltha.HealthStatus, error) {
231 log.Debug("Health")
232 return nil, nil
233}
234
khenaidoo92e62c52018-10-03 14:02:54 -0400235func (ap *AdapterProxy) ReconcileDevice(device *voltha.Device) error {
khenaidoob9203542018-09-17 22:56:37 -0400236 log.Debug("ReconcileDevice")
237 return nil
238}
239
240func (ap *AdapterProxy) AbandonDevice(device voltha.Device) error {
241 log.Debug("AbandonDevice")
242 return nil
243}
244
khenaidoob9203542018-09-17 22:56:37 -0400245func (ap *AdapterProxy) GetDeviceDetails(device voltha.Device) (*voltha.Device, error) {
246 log.Debug("GetDeviceDetails")
247 return nil, nil
248}
249
khenaidoof5a5bfa2019-01-23 22:20:29 -0500250func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
251 log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
252 rpc := "download_image"
khenaidoo21d51152019-02-01 13:48:37 -0500253 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500254 args := make([]*kafka.KVArg, 2)
255 args[0] = &kafka.KVArg{
256 Key: "device",
257 Value: device,
258 }
259 args[1] = &kafka.KVArg{
260 Key: "request",
261 Value: download,
262 }
263 // Use a device specific topic as we are the only core handling requests for this device
264 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
265 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
266 log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
267
268 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400269}
270
khenaidoof5a5bfa2019-01-23 22:20:29 -0500271func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
272 log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
273 rpc := "get_image_download_status"
khenaidoo21d51152019-02-01 13:48:37 -0500274 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500275 args := make([]*kafka.KVArg, 2)
276 args[0] = &kafka.KVArg{
277 Key: "device",
278 Value: device,
279 }
280 args[1] = &kafka.KVArg{
281 Key: "request",
282 Value: download,
283 }
284 // Use a device specific topic as we are the only core handling requests for this device
285 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
286 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
287 log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
288
289 if success {
290 unpackResult := &voltha.ImageDownload{}
291 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
292 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
293 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
294 }
295 return unpackResult, nil
296 } else {
297 unpackResult := &ic.Error{}
298 var err error
299 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
300 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
301 return nil, err
302 }
303 log.Debugw("GetImageDownloadStatus-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
304 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
305 }
khenaidoob9203542018-09-17 22:56:37 -0400306}
307
khenaidoof5a5bfa2019-01-23 22:20:29 -0500308func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
309 log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
310 rpc := "cancel_image_download"
khenaidoo21d51152019-02-01 13:48:37 -0500311 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500312 args := make([]*kafka.KVArg, 2)
313 args[0] = &kafka.KVArg{
314 Key: "device",
315 Value: device,
316 }
317 args[1] = &kafka.KVArg{
318 Key: "request",
319 Value: download,
320 }
321 // Use a device specific topic as we are the only core handling requests for this device
322 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
323 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
324 log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
325
326 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400327}
328
khenaidoof5a5bfa2019-01-23 22:20:29 -0500329func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
330 log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
331 rpc := "activate_image_update"
khenaidoo21d51152019-02-01 13:48:37 -0500332 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500333 args := make([]*kafka.KVArg, 2)
334 args[0] = &kafka.KVArg{
335 Key: "device",
336 Value: device,
337 }
338 args[1] = &kafka.KVArg{
339 Key: "request",
340 Value: download,
341 }
342 // Use a device specific topic as we are the only core handling requests for this device
343 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
344 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
345 log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
346
347 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400348}
349
khenaidoof5a5bfa2019-01-23 22:20:29 -0500350func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
351 log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
352 rpc := "revert_image_update"
khenaidoo21d51152019-02-01 13:48:37 -0500353 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoof5a5bfa2019-01-23 22:20:29 -0500354 args := make([]*kafka.KVArg, 2)
355 args[0] = &kafka.KVArg{
356 Key: "device",
357 Value: device,
358 }
359 args[1] = &kafka.KVArg{
360 Key: "request",
361 Value: download,
362 }
363 // Use a device specific topic as we are the only core handling requests for this device
364 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
365 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
366 log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
367
368 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400369}
370
371func (ap *AdapterProxy) SelfTestDevice(device voltha.Device) (*voltha.SelfTestResponse, error) {
372 log.Debug("SelfTestDevice")
373 return nil, nil
374}
375
khenaidoofdbad6e2018-11-06 22:26:38 -0500376func (ap *AdapterProxy) packetOut(deviceType string, deviceId string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
377 log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
khenaidoo43c82122018-11-22 18:38:28 -0500378 toTopic := kafka.CreateSubTopic(deviceType, deviceId)
khenaidoofdbad6e2018-11-06 22:26:38 -0500379 rpc := "receive_packet_out"
khenaidoo79232702018-12-04 11:00:41 -0500380 dId := &ic.StrType{Val: deviceId}
khenaidoofdbad6e2018-11-06 22:26:38 -0500381 args := make([]*kafka.KVArg, 3)
382 args[0] = &kafka.KVArg{
383 Key: "deviceId",
384 Value: dId,
385 }
khenaidoo79232702018-12-04 11:00:41 -0500386 op := &ic.IntType{Val: int64(outPort)}
khenaidoofdbad6e2018-11-06 22:26:38 -0500387 args[1] = &kafka.KVArg{
388 Key: "outPort",
389 Value: op,
390 }
391 args[2] = &kafka.KVArg{
392 Key: "packet",
393 Value: packet,
394 }
395
396 // TODO: Do we need to wait for an ACK on a packet Out?
khenaidoo43c82122018-11-22 18:38:28 -0500397 // Use a device specific topic as we are the only core handling requests for this device
398 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, deviceId)
399 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, false, args...)
khenaidoofdbad6e2018-11-06 22:26:38 -0500400 log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
401 return unPackResponse(rpc, deviceId, success, result)
402}
403
khenaidoo19d7b632018-10-30 10:49:50 -0400404func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
405 log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
khenaidoo21d51152019-02-01 13:48:37 -0500406 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo19d7b632018-10-30 10:49:50 -0400407 rpc := "update_flows_bulk"
408 args := make([]*kafka.KVArg, 3)
409 args[0] = &kafka.KVArg{
410 Key: "device",
411 Value: device,
412 }
413 args[1] = &kafka.KVArg{
414 Key: "flows",
415 Value: flows,
416 }
417 args[2] = &kafka.KVArg{
418 Key: "groups",
419 Value: groups,
420 }
421
khenaidoo43c82122018-11-22 18:38:28 -0500422 // Use a device specific topic as we are the only core handling requests for this device
423 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
424 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo19d7b632018-10-30 10:49:50 -0400425 log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
426 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400427}
428
khenaidoo19d7b632018-10-30 10:49:50 -0400429func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
430 log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
khenaidoo21d51152019-02-01 13:48:37 -0500431 toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
khenaidoo19d7b632018-10-30 10:49:50 -0400432 rpc := "update_flows_bulk"
433 args := make([]*kafka.KVArg, 3)
434 args[0] = &kafka.KVArg{
435 Key: "device",
436 Value: device,
437 }
438 args[1] = &kafka.KVArg{
439 Key: "flow_changes",
440 Value: flowChanges,
441 }
442 args[2] = &kafka.KVArg{
443 Key: "group_changes",
444 Value: groupChanges,
445 }
446
khenaidoo43c82122018-11-22 18:38:28 -0500447 // Use a device specific topic as we are the only core handling requests for this device
448 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
449 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo19d7b632018-10-30 10:49:50 -0400450 log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
451 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400452}
453
454func (ap *AdapterProxy) UpdatePmConfig(device voltha.Device, pmConfigs voltha.PmConfigs) error {
455 log.Debug("UpdatePmConfig")
456 return nil
457}
458
459func (ap *AdapterProxy) ReceivePacketOut(deviceId voltha.ID, egressPortNo int, msg interface{}) error {
460 log.Debug("ReceivePacketOut")
461 return nil
462}
463
464func (ap *AdapterProxy) SuppressAlarm(filter voltha.AlarmFilter) error {
465 log.Debug("SuppressAlarm")
466 return nil
467}
468
469func (ap *AdapterProxy) UnSuppressAlarm(filter voltha.AlarmFilter) error {
470 log.Debug("UnSuppressAlarm")
471 return nil
khenaidoo89b0e942018-10-21 21:11:33 -0400472}