blob: e2d79fca84bc31747b17660d644cd6df2405ae3c [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package common
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
21 a "github.com/golang/protobuf/ptypes/any"
22 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
24 ic "github.com/opencord/voltha-protos/go/inter_container"
25 "github.com/opencord/voltha-protos/go/voltha"
26 "google.golang.org/grpc/codes"
27 "google.golang.org/grpc/status"
28 "sync"
29)
30
31type CoreProxy struct {
Matt Jeanneretcab955f2019-04-10 15:45:57 -040032 kafkaICProxy *kafka.InterContainerProxy
33 adapterTopic string
34 coreTopic string
35 deviceIdCoreMap map[string]string
William Kurkianea869482019-04-09 15:16:11 -040036 lockDeviceIdCoreMap sync.RWMutex
William Kurkianea869482019-04-09 15:16:11 -040037}
38
39func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
40 var proxy CoreProxy
41 proxy.kafkaICProxy = kafkaProxy
42 proxy.adapterTopic = adapterTopic
43 proxy.coreTopic = coreTopic
44 proxy.deviceIdCoreMap = make(map[string]string)
45 proxy.lockDeviceIdCoreMap = sync.RWMutex{}
46 log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
47
48 return &proxy
49}
50
51func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
52 if success {
53 return nil
54 } else {
55 unpackResult := &ic.Error{}
56 var err error
57 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
58 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
59 }
60 log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
61 // TODO: Need to get the real error code
62 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
63 }
64}
65
66// UpdateCoreReference adds or update a core reference (really the topic name) for a given device Id
67func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
68 ap.lockDeviceIdCoreMap.Lock()
69 defer ap.lockDeviceIdCoreMap.Unlock()
70 ap.deviceIdCoreMap[deviceId] = coreReference
71}
72
73// DeleteCoreReference removes a core reference (really the topic name) for a given device Id
74func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
75 ap.lockDeviceIdCoreMap.Lock()
76 defer ap.lockDeviceIdCoreMap.Unlock()
77 delete(ap.deviceIdCoreMap, deviceId)
78}
79
80func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
81 ap.lockDeviceIdCoreMap.Lock()
82 defer ap.lockDeviceIdCoreMap.Unlock()
83
84 if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
85 return kafka.Topic{Name: t}
86 }
87
88 return kafka.Topic{Name: ap.coreTopic}
89}
90
91func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
92 return kafka.Topic{Name: ap.adapterTopic}
93}
94
95func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
96 log.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
97 rpc := "Register"
98 topic := kafka.Topic{Name: ap.coreTopic}
99 replyToTopic := ap.getAdapterTopic()
100 args := make([]*kafka.KVArg, 2)
101 args[0] = &kafka.KVArg{
102 Key: "adapter",
103 Value: adapter,
104 }
105 args[1] = &kafka.KVArg{
106 Key: "deviceTypes",
107 Value: deviceTypes,
108 }
109
110 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
111 log.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
112 return unPackResponse(rpc, "", success, result)
113}
114
115func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
116 log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
117 rpc := "DeviceUpdate"
118 toTopic := ap.getCoreTopic(device.Id)
119 args := make([]*kafka.KVArg, 1)
120 args[0] = &kafka.KVArg{
121 Key: "device",
122 Value: device,
123 }
124 // Use a device specific topic as we are the only adaptercore handling requests for this device
125 replyToTopic := ap.getAdapterTopic()
126 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
127 log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
128 return unPackResponse(rpc, device.Id, success, result)
129}
130
131func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
132 log.Debugw("PortCreated", log.Fields{"portNo": port.PortNo})
133 rpc := "PortCreated"
134 // Use a device specific topic to send the request. The adapter handling the device creates a device
135 // specific topic
136 toTopic := ap.getCoreTopic(deviceId)
137 args := make([]*kafka.KVArg, 2)
138 id := &voltha.ID{Id: deviceId}
139 args[0] = &kafka.KVArg{
140 Key: "device_id",
141 Value: id,
142 }
143 args[1] = &kafka.KVArg{
144 Key: "port",
145 Value: port,
146 }
147
148 // Use a device specific topic as we are the only adaptercore handling requests for this device
149 replyToTopic := ap.getAdapterTopic()
150 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
151 log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
152 return unPackResponse(rpc, deviceId, success, result)
153}
154
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400155func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, operStatus voltha.OperStatus_OperStatus) error {
156 log.Debugw("PortsStateUpdate", log.Fields{"deviceId": deviceId})
157 rpc := "PortsStateUpdate"
158 // Use a device specific topic to send the request. The adapter handling the device creates a device
159 // specific topic
160 toTopic := ap.getCoreTopic(deviceId)
161 args := make([]*kafka.KVArg, 2)
162 id := &voltha.ID{Id: deviceId}
163 oStatus := &ic.IntType{Val: int64(operStatus)}
164
165 args[0] = &kafka.KVArg{
166 Key: "device_id",
167 Value: id,
168 }
169 args[1] = &kafka.KVArg{
170 Key: "oper_status",
171 Value: oStatus,
172 }
173
174 // Use a device specific topic as we are the only adaptercore handling requests for this device
175 replyToTopic := ap.getAdapterTopic()
176 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
177 log.Debugw("PortsStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
178 return unPackResponse(rpc, deviceId, success, result)
179}
180
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400181func (ap *CoreProxy) DeleteAllPorts(ctx context.Context, deviceId string) error {
182 log.Debugw("DeleteAllPorts", log.Fields{"deviceId": deviceId})
183 rpc := "DeleteAllPorts"
184 // Use a device specific topic to send the request. The adapter handling the device creates a device
185 // specific topic
186 toTopic := ap.getCoreTopic(deviceId)
187 args := make([]*kafka.KVArg, 2)
188 id := &voltha.ID{Id: deviceId}
189
190 args[0] = &kafka.KVArg{
191 Key: "device_id",
192 Value: id,
193 }
194
195 // Use a device specific topic as we are the only adaptercore handling requests for this device
196 replyToTopic := ap.getAdapterTopic()
197 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
198 log.Debugw("DeleteAllPorts-response", log.Fields{"deviceId": deviceId, "success": success})
199 return unPackResponse(rpc, deviceId, success, result)
200}
201
William Kurkianea869482019-04-09 15:16:11 -0400202func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
203 connStatus voltha.ConnectStatus_ConnectStatus, operStatus voltha.OperStatus_OperStatus) error {
204 log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
205 rpc := "DeviceStateUpdate"
206 // Use a device specific topic to send the request. The adapter handling the device creates a device
207 // specific topic
208 toTopic := ap.getCoreTopic(deviceId)
209 args := make([]*kafka.KVArg, 3)
210 id := &voltha.ID{Id: deviceId}
211 oStatus := &ic.IntType{Val: int64(operStatus)}
212 cStatus := &ic.IntType{Val: int64(connStatus)}
213
214 args[0] = &kafka.KVArg{
215 Key: "device_id",
216 Value: id,
217 }
218 args[1] = &kafka.KVArg{
219 Key: "oper_status",
220 Value: oStatus,
221 }
222 args[2] = &kafka.KVArg{
223 Key: "connect_status",
224 Value: cStatus,
225 }
226 // Use a device specific topic as we are the only adaptercore handling requests for this device
227 replyToTopic := ap.getAdapterTopic()
228 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
229 log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
230 return unPackResponse(rpc, deviceId, success, result)
231}
232
233func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400234 childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) error {
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400235 log.Debugw("ChildDeviceDetected", log.Fields{"pDeviceId": parentDeviceId, "channelId": channelId})
William Kurkianea869482019-04-09 15:16:11 -0400236 rpc := "ChildDeviceDetected"
237 // Use a device specific topic to send the request. The adapter handling the device creates a device
238 // specific topic
239 toTopic := ap.getCoreTopic(parentDeviceId)
240 replyToTopic := ap.getAdapterTopic()
241
242 args := make([]*kafka.KVArg, 7)
243 id := &voltha.ID{Id: parentDeviceId}
244 args[0] = &kafka.KVArg{
245 Key: "parent_device_id",
246 Value: id,
247 }
248 ppn := &ic.IntType{Val: int64(parentPortNo)}
249 args[1] = &kafka.KVArg{
250 Key: "parent_port_no",
251 Value: ppn,
252 }
253 cdt := &ic.StrType{Val: childDeviceType}
254 args[2] = &kafka.KVArg{
255 Key: "child_device_type",
256 Value: cdt,
257 }
258 channel := &ic.IntType{Val: int64(channelId)}
259 args[3] = &kafka.KVArg{
260 Key: "channel_id",
261 Value: channel,
262 }
263 vId := &ic.StrType{Val: vendorId}
264 args[4] = &kafka.KVArg{
265 Key: "vendor_id",
266 Value: vId,
267 }
268 sNo := &ic.StrType{Val: serialNumber}
269 args[5] = &kafka.KVArg{
270 Key: "serial_number",
271 Value: sNo,
272 }
273 oId := &ic.IntType{Val: int64(onuId)}
274 args[6] = &kafka.KVArg{
275 Key: "onu_id",
276 Value: oId,
277 }
278
279 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
280 log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
281 return unPackResponse(rpc, parentDeviceId, success, result)
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400282
283}
284
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400285func (ap *CoreProxy) ChildDevicesLost(ctx context.Context, parentDeviceId string) error {
286 log.Debugw("ChildDevicesLost", log.Fields{"pDeviceId": parentDeviceId})
287 rpc := "ChildDevicesLost"
288 // Use a device specific topic to send the request. The adapter handling the device creates a device
289 // specific topic
290 toTopic := ap.getCoreTopic(parentDeviceId)
291 replyToTopic := ap.getAdapterTopic()
292
293 args := make([]*kafka.KVArg, 1)
294 id := &voltha.ID{Id: parentDeviceId}
295 args[0] = &kafka.KVArg{
296 Key: "parent_device_id",
297 Value: id,
298 }
299
300 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
301 log.Debugw("ChildDevicesLost-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
302 return unPackResponse(rpc, parentDeviceId, success, result)
303}
304
305func (ap *CoreProxy) ChildDevicesDetected(ctx context.Context, parentDeviceId string) error {
306 log.Debugw("ChildDevicesDetected", log.Fields{"pDeviceId": parentDeviceId})
307 rpc := "ChildDevicesDetected"
308 // Use a device specific topic to send the request. The adapter handling the device creates a device
309 // specific topic
310 toTopic := ap.getCoreTopic(parentDeviceId)
311 replyToTopic := ap.getAdapterTopic()
312
313 args := make([]*kafka.KVArg, 1)
314 id := &voltha.ID{Id: parentDeviceId}
315 args[0] = &kafka.KVArg{
316 Key: "parent_device_id",
317 Value: id,
318 }
319
320 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
321 log.Debugw("ChildDevicesDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
322 return unPackResponse(rpc, parentDeviceId, success, result)
323}
324
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400325func (ap *CoreProxy) GetDevice(ctx context.Context, parentDeviceId string, deviceId string) (*voltha.Device, error) {
326 log.Debugw("GetDevice", log.Fields{"deviceId": deviceId})
327 rpc := "GetDevice"
328
329 toTopic := ap.getCoreTopic(parentDeviceId)
330 replyToTopic := ap.getAdapterTopic()
331
332 args := make([]*kafka.KVArg, 1)
333 id := &voltha.ID{Id: deviceId}
334 args[0] = &kafka.KVArg{
335 Key: "device_id",
336 Value: id,
337 }
338
339 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
340 log.Debugw("GetDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
341
342 if success {
343 volthaDevice := &voltha.Device{}
344 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
345 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
346 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
347 }
348 return volthaDevice, nil
349 } else {
350 unpackResult := &ic.Error{}
351 var err error
352 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
353 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
354 }
355 log.Debugw("GetDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
356 // TODO: Need to get the real error code
357 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
358 }
359}
360
361func (ap *CoreProxy) GetChildDevice(ctx context.Context, parentDeviceId string, kwargs map[string]interface{}) (*voltha.Device, error) {
362 log.Debugw("GetChildDevice", log.Fields{"parentDeviceId": parentDeviceId, "kwargs": kwargs})
363 rpc := "GetChildDevice"
364
365 toTopic := ap.getCoreTopic(parentDeviceId)
366 replyToTopic := ap.getAdapterTopic()
367
368 args := make([]*kafka.KVArg, 4)
369 id := &voltha.ID{Id: parentDeviceId}
370 args[0] = &kafka.KVArg{
371 Key: "device_id",
372 Value: id,
373 }
374
375 var cnt uint8 = 0
376 for k, v := range kwargs {
377 cnt += 1
378 if k == "serial_number" {
379 val := &ic.StrType{Val: v.(string)}
380 args[cnt] = &kafka.KVArg{
381 Key: k,
382 Value: val,
383 }
384 } else if k == "onu_id" {
385 val := &ic.IntType{Val: int64(v.(uint32))}
386 args[cnt] = &kafka.KVArg{
387 Key: k,
388 Value: val,
389 }
390 } else if k == "parent_port_no" {
391 val := &ic.IntType{Val: int64(v.(uint32))}
392 args[cnt] = &kafka.KVArg{
393 Key: k,
394 Value: val,
395 }
396 }
397 }
398
399 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
400 log.Debugw("GetChildDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
401
402 if success {
403 volthaDevice := &voltha.Device{}
404 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
405 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
406 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
407 }
408 return volthaDevice, nil
409 } else {
410 unpackResult := &ic.Error{}
411 var err error
412 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
413 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
414 }
415 log.Debugw("GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
416 // TODO: Need to get the real error code
417 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
418 }
William Kurkianea869482019-04-09 15:16:11 -0400419}
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400420
421func (ap *CoreProxy) SendPacketIn(ctx context.Context, deviceId string, port uint32, pktPayload []byte) error {
422 log.Debugw("SendPacketIn", log.Fields{"deviceId": deviceId, "port": port, "pktPayload": pktPayload})
423 rpc := "PacketIn"
424 // Use a device specific topic to send the request. The adapter handling the device creates a device
425 // specific topic
426 toTopic := ap.getCoreTopic(deviceId)
427 replyToTopic := ap.getAdapterTopic()
428
429 args := make([]*kafka.KVArg, 3)
430 id := &voltha.ID{Id: deviceId}
431 args[0] = &kafka.KVArg{
432 Key: "device_id",
433 Value: id,
434 }
435 portNo := &ic.IntType{Val: int64(port)}
436 args[1] = &kafka.KVArg{
437 Key: "port",
438 Value: portNo,
439 }
440 pkt := &ic.Packet{Payload: pktPayload}
441 args[2] = &kafka.KVArg{
442 Key: "packet",
443 Value: pkt,
444 }
445 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
446 log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": deviceId, "success": success})
447 return unPackResponse(rpc, deviceId, success, result)
448}