blob: 8371e099306f2a22837b39bb649ad227f456f8b1 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package common
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
21 a "github.com/golang/protobuf/ptypes/any"
22 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
24 ic "github.com/opencord/voltha-protos/go/inter_container"
25 "github.com/opencord/voltha-protos/go/voltha"
26 "google.golang.org/grpc/codes"
27 "google.golang.org/grpc/status"
28 "sync"
29)
30
31type CoreProxy struct {
Matt Jeanneretcab955f2019-04-10 15:45:57 -040032 kafkaICProxy *kafka.InterContainerProxy
33 adapterTopic string
34 coreTopic string
35 deviceIdCoreMap map[string]string
William Kurkianea869482019-04-09 15:16:11 -040036 lockDeviceIdCoreMap sync.RWMutex
William Kurkianea869482019-04-09 15:16:11 -040037}
38
39func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
40 var proxy CoreProxy
41 proxy.kafkaICProxy = kafkaProxy
42 proxy.adapterTopic = adapterTopic
43 proxy.coreTopic = coreTopic
44 proxy.deviceIdCoreMap = make(map[string]string)
45 proxy.lockDeviceIdCoreMap = sync.RWMutex{}
46 log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
47
48 return &proxy
49}
50
51func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
52 if success {
53 return nil
54 } else {
55 unpackResult := &ic.Error{}
56 var err error
57 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
58 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
59 }
60 log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
61 // TODO: Need to get the real error code
62 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
63 }
64}
65
66// UpdateCoreReference adds or update a core reference (really the topic name) for a given device Id
67func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
68 ap.lockDeviceIdCoreMap.Lock()
69 defer ap.lockDeviceIdCoreMap.Unlock()
70 ap.deviceIdCoreMap[deviceId] = coreReference
71}
72
73// DeleteCoreReference removes a core reference (really the topic name) for a given device Id
74func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
75 ap.lockDeviceIdCoreMap.Lock()
76 defer ap.lockDeviceIdCoreMap.Unlock()
77 delete(ap.deviceIdCoreMap, deviceId)
78}
79
80func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
81 ap.lockDeviceIdCoreMap.Lock()
82 defer ap.lockDeviceIdCoreMap.Unlock()
83
84 if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
85 return kafka.Topic{Name: t}
86 }
87
88 return kafka.Topic{Name: ap.coreTopic}
89}
90
91func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
92 return kafka.Topic{Name: ap.adapterTopic}
93}
94
95func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
96 log.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
97 rpc := "Register"
98 topic := kafka.Topic{Name: ap.coreTopic}
99 replyToTopic := ap.getAdapterTopic()
100 args := make([]*kafka.KVArg, 2)
101 args[0] = &kafka.KVArg{
102 Key: "adapter",
103 Value: adapter,
104 }
105 args[1] = &kafka.KVArg{
106 Key: "deviceTypes",
107 Value: deviceTypes,
108 }
109
110 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
111 log.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
112 return unPackResponse(rpc, "", success, result)
113}
114
115func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
116 log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
117 rpc := "DeviceUpdate"
118 toTopic := ap.getCoreTopic(device.Id)
119 args := make([]*kafka.KVArg, 1)
120 args[0] = &kafka.KVArg{
121 Key: "device",
122 Value: device,
123 }
124 // Use a device specific topic as we are the only adaptercore handling requests for this device
125 replyToTopic := ap.getAdapterTopic()
126 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
127 log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
128 return unPackResponse(rpc, device.Id, success, result)
129}
130
131func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
132 log.Debugw("PortCreated", log.Fields{"portNo": port.PortNo})
133 rpc := "PortCreated"
134 // Use a device specific topic to send the request. The adapter handling the device creates a device
135 // specific topic
136 toTopic := ap.getCoreTopic(deviceId)
137 args := make([]*kafka.KVArg, 2)
138 id := &voltha.ID{Id: deviceId}
139 args[0] = &kafka.KVArg{
140 Key: "device_id",
141 Value: id,
142 }
143 args[1] = &kafka.KVArg{
144 Key: "port",
145 Value: port,
146 }
147
148 // Use a device specific topic as we are the only adaptercore handling requests for this device
149 replyToTopic := ap.getAdapterTopic()
150 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
151 log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
152 return unPackResponse(rpc, deviceId, success, result)
153}
154
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400155func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, operStatus voltha.OperStatus_OperStatus) error {
156 log.Debugw("PortsStateUpdate", log.Fields{"deviceId": deviceId})
157 rpc := "PortsStateUpdate"
158 // Use a device specific topic to send the request. The adapter handling the device creates a device
159 // specific topic
160 toTopic := ap.getCoreTopic(deviceId)
161 args := make([]*kafka.KVArg, 2)
162 id := &voltha.ID{Id: deviceId}
163 oStatus := &ic.IntType{Val: int64(operStatus)}
164
165 args[0] = &kafka.KVArg{
166 Key: "device_id",
167 Value: id,
168 }
169 args[1] = &kafka.KVArg{
170 Key: "oper_status",
171 Value: oStatus,
172 }
173
174 // Use a device specific topic as we are the only adaptercore handling requests for this device
175 replyToTopic := ap.getAdapterTopic()
176 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
177 log.Debugw("PortsStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
178 return unPackResponse(rpc, deviceId, success, result)
179}
180
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400181func (ap *CoreProxy) DeleteAllPorts(ctx context.Context, deviceId string) error {
182 log.Debugw("DeleteAllPorts", log.Fields{"deviceId": deviceId})
183 rpc := "DeleteAllPorts"
184 // Use a device specific topic to send the request. The adapter handling the device creates a device
185 // specific topic
186 toTopic := ap.getCoreTopic(deviceId)
187 args := make([]*kafka.KVArg, 2)
188 id := &voltha.ID{Id: deviceId}
189
190 args[0] = &kafka.KVArg{
191 Key: "device_id",
192 Value: id,
193 }
194
195 // Use a device specific topic as we are the only adaptercore handling requests for this device
196 replyToTopic := ap.getAdapterTopic()
197 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
198 log.Debugw("DeleteAllPorts-response", log.Fields{"deviceId": deviceId, "success": success})
199 return unPackResponse(rpc, deviceId, success, result)
200}
201
William Kurkianea869482019-04-09 15:16:11 -0400202func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
203 connStatus voltha.ConnectStatus_ConnectStatus, operStatus voltha.OperStatus_OperStatus) error {
204 log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
205 rpc := "DeviceStateUpdate"
206 // Use a device specific topic to send the request. The adapter handling the device creates a device
207 // specific topic
208 toTopic := ap.getCoreTopic(deviceId)
209 args := make([]*kafka.KVArg, 3)
210 id := &voltha.ID{Id: deviceId}
211 oStatus := &ic.IntType{Val: int64(operStatus)}
212 cStatus := &ic.IntType{Val: int64(connStatus)}
213
214 args[0] = &kafka.KVArg{
215 Key: "device_id",
216 Value: id,
217 }
218 args[1] = &kafka.KVArg{
219 Key: "oper_status",
220 Value: oStatus,
221 }
222 args[2] = &kafka.KVArg{
223 Key: "connect_status",
224 Value: cStatus,
225 }
226 // Use a device specific topic as we are the only adaptercore handling requests for this device
227 replyToTopic := ap.getAdapterTopic()
228 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
229 log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
230 return unPackResponse(rpc, deviceId, success, result)
231}
232
233func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
Mahir Gunyele77977b2019-06-27 05:36:22 -0700234 childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) (*voltha.Device, error) {
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400235 log.Debugw("ChildDeviceDetected", log.Fields{"pDeviceId": parentDeviceId, "channelId": channelId})
William Kurkianea869482019-04-09 15:16:11 -0400236 rpc := "ChildDeviceDetected"
237 // Use a device specific topic to send the request. The adapter handling the device creates a device
238 // specific topic
239 toTopic := ap.getCoreTopic(parentDeviceId)
240 replyToTopic := ap.getAdapterTopic()
241
242 args := make([]*kafka.KVArg, 7)
243 id := &voltha.ID{Id: parentDeviceId}
244 args[0] = &kafka.KVArg{
245 Key: "parent_device_id",
246 Value: id,
247 }
248 ppn := &ic.IntType{Val: int64(parentPortNo)}
249 args[1] = &kafka.KVArg{
250 Key: "parent_port_no",
251 Value: ppn,
252 }
253 cdt := &ic.StrType{Val: childDeviceType}
254 args[2] = &kafka.KVArg{
255 Key: "child_device_type",
256 Value: cdt,
257 }
258 channel := &ic.IntType{Val: int64(channelId)}
259 args[3] = &kafka.KVArg{
260 Key: "channel_id",
261 Value: channel,
262 }
263 vId := &ic.StrType{Val: vendorId}
264 args[4] = &kafka.KVArg{
265 Key: "vendor_id",
266 Value: vId,
267 }
268 sNo := &ic.StrType{Val: serialNumber}
269 args[5] = &kafka.KVArg{
270 Key: "serial_number",
271 Value: sNo,
272 }
273 oId := &ic.IntType{Val: int64(onuId)}
274 args[6] = &kafka.KVArg{
275 Key: "onu_id",
276 Value: oId,
277 }
278
279 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
280 log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
Mahir Gunyele77977b2019-06-27 05:36:22 -0700281
282 if success {
283 volthaDevice := &voltha.Device{}
284 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
285 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
286 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
287 }
288 return volthaDevice, nil
289 } else {
290 unpackResult := &ic.Error{}
291 var err error
292 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
293 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
294 }
295 log.Debugw("ChildDeviceDetected-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
296 // TODO: Need to get the real error code
297 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
298 }
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400299
300}
301
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400302func (ap *CoreProxy) ChildDevicesLost(ctx context.Context, parentDeviceId string) error {
303 log.Debugw("ChildDevicesLost", log.Fields{"pDeviceId": parentDeviceId})
304 rpc := "ChildDevicesLost"
305 // Use a device specific topic to send the request. The adapter handling the device creates a device
306 // specific topic
307 toTopic := ap.getCoreTopic(parentDeviceId)
308 replyToTopic := ap.getAdapterTopic()
309
310 args := make([]*kafka.KVArg, 1)
311 id := &voltha.ID{Id: parentDeviceId}
312 args[0] = &kafka.KVArg{
313 Key: "parent_device_id",
314 Value: id,
315 }
316
317 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
318 log.Debugw("ChildDevicesLost-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
319 return unPackResponse(rpc, parentDeviceId, success, result)
320}
321
322func (ap *CoreProxy) ChildDevicesDetected(ctx context.Context, parentDeviceId string) error {
323 log.Debugw("ChildDevicesDetected", log.Fields{"pDeviceId": parentDeviceId})
324 rpc := "ChildDevicesDetected"
325 // Use a device specific topic to send the request. The adapter handling the device creates a device
326 // specific topic
327 toTopic := ap.getCoreTopic(parentDeviceId)
328 replyToTopic := ap.getAdapterTopic()
329
330 args := make([]*kafka.KVArg, 1)
331 id := &voltha.ID{Id: parentDeviceId}
332 args[0] = &kafka.KVArg{
333 Key: "parent_device_id",
334 Value: id,
335 }
336
337 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
338 log.Debugw("ChildDevicesDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
339 return unPackResponse(rpc, parentDeviceId, success, result)
340}
341
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400342func (ap *CoreProxy) GetDevice(ctx context.Context, parentDeviceId string, deviceId string) (*voltha.Device, error) {
343 log.Debugw("GetDevice", log.Fields{"deviceId": deviceId})
344 rpc := "GetDevice"
345
346 toTopic := ap.getCoreTopic(parentDeviceId)
347 replyToTopic := ap.getAdapterTopic()
348
349 args := make([]*kafka.KVArg, 1)
350 id := &voltha.ID{Id: deviceId}
351 args[0] = &kafka.KVArg{
352 Key: "device_id",
353 Value: id,
354 }
355
356 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
357 log.Debugw("GetDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
358
359 if success {
360 volthaDevice := &voltha.Device{}
361 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
362 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
363 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
364 }
365 return volthaDevice, nil
366 } else {
367 unpackResult := &ic.Error{}
368 var err error
369 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
370 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
371 }
372 log.Debugw("GetDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
373 // TODO: Need to get the real error code
374 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
375 }
376}
377
378func (ap *CoreProxy) GetChildDevice(ctx context.Context, parentDeviceId string, kwargs map[string]interface{}) (*voltha.Device, error) {
379 log.Debugw("GetChildDevice", log.Fields{"parentDeviceId": parentDeviceId, "kwargs": kwargs})
380 rpc := "GetChildDevice"
381
382 toTopic := ap.getCoreTopic(parentDeviceId)
383 replyToTopic := ap.getAdapterTopic()
384
385 args := make([]*kafka.KVArg, 4)
386 id := &voltha.ID{Id: parentDeviceId}
387 args[0] = &kafka.KVArg{
388 Key: "device_id",
389 Value: id,
390 }
391
392 var cnt uint8 = 0
393 for k, v := range kwargs {
394 cnt += 1
395 if k == "serial_number" {
396 val := &ic.StrType{Val: v.(string)}
397 args[cnt] = &kafka.KVArg{
398 Key: k,
399 Value: val,
400 }
401 } else if k == "onu_id" {
402 val := &ic.IntType{Val: int64(v.(uint32))}
403 args[cnt] = &kafka.KVArg{
404 Key: k,
405 Value: val,
406 }
407 } else if k == "parent_port_no" {
408 val := &ic.IntType{Val: int64(v.(uint32))}
409 args[cnt] = &kafka.KVArg{
410 Key: k,
411 Value: val,
412 }
413 }
414 }
415
416 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
417 log.Debugw("GetChildDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
418
419 if success {
420 volthaDevice := &voltha.Device{}
421 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
422 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
423 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
424 }
425 return volthaDevice, nil
426 } else {
427 unpackResult := &ic.Error{}
428 var err error
429 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
430 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
431 }
432 log.Debugw("GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
433 // TODO: Need to get the real error code
434 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
435 }
William Kurkianea869482019-04-09 15:16:11 -0400436}
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400437
Chaitrashree G Sbe6ab942019-05-24 06:42:49 -0400438func (ap *CoreProxy) GetChildDevices(ctx context.Context, parentDeviceId string) (*voltha.Devices, error) {
439 log.Debugw("GetChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
440 rpc := "GetChildDevices"
441
442 toTopic := ap.getCoreTopic(parentDeviceId)
443 replyToTopic := ap.getAdapterTopic()
444
445 args := make([]*kafka.KVArg, 1)
446 id := &voltha.ID{Id: parentDeviceId}
447 args[0] = &kafka.KVArg{
448 Key: "device_id",
449 Value: id,
450 }
451
452 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
453 log.Debugw("GetChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
454
455 if success {
456 volthaDevices := &voltha.Devices{}
457 if err := ptypes.UnmarshalAny(result, volthaDevices); err != nil {
458 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
459 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
460 }
461 return volthaDevices, nil
462 } else {
463 unpackResult := &ic.Error{}
464 var err error
465 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
466 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
467 }
468 log.Debugw("GetChildDevices-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
469 // TODO: Need to get the real error code
470 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
471 }
472}
473
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400474func (ap *CoreProxy) SendPacketIn(ctx context.Context, deviceId string, port uint32, pktPayload []byte) error {
475 log.Debugw("SendPacketIn", log.Fields{"deviceId": deviceId, "port": port, "pktPayload": pktPayload})
476 rpc := "PacketIn"
477 // Use a device specific topic to send the request. The adapter handling the device creates a device
478 // specific topic
479 toTopic := ap.getCoreTopic(deviceId)
480 replyToTopic := ap.getAdapterTopic()
481
482 args := make([]*kafka.KVArg, 3)
483 id := &voltha.ID{Id: deviceId}
484 args[0] = &kafka.KVArg{
485 Key: "device_id",
486 Value: id,
487 }
488 portNo := &ic.IntType{Val: int64(port)}
489 args[1] = &kafka.KVArg{
490 Key: "port",
491 Value: portNo,
492 }
493 pkt := &ic.Packet{Payload: pktPayload}
494 args[2] = &kafka.KVArg{
495 Key: "packet",
496 Value: pkt,
497 }
498 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
Mahir Gunyele77977b2019-06-27 05:36:22 -0700499 log.Debugw("SendPacketIn-response", log.Fields{"pDeviceId": deviceId, "success": success})
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400500 return unPackResponse(rpc, deviceId, success, result)
501}