blob: b1ce1c391375bfd62510e191e1df9e6b8cdc9b60 [file] [log] [blame]
Scott Baker2c1c4822019-10-16 11:02:41 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package common
17
18import (
19 "context"
Chaitrashree G S66d41352020-01-09 20:18:58 -050020 "sync"
21
Scott Baker2c1c4822019-10-16 11:02:41 -070022 "github.com/golang/protobuf/ptypes"
23 a "github.com/golang/protobuf/ptypes/any"
serkant.uluderyab38671c2019-11-01 09:35:38 -070024 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
25 "github.com/opencord/voltha-lib-go/v3/pkg/log"
26 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
27 "github.com/opencord/voltha-protos/v3/go/voltha"
Scott Baker2c1c4822019-10-16 11:02:41 -070028 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
Scott Baker2c1c4822019-10-16 11:02:41 -070030)
31
32type CoreProxy struct {
Matteo Scandolo2ba00d32020-01-16 17:33:03 -080033 kafkaICProxy kafka.InterContainerProxy
Scott Baker2c1c4822019-10-16 11:02:41 -070034 adapterTopic string
35 coreTopic string
36 deviceIdCoreMap map[string]string
37 lockDeviceIdCoreMap sync.RWMutex
38}
39
Neha Sharma94f16a92020-06-26 04:17:55 +000040func NewCoreProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
Scott Baker2c1c4822019-10-16 11:02:41 -070041 var proxy CoreProxy
42 proxy.kafkaICProxy = kafkaProxy
43 proxy.adapterTopic = adapterTopic
44 proxy.coreTopic = coreTopic
45 proxy.deviceIdCoreMap = make(map[string]string)
46 proxy.lockDeviceIdCoreMap = sync.RWMutex{}
Neha Sharma94f16a92020-06-26 04:17:55 +000047 logger.Debugw(ctx, "TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
Scott Baker2c1c4822019-10-16 11:02:41 -070048
49 return &proxy
50}
51
Neha Sharma94f16a92020-06-26 04:17:55 +000052func unPackResponse(ctx context.Context, rpc string, deviceId string, success bool, response *a.Any) error {
Scott Baker2c1c4822019-10-16 11:02:41 -070053 if success {
54 return nil
55 } else {
56 unpackResult := &ic.Error{}
57 var err error
58 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +000059 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -070060 }
divyadesai5d8836b2020-08-18 07:40:59 +000061 logger.Debugw(ctx, "response", log.Fields{"rpc": rpc, "device-id": deviceId, "success": success, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -070062 // TODO: Need to get the real error code
63 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
64 }
65}
66
67// UpdateCoreReference adds or update a core reference (really the topic name) for a given device Id
68func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
69 ap.lockDeviceIdCoreMap.Lock()
70 defer ap.lockDeviceIdCoreMap.Unlock()
71 ap.deviceIdCoreMap[deviceId] = coreReference
72}
73
74// DeleteCoreReference removes a core reference (really the topic name) for a given device Id
75func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
76 ap.lockDeviceIdCoreMap.Lock()
77 defer ap.lockDeviceIdCoreMap.Unlock()
78 delete(ap.deviceIdCoreMap, deviceId)
79}
80
81func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
82 ap.lockDeviceIdCoreMap.Lock()
83 defer ap.lockDeviceIdCoreMap.Unlock()
84
85 if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
86 return kafka.Topic{Name: t}
87 }
88
89 return kafka.Topic{Name: ap.coreTopic}
90}
91
92func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
93 return kafka.Topic{Name: ap.adapterTopic}
94}
95
96func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
Neha Sharma94f16a92020-06-26 04:17:55 +000097 logger.Debugw(ctx, "registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
Scott Baker2c1c4822019-10-16 11:02:41 -070098 rpc := "Register"
99 topic := kafka.Topic{Name: ap.coreTopic}
100 replyToTopic := ap.getAdapterTopic()
101 args := make([]*kafka.KVArg, 2)
Matteo Scandolod58eaef2020-03-30 12:30:02 -0700102
103 if adapter.TotalReplicas == 0 && adapter.CurrentReplica != 0 {
Neha Sharma94f16a92020-06-26 04:17:55 +0000104 logger.Fatal(ctx, "totalReplicas can't be 0, since you're here you have at least one")
Matteo Scandolod58eaef2020-03-30 12:30:02 -0700105 }
106
107 if adapter.CurrentReplica == 0 && adapter.TotalReplicas != 0 {
Neha Sharma94f16a92020-06-26 04:17:55 +0000108 logger.Fatal(ctx, "currentReplica can't be 0, it has to start from 1")
Matteo Scandolod58eaef2020-03-30 12:30:02 -0700109 }
110
111 if adapter.CurrentReplica == 0 && adapter.TotalReplicas == 0 {
112 // if the adapter is not setting these fields they default to 0,
113 // in that case it means the adapter is not ready to be scaled and thus it defaults
114 // to a single instance
115 adapter.CurrentReplica = 1
116 adapter.TotalReplicas = 1
117 }
118
119 if adapter.CurrentReplica > adapter.TotalReplicas {
Neha Sharma94f16a92020-06-26 04:17:55 +0000120 logger.Fatalf(ctx, "CurrentReplica (%d) can't be greater than TotalReplicas (%d)",
Matteo Scandolod58eaef2020-03-30 12:30:02 -0700121 adapter.CurrentReplica, adapter.TotalReplicas)
122 }
123
Scott Baker2c1c4822019-10-16 11:02:41 -0700124 args[0] = &kafka.KVArg{
125 Key: "adapter",
126 Value: adapter,
127 }
128 args[1] = &kafka.KVArg{
129 Key: "deviceTypes",
130 Value: deviceTypes,
131 }
132
133 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
Neha Sharma94f16a92020-06-26 04:17:55 +0000134 logger.Debugw(ctx, "Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
135 return unPackResponse(ctx, rpc, "", success, result)
Scott Baker2c1c4822019-10-16 11:02:41 -0700136}
137
138func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
divyadesai5d8836b2020-08-18 07:40:59 +0000139 logger.Debugw(ctx, "DeviceUpdate", log.Fields{"device-id": device.Id})
Scott Baker2c1c4822019-10-16 11:02:41 -0700140 rpc := "DeviceUpdate"
141 toTopic := ap.getCoreTopic(device.Id)
142 args := make([]*kafka.KVArg, 1)
143 args[0] = &kafka.KVArg{
144 Key: "device",
145 Value: device,
146 }
147 // Use a device specific topic as we are the only adaptercore handling requests for this device
148 replyToTopic := ap.getAdapterTopic()
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000149 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
divyadesai5d8836b2020-08-18 07:40:59 +0000150 logger.Debugw(ctx, "DeviceUpdate-response", log.Fields{"device-id": device.Id, "success": success})
Neha Sharma94f16a92020-06-26 04:17:55 +0000151 return unPackResponse(ctx, rpc, device.Id, success, result)
Scott Baker2c1c4822019-10-16 11:02:41 -0700152}
153
154func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
Neha Sharma94f16a92020-06-26 04:17:55 +0000155 logger.Debugw(ctx, "PortCreated", log.Fields{"portNo": port.PortNo})
Scott Baker2c1c4822019-10-16 11:02:41 -0700156 rpc := "PortCreated"
157 // Use a device specific topic to send the request. The adapter handling the device creates a device
158 // specific topic
159 toTopic := ap.getCoreTopic(deviceId)
160 args := make([]*kafka.KVArg, 2)
161 id := &voltha.ID{Id: deviceId}
162 args[0] = &kafka.KVArg{
163 Key: "device_id",
164 Value: id,
165 }
166 args[1] = &kafka.KVArg{
167 Key: "port",
168 Value: port,
169 }
170
171 // Use a device specific topic as we are the only adaptercore handling requests for this device
172 replyToTopic := ap.getAdapterTopic()
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000173 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
divyadesai5d8836b2020-08-18 07:40:59 +0000174 logger.Debugw(ctx, "PortCreated-response", log.Fields{"device-id": deviceId, "success": success})
Neha Sharma94f16a92020-06-26 04:17:55 +0000175 return unPackResponse(ctx, rpc, deviceId, success, result)
Scott Baker2c1c4822019-10-16 11:02:41 -0700176}
177
Kent Hagerman143fea32020-07-10 15:28:55 -0400178func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, portTypeFilter uint32, operStatus voltha.OperStatus_Types) error {
divyadesai5d8836b2020-08-18 07:40:59 +0000179 logger.Debugw(ctx, "PortsStateUpdate", log.Fields{"device-id": deviceId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700180 rpc := "PortsStateUpdate"
181 // Use a device specific topic to send the request. The adapter handling the device creates a device
182 // specific topic
183 toTopic := ap.getCoreTopic(deviceId)
Kent Hagerman143fea32020-07-10 15:28:55 -0400184 args := []*kafka.KVArg{{
Scott Baker2c1c4822019-10-16 11:02:41 -0700185 Key: "device_id",
Kent Hagerman143fea32020-07-10 15:28:55 -0400186 Value: &voltha.ID{Id: deviceId},
187 }, {
188 Key: "port_type_filter",
189 Value: &ic.IntType{Val: int64(portTypeFilter)},
190 }, {
Scott Baker2c1c4822019-10-16 11:02:41 -0700191 Key: "oper_status",
Kent Hagerman143fea32020-07-10 15:28:55 -0400192 Value: &ic.IntType{Val: int64(operStatus)},
193 }}
Scott Baker2c1c4822019-10-16 11:02:41 -0700194
195 // Use a device specific topic as we are the only adaptercore handling requests for this device
196 replyToTopic := ap.getAdapterTopic()
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000197 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
divyadesai5d8836b2020-08-18 07:40:59 +0000198 logger.Debugw(ctx, "PortsStateUpdate-response", log.Fields{"device-id": deviceId, "success": success})
Neha Sharma94f16a92020-06-26 04:17:55 +0000199 return unPackResponse(ctx, rpc, deviceId, success, result)
Scott Baker2c1c4822019-10-16 11:02:41 -0700200}
201
202func (ap *CoreProxy) DeleteAllPorts(ctx context.Context, deviceId string) error {
divyadesai5d8836b2020-08-18 07:40:59 +0000203 logger.Debugw(ctx, "DeleteAllPorts", log.Fields{"device-id": deviceId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700204 rpc := "DeleteAllPorts"
205 // Use a device specific topic to send the request. The adapter handling the device creates a device
206 // specific topic
207 toTopic := ap.getCoreTopic(deviceId)
208 args := make([]*kafka.KVArg, 2)
209 id := &voltha.ID{Id: deviceId}
210
211 args[0] = &kafka.KVArg{
212 Key: "device_id",
213 Value: id,
214 }
215
216 // Use a device specific topic as we are the only adaptercore handling requests for this device
217 replyToTopic := ap.getAdapterTopic()
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000218 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
divyadesai5d8836b2020-08-18 07:40:59 +0000219 logger.Debugw(ctx, "DeleteAllPorts-response", log.Fields{"device-id": deviceId, "success": success})
Neha Sharma94f16a92020-06-26 04:17:55 +0000220 return unPackResponse(ctx, rpc, deviceId, success, result)
Scott Baker2c1c4822019-10-16 11:02:41 -0700221}
222
Kent Hagerman143fea32020-07-10 15:28:55 -0400223func (ap *CoreProxy) GetDevicePort(ctx context.Context, deviceID string, portNo uint32) (*voltha.Port, error) {
224 logger.Debugw(ctx, "GetDevicePort", log.Fields{"device-id": deviceID})
225 rpc := "GetDevicePort"
226
227 toTopic := ap.getCoreTopic(deviceID)
228 replyToTopic := ap.getAdapterTopic()
229
230 args := []*kafka.KVArg{{
231 Key: "device_id",
232 Value: &voltha.ID{Id: deviceID},
233 }, {
234 Key: "port_no",
235 Value: &ic.IntType{Val: int64(portNo)},
236 }}
237
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000238 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceID, args...)
Kent Hagerman143fea32020-07-10 15:28:55 -0400239 logger.Debugw(ctx, "GetDevicePort-response", log.Fields{"device-id": deviceID, "success": success})
240
241 if success {
242 port := &voltha.Port{}
243 if err := ptypes.UnmarshalAny(result, port); err != nil {
244 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
245 return nil, status.Error(codes.InvalidArgument, err.Error())
246 }
247 return port, nil
248 } else {
249 unpackResult := &ic.Error{}
250 var err error
251 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
252 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
253 }
254 logger.Debugw(ctx, "GetDevicePort-return", log.Fields{"device-id": deviceID, "success": success, "error": err})
255 // TODO: Need to get the real error code
256 return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
257 }
258}
259
260func (ap *CoreProxy) ListDevicePorts(ctx context.Context, deviceID string) ([]*voltha.Port, error) {
261 logger.Debugw(ctx, "ListDevicePorts", log.Fields{"device-id": deviceID})
262 rpc := "ListDevicePorts"
263
264 toTopic := ap.getCoreTopic(deviceID)
265 replyToTopic := ap.getAdapterTopic()
266
267 args := []*kafka.KVArg{{
268 Key: "device_id",
269 Value: &voltha.ID{Id: deviceID},
270 }}
271
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000272 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceID, args...)
Kent Hagerman143fea32020-07-10 15:28:55 -0400273 logger.Debugw(ctx, "ListDevicePorts-response", log.Fields{"device-id": deviceID, "success": success})
274
275 if success {
276 ports := &voltha.Ports{}
277 if err := ptypes.UnmarshalAny(result, ports); err != nil {
278 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
279 return nil, status.Error(codes.InvalidArgument, err.Error())
280 }
281 return ports.Items, nil
282 } else {
283 unpackResult := &ic.Error{}
284 var err error
285 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
286 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
287 }
288 logger.Debugw(ctx, "ListDevicePorts-return", log.Fields{"device-id": deviceID, "success": success, "error": err})
289 // TODO: Need to get the real error code
290 return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
291 }
292}
293
Scott Baker2c1c4822019-10-16 11:02:41 -0700294func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
serkant.uluderyab38671c2019-11-01 09:35:38 -0700295 connStatus voltha.ConnectStatus_Types, operStatus voltha.OperStatus_Types) error {
divyadesai5d8836b2020-08-18 07:40:59 +0000296 logger.Debugw(ctx, "DeviceStateUpdate", log.Fields{"device-id": deviceId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700297 rpc := "DeviceStateUpdate"
298 // Use a device specific topic to send the request. The adapter handling the device creates a device
299 // specific topic
300 toTopic := ap.getCoreTopic(deviceId)
301 args := make([]*kafka.KVArg, 3)
302 id := &voltha.ID{Id: deviceId}
303 oStatus := &ic.IntType{Val: int64(operStatus)}
304 cStatus := &ic.IntType{Val: int64(connStatus)}
305
306 args[0] = &kafka.KVArg{
307 Key: "device_id",
308 Value: id,
309 }
310 args[1] = &kafka.KVArg{
311 Key: "oper_status",
312 Value: oStatus,
313 }
314 args[2] = &kafka.KVArg{
315 Key: "connect_status",
316 Value: cStatus,
317 }
318 // Use a device specific topic as we are the only adaptercore handling requests for this device
319 replyToTopic := ap.getAdapterTopic()
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000320 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
divyadesai5d8836b2020-08-18 07:40:59 +0000321 logger.Debugw(ctx, "DeviceStateUpdate-response", log.Fields{"device-id": deviceId, "success": success})
Neha Sharma94f16a92020-06-26 04:17:55 +0000322 return unPackResponse(ctx, rpc, deviceId, success, result)
Scott Baker2c1c4822019-10-16 11:02:41 -0700323}
324
325func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
326 childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) (*voltha.Device, error) {
divyadesai5d8836b2020-08-18 07:40:59 +0000327 logger.Debugw(ctx, "ChildDeviceDetected", log.Fields{"parent-device-id": parentDeviceId, "channelId": channelId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700328 rpc := "ChildDeviceDetected"
329 // Use a device specific topic to send the request. The adapter handling the device creates a device
330 // specific topic
331 toTopic := ap.getCoreTopic(parentDeviceId)
332 replyToTopic := ap.getAdapterTopic()
333
334 args := make([]*kafka.KVArg, 7)
335 id := &voltha.ID{Id: parentDeviceId}
336 args[0] = &kafka.KVArg{
337 Key: "parent_device_id",
338 Value: id,
339 }
340 ppn := &ic.IntType{Val: int64(parentPortNo)}
341 args[1] = &kafka.KVArg{
342 Key: "parent_port_no",
343 Value: ppn,
344 }
345 cdt := &ic.StrType{Val: childDeviceType}
346 args[2] = &kafka.KVArg{
347 Key: "child_device_type",
348 Value: cdt,
349 }
350 channel := &ic.IntType{Val: int64(channelId)}
351 args[3] = &kafka.KVArg{
352 Key: "channel_id",
353 Value: channel,
354 }
355 vId := &ic.StrType{Val: vendorId}
356 args[4] = &kafka.KVArg{
357 Key: "vendor_id",
358 Value: vId,
359 }
360 sNo := &ic.StrType{Val: serialNumber}
361 args[5] = &kafka.KVArg{
362 Key: "serial_number",
363 Value: sNo,
364 }
365 oId := &ic.IntType{Val: int64(onuId)}
366 args[6] = &kafka.KVArg{
367 Key: "onu_id",
368 Value: oId,
369 }
370
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000371 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
divyadesai5d8836b2020-08-18 07:40:59 +0000372 logger.Debugw(ctx, "ChildDeviceDetected-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
Scott Baker2c1c4822019-10-16 11:02:41 -0700373
374 if success {
375 volthaDevice := &voltha.Device{}
376 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000377 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Matteo Scandolo1530d412020-01-30 14:57:46 -0800378 return nil, status.Error(codes.InvalidArgument, err.Error())
Scott Baker2c1c4822019-10-16 11:02:41 -0700379 }
380 return volthaDevice, nil
381 } else {
382 unpackResult := &ic.Error{}
383 var err error
384 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000385 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700386 }
divyadesai5d8836b2020-08-18 07:40:59 +0000387 logger.Debugw(ctx, "ChildDeviceDetected-return", log.Fields{"device-id": parentDeviceId, "success": success, "error": err})
Matteo Scandolo1530d412020-01-30 14:57:46 -0800388
Neha Sharma94f16a92020-06-26 04:17:55 +0000389 return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
Scott Baker2c1c4822019-10-16 11:02:41 -0700390 }
391
392}
393
394func (ap *CoreProxy) ChildDevicesLost(ctx context.Context, parentDeviceId string) error {
divyadesai5d8836b2020-08-18 07:40:59 +0000395 logger.Debugw(ctx, "ChildDevicesLost", log.Fields{"parent-device-id": parentDeviceId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700396 rpc := "ChildDevicesLost"
397 // Use a device specific topic to send the request. The adapter handling the device creates a device
398 // specific topic
399 toTopic := ap.getCoreTopic(parentDeviceId)
400 replyToTopic := ap.getAdapterTopic()
401
402 args := make([]*kafka.KVArg, 1)
403 id := &voltha.ID{Id: parentDeviceId}
404 args[0] = &kafka.KVArg{
405 Key: "parent_device_id",
406 Value: id,
407 }
408
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000409 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
divyadesai5d8836b2020-08-18 07:40:59 +0000410 logger.Debugw(ctx, "ChildDevicesLost-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
Neha Sharma94f16a92020-06-26 04:17:55 +0000411 return unPackResponse(ctx, rpc, parentDeviceId, success, result)
Scott Baker2c1c4822019-10-16 11:02:41 -0700412}
413
414func (ap *CoreProxy) ChildDevicesDetected(ctx context.Context, parentDeviceId string) error {
divyadesai5d8836b2020-08-18 07:40:59 +0000415 logger.Debugw(ctx, "ChildDevicesDetected", log.Fields{"parent-device-id": parentDeviceId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700416 rpc := "ChildDevicesDetected"
417 // Use a device specific topic to send the request. The adapter handling the device creates a device
418 // specific topic
419 toTopic := ap.getCoreTopic(parentDeviceId)
420 replyToTopic := ap.getAdapterTopic()
421
422 args := make([]*kafka.KVArg, 1)
423 id := &voltha.ID{Id: parentDeviceId}
424 args[0] = &kafka.KVArg{
425 Key: "parent_device_id",
426 Value: id,
427 }
428
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000429 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
divyadesai5d8836b2020-08-18 07:40:59 +0000430 logger.Debugw(ctx, "ChildDevicesDetected-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
Neha Sharma94f16a92020-06-26 04:17:55 +0000431 return unPackResponse(ctx, rpc, parentDeviceId, success, result)
Scott Baker2c1c4822019-10-16 11:02:41 -0700432}
433
434func (ap *CoreProxy) GetDevice(ctx context.Context, parentDeviceId string, deviceId string) (*voltha.Device, error) {
divyadesai5d8836b2020-08-18 07:40:59 +0000435 logger.Debugw(ctx, "GetDevice", log.Fields{"device-id": deviceId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700436 rpc := "GetDevice"
437
438 toTopic := ap.getCoreTopic(parentDeviceId)
439 replyToTopic := ap.getAdapterTopic()
440
441 args := make([]*kafka.KVArg, 1)
442 id := &voltha.ID{Id: deviceId}
443 args[0] = &kafka.KVArg{
444 Key: "device_id",
445 Value: id,
446 }
447
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000448 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
divyadesai5d8836b2020-08-18 07:40:59 +0000449 logger.Debugw(ctx, "GetDevice-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
Scott Baker2c1c4822019-10-16 11:02:41 -0700450
451 if success {
452 volthaDevice := &voltha.Device{}
453 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000454 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Matteo Scandolo1530d412020-01-30 14:57:46 -0800455 return nil, status.Error(codes.InvalidArgument, err.Error())
Scott Baker2c1c4822019-10-16 11:02:41 -0700456 }
457 return volthaDevice, nil
458 } else {
459 unpackResult := &ic.Error{}
460 var err error
461 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000462 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700463 }
divyadesai5d8836b2020-08-18 07:40:59 +0000464 logger.Debugw(ctx, "GetDevice-return", log.Fields{"parent-device-id": parentDeviceId, "success": success, "error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700465 // TODO: Need to get the real error code
Neha Sharma94f16a92020-06-26 04:17:55 +0000466 return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
Scott Baker2c1c4822019-10-16 11:02:41 -0700467 }
468}
469
470func (ap *CoreProxy) GetChildDevice(ctx context.Context, parentDeviceId string, kwargs map[string]interface{}) (*voltha.Device, error) {
divyadesai5d8836b2020-08-18 07:40:59 +0000471 logger.Debugw(ctx, "GetChildDevice", log.Fields{"parent-device-id": parentDeviceId, "kwargs": kwargs})
Scott Baker2c1c4822019-10-16 11:02:41 -0700472 rpc := "GetChildDevice"
473
474 toTopic := ap.getCoreTopic(parentDeviceId)
475 replyToTopic := ap.getAdapterTopic()
476
477 args := make([]*kafka.KVArg, 4)
478 id := &voltha.ID{Id: parentDeviceId}
479 args[0] = &kafka.KVArg{
480 Key: "device_id",
481 Value: id,
482 }
483
484 var cnt uint8 = 0
485 for k, v := range kwargs {
486 cnt += 1
487 if k == "serial_number" {
488 val := &ic.StrType{Val: v.(string)}
489 args[cnt] = &kafka.KVArg{
490 Key: k,
491 Value: val,
492 }
493 } else if k == "onu_id" {
494 val := &ic.IntType{Val: int64(v.(uint32))}
495 args[cnt] = &kafka.KVArg{
496 Key: k,
497 Value: val,
498 }
499 } else if k == "parent_port_no" {
500 val := &ic.IntType{Val: int64(v.(uint32))}
501 args[cnt] = &kafka.KVArg{
502 Key: k,
503 Value: val,
504 }
505 }
506 }
507
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000508 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
divyadesai5d8836b2020-08-18 07:40:59 +0000509 logger.Debugw(ctx, "GetChildDevice-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
Scott Baker2c1c4822019-10-16 11:02:41 -0700510
511 if success {
512 volthaDevice := &voltha.Device{}
513 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000514 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Matteo Scandolo1530d412020-01-30 14:57:46 -0800515 return nil, status.Error(codes.InvalidArgument, err.Error())
Scott Baker2c1c4822019-10-16 11:02:41 -0700516 }
517 return volthaDevice, nil
518 } else {
519 unpackResult := &ic.Error{}
520 var err error
521 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000522 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700523 }
divyadesai5d8836b2020-08-18 07:40:59 +0000524 logger.Debugw(ctx, "GetChildDevice-return", log.Fields{"parent-device-id": parentDeviceId, "success": success, "error": err})
Matteo Scandolob45cf592020-01-21 16:10:56 -0800525
Neha Sharma94f16a92020-06-26 04:17:55 +0000526 return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
Scott Baker2c1c4822019-10-16 11:02:41 -0700527 }
528}
529
530func (ap *CoreProxy) GetChildDevices(ctx context.Context, parentDeviceId string) (*voltha.Devices, error) {
divyadesai5d8836b2020-08-18 07:40:59 +0000531 logger.Debugw(ctx, "GetChildDevices", log.Fields{"parent-device-id": parentDeviceId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700532 rpc := "GetChildDevices"
533
534 toTopic := ap.getCoreTopic(parentDeviceId)
535 replyToTopic := ap.getAdapterTopic()
536
537 args := make([]*kafka.KVArg, 1)
538 id := &voltha.ID{Id: parentDeviceId}
539 args[0] = &kafka.KVArg{
540 Key: "device_id",
541 Value: id,
542 }
543
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000544 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
divyadesai5d8836b2020-08-18 07:40:59 +0000545 logger.Debugw(ctx, "GetChildDevices-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
Scott Baker2c1c4822019-10-16 11:02:41 -0700546
547 if success {
548 volthaDevices := &voltha.Devices{}
549 if err := ptypes.UnmarshalAny(result, volthaDevices); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000550 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Matteo Scandolo1530d412020-01-30 14:57:46 -0800551 return nil, status.Error(codes.InvalidArgument, err.Error())
Scott Baker2c1c4822019-10-16 11:02:41 -0700552 }
553 return volthaDevices, nil
554 } else {
555 unpackResult := &ic.Error{}
556 var err error
557 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000558 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker2c1c4822019-10-16 11:02:41 -0700559 }
divyadesai5d8836b2020-08-18 07:40:59 +0000560 logger.Debugw(ctx, "GetChildDevices-return", log.Fields{"parent-device-id": parentDeviceId, "success": success, "error": err})
Matteo Scandolob45cf592020-01-21 16:10:56 -0800561
Neha Sharma94f16a92020-06-26 04:17:55 +0000562 return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
Scott Baker2c1c4822019-10-16 11:02:41 -0700563 }
564}
565
566func (ap *CoreProxy) SendPacketIn(ctx context.Context, deviceId string, port uint32, pktPayload []byte) error {
divyadesai5d8836b2020-08-18 07:40:59 +0000567 logger.Debugw(ctx, "SendPacketIn", log.Fields{"device-id": deviceId, "port": port, "pktPayload": pktPayload})
Scott Baker2c1c4822019-10-16 11:02:41 -0700568 rpc := "PacketIn"
569 // Use a device specific topic to send the request. The adapter handling the device creates a device
570 // specific topic
571 toTopic := ap.getCoreTopic(deviceId)
572 replyToTopic := ap.getAdapterTopic()
573
574 args := make([]*kafka.KVArg, 3)
575 id := &voltha.ID{Id: deviceId}
576 args[0] = &kafka.KVArg{
577 Key: "device_id",
578 Value: id,
579 }
580 portNo := &ic.IntType{Val: int64(port)}
581 args[1] = &kafka.KVArg{
582 Key: "port",
583 Value: portNo,
584 }
585 pkt := &ic.Packet{Payload: pktPayload}
586 args[2] = &kafka.KVArg{
587 Key: "packet",
588 Value: pkt,
589 }
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000590 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
divyadesai5d8836b2020-08-18 07:40:59 +0000591 logger.Debugw(ctx, "SendPacketIn-response", log.Fields{"device-id": deviceId, "success": success})
Neha Sharma94f16a92020-06-26 04:17:55 +0000592 return unPackResponse(ctx, rpc, deviceId, success, result)
Scott Baker2c1c4822019-10-16 11:02:41 -0700593}
594
David Bainbridge36d0b202019-10-23 18:33:34 +0000595func (ap *CoreProxy) DeviceReasonUpdate(ctx context.Context, deviceId string, deviceReason string) error {
divyadesai5d8836b2020-08-18 07:40:59 +0000596 logger.Debugw(ctx, "DeviceReasonUpdate", log.Fields{"device-id": deviceId, "deviceReason": deviceReason})
David Bainbridge36d0b202019-10-23 18:33:34 +0000597 rpc := "DeviceReasonUpdate"
598 // Use a device specific topic to send the request. The adapter handling the device creates a device
599 // specific topic
600 toTopic := ap.getCoreTopic(deviceId)
601 replyToTopic := ap.getAdapterTopic()
602
603 args := make([]*kafka.KVArg, 2)
604 id := &voltha.ID{Id: deviceId}
605 args[0] = &kafka.KVArg{
606 Key: "device_id",
607 Value: id,
608 }
609 reason := &ic.StrType{Val: deviceReason}
610 args[1] = &kafka.KVArg{
611 Key: "device_reason",
612 Value: reason,
613 }
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000614 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
divyadesai5d8836b2020-08-18 07:40:59 +0000615 logger.Debugw(ctx, "DeviceReason-response", log.Fields{"device-id": deviceId, "success": success})
Neha Sharma94f16a92020-06-26 04:17:55 +0000616 return unPackResponse(ctx, rpc, deviceId, success, result)
David Bainbridge36d0b202019-10-23 18:33:34 +0000617}
618
Scott Baker2c1c4822019-10-16 11:02:41 -0700619func (ap *CoreProxy) DevicePMConfigUpdate(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
Neha Sharma94f16a92020-06-26 04:17:55 +0000620 logger.Debugw(ctx, "DevicePMConfigUpdate", log.Fields{"pmConfigs": pmConfigs})
Scott Baker2c1c4822019-10-16 11:02:41 -0700621 rpc := "DevicePMConfigUpdate"
622 // Use a device specific topic to send the request. The adapter handling the device creates a device
623 // specific topic
624 toTopic := ap.getCoreTopic(pmConfigs.Id)
625 replyToTopic := ap.getAdapterTopic()
626
627 args := make([]*kafka.KVArg, 1)
628 args[0] = &kafka.KVArg{
629 Key: "device_pm_config",
630 Value: pmConfigs,
631 }
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000632 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
divyadesai5d8836b2020-08-18 07:40:59 +0000633 logger.Debugw(ctx, "DevicePMConfigUpdate-response", log.Fields{"pmconfig-device-id": pmConfigs.Id, "success": success})
Neha Sharma94f16a92020-06-26 04:17:55 +0000634 return unPackResponse(ctx, rpc, pmConfigs.Id, success, result)
Scott Baker2c1c4822019-10-16 11:02:41 -0700635}
636
637func (ap *CoreProxy) ReconcileChildDevices(ctx context.Context, parentDeviceId string) error {
divyadesai5d8836b2020-08-18 07:40:59 +0000638 logger.Debugw(ctx, "ReconcileChildDevices", log.Fields{"parent-device-id": parentDeviceId})
Scott Baker2c1c4822019-10-16 11:02:41 -0700639 rpc := "ReconcileChildDevices"
640 // Use a device specific topic to send the request. The adapter handling the device creates a device
641 // specific topic
642 toTopic := ap.getCoreTopic(parentDeviceId)
643 replyToTopic := ap.getAdapterTopic()
644
645 args := []*kafka.KVArg{
646 {Key: "parent_device_id", Value: &voltha.ID{Id: parentDeviceId}},
647 }
648
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000649 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
divyadesai5d8836b2020-08-18 07:40:59 +0000650 logger.Debugw(ctx, "ReconcileChildDevices-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
Neha Sharma94f16a92020-06-26 04:17:55 +0000651 return unPackResponse(ctx, rpc, parentDeviceId, success, result)
Scott Baker2c1c4822019-10-16 11:02:41 -0700652}
Chaitrashree G S66d41352020-01-09 20:18:58 -0500653
654func (ap *CoreProxy) PortStateUpdate(ctx context.Context, deviceId string, pType voltha.Port_PortType, portNum uint32,
serkant.uluderyab38671c2019-11-01 09:35:38 -0700655 operStatus voltha.OperStatus_Types) error {
divyadesai5d8836b2020-08-18 07:40:59 +0000656 logger.Debugw(ctx, "PortStateUpdate", log.Fields{"device-id": deviceId, "portType": pType, "portNo": portNum, "operation_status": operStatus})
Chaitrashree G S66d41352020-01-09 20:18:58 -0500657 rpc := "PortStateUpdate"
658 // Use a device specific topic to send the request. The adapter handling the device creates a device
659 // specific topic
660 toTopic := ap.getCoreTopic(deviceId)
661 args := make([]*kafka.KVArg, 4)
662 deviceID := &voltha.ID{Id: deviceId}
663 portNo := &ic.IntType{Val: int64(portNum)}
664 portType := &ic.IntType{Val: int64(pType)}
665 oStatus := &ic.IntType{Val: int64(operStatus)}
666
667 args[0] = &kafka.KVArg{
668 Key: "device_id",
669 Value: deviceID,
670 }
671 args[1] = &kafka.KVArg{
672 Key: "oper_status",
673 Value: oStatus,
674 }
675 args[2] = &kafka.KVArg{
676 Key: "port_type",
677 Value: portType,
678 }
679 args[3] = &kafka.KVArg{
680 Key: "port_no",
681 Value: portNo,
682 }
683
684 // Use a device specific topic as we are the only adaptercore handling requests for this device
685 replyToTopic := ap.getAdapterTopic()
Rohan Agrawalfb1cb352020-08-03 04:59:32 +0000686 success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
divyadesai5d8836b2020-08-18 07:40:59 +0000687 logger.Debugw(ctx, "PortStateUpdate-response", log.Fields{"device-id": deviceId, "success": success})
Neha Sharma94f16a92020-06-26 04:17:55 +0000688 return unPackResponse(ctx, rpc, deviceId, success, result)
Chaitrashree G S66d41352020-01-09 20:18:58 -0500689}