blob: 4af4fb0aa8fccb24592944d8237ab75e7246840f [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
khenaidoo92e62c52018-10-03 14:02:54 -040021 a "github.com/golang/protobuf/ptypes/any"
khenaidoob9203542018-09-17 22:56:37 -040022 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
24 ca "github.com/opencord/voltha-go/protos/core_adapter"
khenaidoo19d7b632018-10-30 10:49:50 -040025 "github.com/opencord/voltha-go/protos/openflow_13"
khenaidoob9203542018-09-17 22:56:37 -040026 "github.com/opencord/voltha-go/protos/voltha"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29)
30
31type AdapterProxy struct {
khenaidoo43c82122018-11-22 18:38:28 -050032 TestMode bool
33 kafkaICProxy *kafka.InterContainerProxy
khenaidoob9203542018-09-17 22:56:37 -040034}
35
khenaidoo43c82122018-11-22 18:38:28 -050036func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy) *AdapterProxy {
khenaidoob9203542018-09-17 22:56:37 -040037 var proxy AdapterProxy
khenaidoo43c82122018-11-22 18:38:28 -050038 proxy.kafkaICProxy = kafkaProxy
khenaidoob9203542018-09-17 22:56:37 -040039 return &proxy
40}
41
khenaidoo92e62c52018-10-03 14:02:54 -040042func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
43 if success {
44 return nil
45 } else {
46 unpackResult := &ca.Error{}
47 var err error
48 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
49 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
50 }
51 log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
52 // TODO: Need to get the real error code
53 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
54 }
55}
56
khenaidoo43c82122018-11-22 18:38:28 -050057//func kafka.CreateSubTopic(args ...string) kafka.Topic{
58// topic := ""
59// for index , arg := range args {
60// if index == 0 {
61// topic = arg
62// } else {
63// topic = fmt.Sprintf("%s_%s", topic, arg)
64// }
65// }
66// return kafka.Topic{Name:topic}
67//}
68
khenaidoob9203542018-09-17 22:56:37 -040069func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
70 log.Debugw("AdoptDevice", log.Fields{"device": device})
khenaidoo92e62c52018-10-03 14:02:54 -040071 rpc := "adopt_device"
khenaidoob9203542018-09-17 22:56:37 -040072 topic := kafka.Topic{Name: device.Type}
73 args := make([]*kafka.KVArg, 1)
74 args[0] = &kafka.KVArg{
75 Key: "device",
76 Value: device,
77 }
khenaidoo43c82122018-11-22 18:38:28 -050078 // Use a device topic for the response as we are the only core handling requests for this device
79 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
80 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
81 log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
82 if success {
83 // From now on, any unsolicited requests from the adapters for this device will come over the device topic.
84 // We should therefore include the replyToTopic as part of the target when unsolicited messages come in.
85 if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
86 log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
87 return err
88 }
89 }
khenaidoo92e62c52018-10-03 14:02:54 -040090 return unPackResponse(rpc, device.Id, success, result)
91}
92
93func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
94 log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
95 rpc := "disable_device"
khenaidoo43c82122018-11-22 18:38:28 -050096 // Use a device specific topic to send the request. The adapter handling the device creates a device
97 // specific topic
98 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
khenaidoo92e62c52018-10-03 14:02:54 -040099 args := make([]*kafka.KVArg, 1)
100 args[0] = &kafka.KVArg{
101 Key: "device",
102 Value: device,
khenaidoob9203542018-09-17 22:56:37 -0400103 }
khenaidoo43c82122018-11-22 18:38:28 -0500104 // Use a device specific topic as we are the only core handling requests for this device
105 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
106 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo92e62c52018-10-03 14:02:54 -0400107 log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
108 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400109}
110
khenaidoo4d4802d2018-10-04 21:59:49 -0400111func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
112 log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
113 rpc := "reenable_device"
khenaidoo43c82122018-11-22 18:38:28 -0500114 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400115 args := make([]*kafka.KVArg, 1)
116 args[0] = &kafka.KVArg{
117 Key: "device",
118 Value: device,
119 }
khenaidoo43c82122018-11-22 18:38:28 -0500120 // Use a device specific topic as we are the only core handling requests for this device
121 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
122 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400123 log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
124 return unPackResponse(rpc, device.Id, success, result)
125}
126
127func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
128 log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
129 rpc := "reboot_device"
khenaidoo43c82122018-11-22 18:38:28 -0500130 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400131 args := make([]*kafka.KVArg, 1)
132 args[0] = &kafka.KVArg{
133 Key: "device",
134 Value: device,
135 }
khenaidoo43c82122018-11-22 18:38:28 -0500136 // Use a device specific topic as we are the only core handling requests for this device
137 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
138 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400139 log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
140 return unPackResponse(rpc, device.Id, success, result)
141}
142
143func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
144 log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
145 rpc := "delete_device"
khenaidoo43c82122018-11-22 18:38:28 -0500146 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400147 args := make([]*kafka.KVArg, 1)
148 args[0] = &kafka.KVArg{
149 Key: "device",
150 Value: device,
151 }
khenaidoo43c82122018-11-22 18:38:28 -0500152 // Use a device specific topic as we are the only core handling requests for this device
153 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
154 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400155 log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
khenaidoo43c82122018-11-22 18:38:28 -0500156
157 // We no longer need to have a target against that topic as we won't receive any unsolicited messages on that
158 // topic
159 if err := ap.kafkaICProxy.UnSubscribeFromRequestHandler(replyToTopic); err != nil {
160 log.Errorw("Unable-to-subscribe-from-target", log.Fields{"topic": replyToTopic, "error": err})
161 return err
162 }
163 // Now delete the topic altogether
164 ap.kafkaICProxy.DeleteTopic(replyToTopic)
165
khenaidoo4d4802d2018-10-04 21:59:49 -0400166 return unPackResponse(rpc, device.Id, success, result)
167}
168
169func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ca.SwitchCapability, error) {
170 log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
khenaidoo43c82122018-11-22 18:38:28 -0500171 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400172 args := make([]*kafka.KVArg, 1)
173 args[0] = &kafka.KVArg{
174 Key: "device",
175 Value: device,
176 }
khenaidoo43c82122018-11-22 18:38:28 -0500177 // Use a device specific topic as we are the only core handling requests for this device
178 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
179 success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400180 log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
181 if success {
182 unpackResult := &ca.SwitchCapability{}
183 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
184 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
185 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
186 }
187 return unpackResult, nil
188 } else {
189 unpackResult := &ca.Error{}
190 var err error
191 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
192 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
193 }
194 log.Debugw("GetOfpDeviceInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
195 // TODO: Need to get the real error code
196 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
197 }
198}
199
200func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ca.PortCapability, error) {
201 log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
khenaidoo43c82122018-11-22 18:38:28 -0500202 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
khenaidoo4d4802d2018-10-04 21:59:49 -0400203 args := make([]*kafka.KVArg, 2)
204 args[0] = &kafka.KVArg{
205 Key: "device",
206 Value: device,
207 }
208 pNo := &ca.IntType{Val: int64(portNo)}
209 args[1] = &kafka.KVArg{
210 Key: "port_no",
211 Value: pNo,
212 }
khenaidoo43c82122018-11-22 18:38:28 -0500213 // Use a device specific topic as we are the only core handling requests for this device
214 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
215 success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
khenaidoo4d4802d2018-10-04 21:59:49 -0400216 log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
217 if success {
218 unpackResult := &ca.PortCapability{}
219 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
220 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
221 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
222 }
223 return unpackResult, nil
224 } else {
225 unpackResult := &ca.Error{}
226 var err error
227 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
228 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
229 }
230 log.Debugw("GetOfpPortInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
231 // TODO: Need to get the real error code
232 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
233 }
234}
235
236//TODO: Implement the functions below
237
khenaidoob9203542018-09-17 22:56:37 -0400238func (ap *AdapterProxy) AdapterDescriptor() (*voltha.Adapter, error) {
239 log.Debug("AdapterDescriptor")
240 return nil, nil
241}
242
243func (ap *AdapterProxy) DeviceTypes() (*voltha.DeviceType, error) {
244 log.Debug("DeviceTypes")
245 return nil, nil
246}
247
248func (ap *AdapterProxy) Health() (*voltha.HealthStatus, error) {
249 log.Debug("Health")
250 return nil, nil
251}
252
khenaidoo92e62c52018-10-03 14:02:54 -0400253func (ap *AdapterProxy) ReconcileDevice(device *voltha.Device) error {
khenaidoob9203542018-09-17 22:56:37 -0400254 log.Debug("ReconcileDevice")
255 return nil
256}
257
258func (ap *AdapterProxy) AbandonDevice(device voltha.Device) error {
259 log.Debug("AbandonDevice")
260 return nil
261}
262
khenaidoob9203542018-09-17 22:56:37 -0400263func (ap *AdapterProxy) GetDeviceDetails(device voltha.Device) (*voltha.Device, error) {
264 log.Debug("GetDeviceDetails")
265 return nil, nil
266}
267
268func (ap *AdapterProxy) DownloadImage(device voltha.Device, download voltha.ImageDownload) error {
269 log.Debug("DownloadImage")
270 return nil
271}
272
273func (ap *AdapterProxy) GetImageDownloadStatus(device voltha.Device, download voltha.ImageDownload) error {
274 log.Debug("GetImageDownloadStatus")
275 return nil
276}
277
278func (ap *AdapterProxy) CancelImageDownload(device voltha.Device, download voltha.ImageDownload) error {
279 log.Debug("CancelImageDownload")
280 return nil
281}
282
283func (ap *AdapterProxy) ActivateImageUpdate(device voltha.Device, download voltha.ImageDownload) error {
284 log.Debug("ActivateImageUpdate")
285 return nil
286}
287
288func (ap *AdapterProxy) RevertImageUpdate(device voltha.Device, download voltha.ImageDownload) error {
289 log.Debug("RevertImageUpdate")
290 return nil
291}
292
293func (ap *AdapterProxy) SelfTestDevice(device voltha.Device) (*voltha.SelfTestResponse, error) {
294 log.Debug("SelfTestDevice")
295 return nil, nil
296}
297
khenaidoofdbad6e2018-11-06 22:26:38 -0500298func (ap *AdapterProxy) packetOut(deviceType string, deviceId string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
299 log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
khenaidoo43c82122018-11-22 18:38:28 -0500300 toTopic := kafka.CreateSubTopic(deviceType, deviceId)
khenaidoofdbad6e2018-11-06 22:26:38 -0500301 rpc := "receive_packet_out"
khenaidoo43c82122018-11-22 18:38:28 -0500302 dId := &ca.StrType{Val: deviceId}
khenaidoofdbad6e2018-11-06 22:26:38 -0500303 args := make([]*kafka.KVArg, 3)
304 args[0] = &kafka.KVArg{
305 Key: "deviceId",
306 Value: dId,
307 }
khenaidoo43c82122018-11-22 18:38:28 -0500308 op := &ca.IntType{Val: int64(outPort)}
khenaidoofdbad6e2018-11-06 22:26:38 -0500309 args[1] = &kafka.KVArg{
310 Key: "outPort",
311 Value: op,
312 }
313 args[2] = &kafka.KVArg{
314 Key: "packet",
315 Value: packet,
316 }
317
318 // TODO: Do we need to wait for an ACK on a packet Out?
khenaidoo43c82122018-11-22 18:38:28 -0500319 // Use a device specific topic as we are the only core handling requests for this device
320 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, deviceId)
321 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, false, args...)
khenaidoofdbad6e2018-11-06 22:26:38 -0500322 log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
323 return unPackResponse(rpc, deviceId, success, result)
324}
325
khenaidoo19d7b632018-10-30 10:49:50 -0400326func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
327 log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
khenaidoo43c82122018-11-22 18:38:28 -0500328 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
khenaidoo19d7b632018-10-30 10:49:50 -0400329 rpc := "update_flows_bulk"
330 args := make([]*kafka.KVArg, 3)
331 args[0] = &kafka.KVArg{
332 Key: "device",
333 Value: device,
334 }
335 args[1] = &kafka.KVArg{
336 Key: "flows",
337 Value: flows,
338 }
339 args[2] = &kafka.KVArg{
340 Key: "groups",
341 Value: groups,
342 }
343
khenaidoo43c82122018-11-22 18:38:28 -0500344 // Use a device specific topic as we are the only core handling requests for this device
345 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
346 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo19d7b632018-10-30 10:49:50 -0400347 log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
348 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400349}
350
khenaidoo19d7b632018-10-30 10:49:50 -0400351func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
352 log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
khenaidoo43c82122018-11-22 18:38:28 -0500353 toTopic := kafka.CreateSubTopic(device.Type, device.Id)
khenaidoo19d7b632018-10-30 10:49:50 -0400354 rpc := "update_flows_bulk"
355 args := make([]*kafka.KVArg, 3)
356 args[0] = &kafka.KVArg{
357 Key: "device",
358 Value: device,
359 }
360 args[1] = &kafka.KVArg{
361 Key: "flow_changes",
362 Value: flowChanges,
363 }
364 args[2] = &kafka.KVArg{
365 Key: "group_changes",
366 Value: groupChanges,
367 }
368
khenaidoo43c82122018-11-22 18:38:28 -0500369 // Use a device specific topic as we are the only core handling requests for this device
370 replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
371 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
khenaidoo19d7b632018-10-30 10:49:50 -0400372 log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
373 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400374}
375
376func (ap *AdapterProxy) UpdatePmConfig(device voltha.Device, pmConfigs voltha.PmConfigs) error {
377 log.Debug("UpdatePmConfig")
378 return nil
379}
380
381func (ap *AdapterProxy) ReceivePacketOut(deviceId voltha.ID, egressPortNo int, msg interface{}) error {
382 log.Debug("ReceivePacketOut")
383 return nil
384}
385
386func (ap *AdapterProxy) SuppressAlarm(filter voltha.AlarmFilter) error {
387 log.Debug("SuppressAlarm")
388 return nil
389}
390
391func (ap *AdapterProxy) UnSuppressAlarm(filter voltha.AlarmFilter) error {
392 log.Debug("UnSuppressAlarm")
393 return nil
khenaidoo89b0e942018-10-21 21:11:33 -0400394}