blob: ab35037b4fd303e0f9c6a8d2dfd9c621ada1001e [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
khenaidoo92e62c52018-10-03 14:02:54 -040021 a "github.com/golang/protobuf/ptypes/any"
khenaidoob9203542018-09-17 22:56:37 -040022 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
24 ca "github.com/opencord/voltha-go/protos/core_adapter"
khenaidoo19d7b632018-10-30 10:49:50 -040025 "github.com/opencord/voltha-go/protos/openflow_13"
khenaidoob9203542018-09-17 22:56:37 -040026 "github.com/opencord/voltha-go/protos/voltha"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29)
30
31type AdapterProxy struct {
32 TestMode bool
33 kafkaProxy *kafka.KafkaMessagingProxy
34}
35
36func NewAdapterProxy(kafkaProxy *kafka.KafkaMessagingProxy) *AdapterProxy {
37 var proxy AdapterProxy
38 proxy.kafkaProxy = kafkaProxy
39 return &proxy
40}
41
khenaidoo92e62c52018-10-03 14:02:54 -040042func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
43 if success {
44 return nil
45 } else {
46 unpackResult := &ca.Error{}
47 var err error
48 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
49 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
50 }
51 log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
52 // TODO: Need to get the real error code
53 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
54 }
55}
56
khenaidoob9203542018-09-17 22:56:37 -040057func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
58 log.Debugw("AdoptDevice", log.Fields{"device": device})
khenaidoo92e62c52018-10-03 14:02:54 -040059 rpc := "adopt_device"
khenaidoob9203542018-09-17 22:56:37 -040060 topic := kafka.Topic{Name: device.Type}
61 args := make([]*kafka.KVArg, 1)
62 args[0] = &kafka.KVArg{
63 Key: "device",
64 Value: device,
65 }
khenaidoo92e62c52018-10-03 14:02:54 -040066 success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
67 log.Debugw("AdoptDevice-response", log.Fields{"deviceid": device.Id, "success": success})
68 return unPackResponse(rpc, device.Id, success, result)
69}
70
71func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
72 log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
73 rpc := "disable_device"
74 topic := kafka.Topic{Name: device.Type}
75 args := make([]*kafka.KVArg, 1)
76 args[0] = &kafka.KVArg{
77 Key: "device",
78 Value: device,
khenaidoob9203542018-09-17 22:56:37 -040079 }
khenaidoo92e62c52018-10-03 14:02:54 -040080 success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
81 log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
82 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -040083}
84
khenaidoo4d4802d2018-10-04 21:59:49 -040085func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
86 log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
87 rpc := "reenable_device"
88 topic := kafka.Topic{Name: device.Type}
89 args := make([]*kafka.KVArg, 1)
90 args[0] = &kafka.KVArg{
91 Key: "device",
92 Value: device,
93 }
94 success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
95 log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
96 return unPackResponse(rpc, device.Id, success, result)
97}
98
99func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
100 log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
101 rpc := "reboot_device"
102 topic := kafka.Topic{Name: device.Type}
103 args := make([]*kafka.KVArg, 1)
104 args[0] = &kafka.KVArg{
105 Key: "device",
106 Value: device,
107 }
108 success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
109 log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
110 return unPackResponse(rpc, device.Id, success, result)
111}
112
113func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
114 log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
115 rpc := "delete_device"
116 topic := kafka.Topic{Name: device.Type}
117 args := make([]*kafka.KVArg, 1)
118 args[0] = &kafka.KVArg{
119 Key: "device",
120 Value: device,
121 }
122 success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
123 log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
124 return unPackResponse(rpc, device.Id, success, result)
125}
126
127func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ca.SwitchCapability, error) {
128 log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
129 topic := kafka.Topic{Name: device.Type}
130 args := make([]*kafka.KVArg, 1)
131 args[0] = &kafka.KVArg{
132 Key: "device",
133 Value: device,
134 }
135 success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_device_info", &topic, true, args...)
136 log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
137 if success {
138 unpackResult := &ca.SwitchCapability{}
139 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
140 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
141 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
142 }
143 return unpackResult, nil
144 } else {
145 unpackResult := &ca.Error{}
146 var err error
147 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
148 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
149 }
150 log.Debugw("GetOfpDeviceInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
151 // TODO: Need to get the real error code
152 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
153 }
154}
155
156func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ca.PortCapability, error) {
157 log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
158 topic := kafka.Topic{Name: device.Type}
159 args := make([]*kafka.KVArg, 2)
160 args[0] = &kafka.KVArg{
161 Key: "device",
162 Value: device,
163 }
164 pNo := &ca.IntType{Val: int64(portNo)}
165 args[1] = &kafka.KVArg{
166 Key: "port_no",
167 Value: pNo,
168 }
169
170 success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_port_info", &topic, true, args...)
171 log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
172 if success {
173 unpackResult := &ca.PortCapability{}
174 if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
175 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
176 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
177 }
178 return unpackResult, nil
179 } else {
180 unpackResult := &ca.Error{}
181 var err error
182 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
183 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
184 }
185 log.Debugw("GetOfpPortInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
186 // TODO: Need to get the real error code
187 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
188 }
189}
190
// TODO: Implement the stub methods below (the ones that only log their name and return).
192
khenaidoob9203542018-09-17 22:56:37 -0400193func (ap *AdapterProxy) AdapterDescriptor() (*voltha.Adapter, error) {
194 log.Debug("AdapterDescriptor")
195 return nil, nil
196}
197
198func (ap *AdapterProxy) DeviceTypes() (*voltha.DeviceType, error) {
199 log.Debug("DeviceTypes")
200 return nil, nil
201}
202
203func (ap *AdapterProxy) Health() (*voltha.HealthStatus, error) {
204 log.Debug("Health")
205 return nil, nil
206}
207
khenaidoo92e62c52018-10-03 14:02:54 -0400208func (ap *AdapterProxy) ReconcileDevice(device *voltha.Device) error {
khenaidoob9203542018-09-17 22:56:37 -0400209 log.Debug("ReconcileDevice")
210 return nil
211}
212
213func (ap *AdapterProxy) AbandonDevice(device voltha.Device) error {
214 log.Debug("AbandonDevice")
215 return nil
216}
217
khenaidoob9203542018-09-17 22:56:37 -0400218func (ap *AdapterProxy) GetDeviceDetails(device voltha.Device) (*voltha.Device, error) {
219 log.Debug("GetDeviceDetails")
220 return nil, nil
221}
222
223func (ap *AdapterProxy) DownloadImage(device voltha.Device, download voltha.ImageDownload) error {
224 log.Debug("DownloadImage")
225 return nil
226}
227
228func (ap *AdapterProxy) GetImageDownloadStatus(device voltha.Device, download voltha.ImageDownload) error {
229 log.Debug("GetImageDownloadStatus")
230 return nil
231}
232
233func (ap *AdapterProxy) CancelImageDownload(device voltha.Device, download voltha.ImageDownload) error {
234 log.Debug("CancelImageDownload")
235 return nil
236}
237
238func (ap *AdapterProxy) ActivateImageUpdate(device voltha.Device, download voltha.ImageDownload) error {
239 log.Debug("ActivateImageUpdate")
240 return nil
241}
242
243func (ap *AdapterProxy) RevertImageUpdate(device voltha.Device, download voltha.ImageDownload) error {
244 log.Debug("RevertImageUpdate")
245 return nil
246}
247
248func (ap *AdapterProxy) SelfTestDevice(device voltha.Device) (*voltha.SelfTestResponse, error) {
249 log.Debug("SelfTestDevice")
250 return nil, nil
251}
252
khenaidoofdbad6e2018-11-06 22:26:38 -0500253func (ap *AdapterProxy) packetOut(deviceType string, deviceId string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
254 log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
255 topic := kafka.Topic{Name: deviceType}
256 rpc := "receive_packet_out"
257 dId := &ca.StrType{Val:deviceId}
258 args := make([]*kafka.KVArg, 3)
259 args[0] = &kafka.KVArg{
260 Key: "deviceId",
261 Value: dId,
262 }
263 op := &ca.IntType{Val:int64(outPort)}
264 args[1] = &kafka.KVArg{
265 Key: "outPort",
266 Value: op,
267 }
268 args[2] = &kafka.KVArg{
269 Key: "packet",
270 Value: packet,
271 }
272
273 // TODO: Do we need to wait for an ACK on a packet Out?
274 success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, false, args...)
275 log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
276 return unPackResponse(rpc, deviceId, success, result)
277}
278
279
khenaidoo19d7b632018-10-30 10:49:50 -0400280func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
281 log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
282 topic := kafka.Topic{Name: device.Type}
283 rpc := "update_flows_bulk"
284 args := make([]*kafka.KVArg, 3)
285 args[0] = &kafka.KVArg{
286 Key: "device",
287 Value: device,
288 }
289 args[1] = &kafka.KVArg{
290 Key: "flows",
291 Value: flows,
292 }
293 args[2] = &kafka.KVArg{
294 Key: "groups",
295 Value: groups,
296 }
297
298 success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
299 log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
300 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400301}
302
khenaidoo19d7b632018-10-30 10:49:50 -0400303func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
304 log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
305 topic := kafka.Topic{Name: device.Type}
306 rpc := "update_flows_bulk"
307 args := make([]*kafka.KVArg, 3)
308 args[0] = &kafka.KVArg{
309 Key: "device",
310 Value: device,
311 }
312 args[1] = &kafka.KVArg{
313 Key: "flow_changes",
314 Value: flowChanges,
315 }
316 args[2] = &kafka.KVArg{
317 Key: "group_changes",
318 Value: groupChanges,
319 }
320
321 success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
322 log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
323 return unPackResponse(rpc, device.Id, success, result)
khenaidoob9203542018-09-17 22:56:37 -0400324}
325
326func (ap *AdapterProxy) UpdatePmConfig(device voltha.Device, pmConfigs voltha.PmConfigs) error {
327 log.Debug("UpdatePmConfig")
328 return nil
329}
330
331func (ap *AdapterProxy) ReceivePacketOut(deviceId voltha.ID, egressPortNo int, msg interface{}) error {
332 log.Debug("ReceivePacketOut")
333 return nil
334}
335
336func (ap *AdapterProxy) SuppressAlarm(filter voltha.AlarmFilter) error {
337 log.Debug("SuppressAlarm")
338 return nil
339}
340
341func (ap *AdapterProxy) UnSuppressAlarm(filter voltha.AlarmFilter) error {
342 log.Debug("UnSuppressAlarm")
343 return nil
khenaidoo89b0e942018-10-21 21:11:33 -0400344}