blob: 28b532f4c911d178cc6a3a2ae18abe4afa9a014c [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package common
17
18import (
19 "context"
Chaitrashree G Sded0a832020-01-09 20:21:48 -050020 "sync"
21
William Kurkianea869482019-04-09 15:16:11 -040022 "github.com/golang/protobuf/ptypes"
23 a "github.com/golang/protobuf/ptypes/any"
Esin Karamanccb714b2019-11-29 15:02:06 +000024 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
25 "github.com/opencord/voltha-lib-go/v3/pkg/log"
26 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
27 "github.com/opencord/voltha-protos/v3/go/voltha"
William Kurkianea869482019-04-09 15:16:11 -040028 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
William Kurkianea869482019-04-09 15:16:11 -040030)
31
32type CoreProxy struct {
npujarec5762e2020-01-01 14:08:48 +053033 kafkaICProxy kafka.InterContainerProxy
Matt Jeanneretcab955f2019-04-10 15:45:57 -040034 adapterTopic string
35 coreTopic string
36 deviceIdCoreMap map[string]string
William Kurkianea869482019-04-09 15:16:11 -040037 lockDeviceIdCoreMap sync.RWMutex
William Kurkianea869482019-04-09 15:16:11 -040038}
39
Neha Sharma96b7bf22020-06-15 10:37:32 +000040func NewCoreProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
William Kurkianea869482019-04-09 15:16:11 -040041 var proxy CoreProxy
42 proxy.kafkaICProxy = kafkaProxy
43 proxy.adapterTopic = adapterTopic
44 proxy.coreTopic = coreTopic
45 proxy.deviceIdCoreMap = make(map[string]string)
46 proxy.lockDeviceIdCoreMap = sync.RWMutex{}
Neha Sharma96b7bf22020-06-15 10:37:32 +000047 logger.Debugw(ctx, "TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
William Kurkianea869482019-04-09 15:16:11 -040048
49 return &proxy
50}
51
Neha Sharma96b7bf22020-06-15 10:37:32 +000052func unPackResponse(ctx context.Context, rpc string, deviceId string, success bool, response *a.Any) error {
William Kurkianea869482019-04-09 15:16:11 -040053 if success {
54 return nil
55 } else {
56 unpackResult := &ic.Error{}
57 var err error
58 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +000059 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
William Kurkianea869482019-04-09 15:16:11 -040060 }
Neha Sharma96b7bf22020-06-15 10:37:32 +000061 logger.Debugw(ctx, "response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
William Kurkianea869482019-04-09 15:16:11 -040062 // TODO: Need to get the real error code
63 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
64 }
65}
66
67// UpdateCoreReference adds or update a core reference (really the topic name) for a given device Id
68func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
69 ap.lockDeviceIdCoreMap.Lock()
70 defer ap.lockDeviceIdCoreMap.Unlock()
71 ap.deviceIdCoreMap[deviceId] = coreReference
72}
73
74// DeleteCoreReference removes a core reference (really the topic name) for a given device Id
75func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
76 ap.lockDeviceIdCoreMap.Lock()
77 defer ap.lockDeviceIdCoreMap.Unlock()
78 delete(ap.deviceIdCoreMap, deviceId)
79}
80
81func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
82 ap.lockDeviceIdCoreMap.Lock()
83 defer ap.lockDeviceIdCoreMap.Unlock()
84
85 if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
86 return kafka.Topic{Name: t}
87 }
88
89 return kafka.Topic{Name: ap.coreTopic}
90}
91
92func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
93 return kafka.Topic{Name: ap.adapterTopic}
94}
95
96func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
Neha Sharma96b7bf22020-06-15 10:37:32 +000097 logger.Debugw(ctx, "registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
William Kurkianea869482019-04-09 15:16:11 -040098 rpc := "Register"
99 topic := kafka.Topic{Name: ap.coreTopic}
100 replyToTopic := ap.getAdapterTopic()
101 args := make([]*kafka.KVArg, 2)
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700102
103 if adapter.TotalReplicas == 0 && adapter.CurrentReplica != 0 {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000104 logger.Fatal(ctx, "totalReplicas can't be 0, since you're here you have at least one")
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700105 }
106
107 if adapter.CurrentReplica == 0 && adapter.TotalReplicas != 0 {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000108 logger.Fatal(ctx, "currentReplica can't be 0, it has to start from 1")
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700109 }
110
111 if adapter.CurrentReplica == 0 && adapter.TotalReplicas == 0 {
112 // if the adapter is not setting these fields they default to 0,
113 // in that case it means the adapter is not ready to be scaled and thus it defaults
114 // to a single instance
115 adapter.CurrentReplica = 1
116 adapter.TotalReplicas = 1
117 }
118
119 if adapter.CurrentReplica > adapter.TotalReplicas {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000120 logger.Fatalf(ctx, "CurrentReplica (%d) can't be greater than TotalReplicas (%d)",
Matteo Scandolo3ad5d2b2020-04-02 17:02:04 -0700121 adapter.CurrentReplica, adapter.TotalReplicas)
122 }
123
William Kurkianea869482019-04-09 15:16:11 -0400124 args[0] = &kafka.KVArg{
125 Key: "adapter",
126 Value: adapter,
127 }
128 args[1] = &kafka.KVArg{
129 Key: "deviceTypes",
130 Value: deviceTypes,
131 }
132
133 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000134 logger.Debugw(ctx, "Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
135 return unPackResponse(ctx, rpc, "", success, result)
William Kurkianea869482019-04-09 15:16:11 -0400136}
137
138func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000139 logger.Debugw(ctx, "DeviceUpdate", log.Fields{"deviceId": device.Id})
William Kurkianea869482019-04-09 15:16:11 -0400140 rpc := "DeviceUpdate"
141 toTopic := ap.getCoreTopic(device.Id)
142 args := make([]*kafka.KVArg, 1)
143 args[0] = &kafka.KVArg{
144 Key: "device",
145 Value: device,
146 }
147 // Use a device specific topic as we are the only adaptercore handling requests for this device
148 replyToTopic := ap.getAdapterTopic()
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000149 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000150 logger.Debugw(ctx, "DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
151 return unPackResponse(ctx, rpc, device.Id, success, result)
William Kurkianea869482019-04-09 15:16:11 -0400152}
153
154func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000155 logger.Debugw(ctx, "PortCreated", log.Fields{"portNo": port.PortNo})
William Kurkianea869482019-04-09 15:16:11 -0400156 rpc := "PortCreated"
157 // Use a device specific topic to send the request. The adapter handling the device creates a device
158 // specific topic
159 toTopic := ap.getCoreTopic(deviceId)
160 args := make([]*kafka.KVArg, 2)
161 id := &voltha.ID{Id: deviceId}
162 args[0] = &kafka.KVArg{
163 Key: "device_id",
164 Value: id,
165 }
166 args[1] = &kafka.KVArg{
167 Key: "port",
168 Value: port,
169 }
170
171 // Use a device specific topic as we are the only adaptercore handling requests for this device
172 replyToTopic := ap.getAdapterTopic()
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000173 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000174 logger.Debugw(ctx, "PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
175 return unPackResponse(ctx, rpc, deviceId, success, result)
William Kurkianea869482019-04-09 15:16:11 -0400176}
177
Esin Karamanccb714b2019-11-29 15:02:06 +0000178func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, operStatus voltha.OperStatus_Types) error {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000179 logger.Debugw(ctx, "PortsStateUpdate", log.Fields{"deviceId": deviceId})
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400180 rpc := "PortsStateUpdate"
181 // Use a device specific topic to send the request. The adapter handling the device creates a device
182 // specific topic
183 toTopic := ap.getCoreTopic(deviceId)
184 args := make([]*kafka.KVArg, 2)
185 id := &voltha.ID{Id: deviceId}
186 oStatus := &ic.IntType{Val: int64(operStatus)}
187
188 args[0] = &kafka.KVArg{
189 Key: "device_id",
190 Value: id,
191 }
192 args[1] = &kafka.KVArg{
193 Key: "oper_status",
194 Value: oStatus,
195 }
196
197 // Use a device specific topic as we are the only adaptercore handling requests for this device
198 replyToTopic := ap.getAdapterTopic()
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000199 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000200 logger.Debugw(ctx, "PortsStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
201 return unPackResponse(ctx, rpc, deviceId, success, result)
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400202}
203
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400204func (ap *CoreProxy) DeleteAllPorts(ctx context.Context, deviceId string) error {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000205 logger.Debugw(ctx, "DeleteAllPorts", log.Fields{"deviceId": deviceId})
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400206 rpc := "DeleteAllPorts"
207 // Use a device specific topic to send the request. The adapter handling the device creates a device
208 // specific topic
209 toTopic := ap.getCoreTopic(deviceId)
210 args := make([]*kafka.KVArg, 2)
211 id := &voltha.ID{Id: deviceId}
212
213 args[0] = &kafka.KVArg{
214 Key: "device_id",
215 Value: id,
216 }
217
218 // Use a device specific topic as we are the only adaptercore handling requests for this device
219 replyToTopic := ap.getAdapterTopic()
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000220 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000221 logger.Debugw(ctx, "DeleteAllPorts-response", log.Fields{"deviceId": deviceId, "success": success})
222 return unPackResponse(ctx, rpc, deviceId, success, result)
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400223}
224
William Kurkianea869482019-04-09 15:16:11 -0400225func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
Esin Karamanccb714b2019-11-29 15:02:06 +0000226 connStatus voltha.ConnectStatus_Types, operStatus voltha.OperStatus_Types) error {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000227 logger.Debugw(ctx, "DeviceStateUpdate", log.Fields{"deviceId": deviceId})
William Kurkianea869482019-04-09 15:16:11 -0400228 rpc := "DeviceStateUpdate"
229 // Use a device specific topic to send the request. The adapter handling the device creates a device
230 // specific topic
231 toTopic := ap.getCoreTopic(deviceId)
232 args := make([]*kafka.KVArg, 3)
233 id := &voltha.ID{Id: deviceId}
234 oStatus := &ic.IntType{Val: int64(operStatus)}
235 cStatus := &ic.IntType{Val: int64(connStatus)}
236
237 args[0] = &kafka.KVArg{
238 Key: "device_id",
239 Value: id,
240 }
241 args[1] = &kafka.KVArg{
242 Key: "oper_status",
243 Value: oStatus,
244 }
245 args[2] = &kafka.KVArg{
246 Key: "connect_status",
247 Value: cStatus,
248 }
249 // Use a device specific topic as we are the only adaptercore handling requests for this device
250 replyToTopic := ap.getAdapterTopic()
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000251 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000252 logger.Debugw(ctx, "DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
253 return unPackResponse(ctx, rpc, deviceId, success, result)
William Kurkianea869482019-04-09 15:16:11 -0400254}
255
256func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
Mahir Gunyele77977b2019-06-27 05:36:22 -0700257 childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) (*voltha.Device, error) {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000258 logger.Debugw(ctx, "ChildDeviceDetected", log.Fields{"pDeviceId": parentDeviceId, "channelId": channelId})
William Kurkianea869482019-04-09 15:16:11 -0400259 rpc := "ChildDeviceDetected"
260 // Use a device specific topic to send the request. The adapter handling the device creates a device
261 // specific topic
262 toTopic := ap.getCoreTopic(parentDeviceId)
263 replyToTopic := ap.getAdapterTopic()
264
265 args := make([]*kafka.KVArg, 7)
266 id := &voltha.ID{Id: parentDeviceId}
267 args[0] = &kafka.KVArg{
268 Key: "parent_device_id",
269 Value: id,
270 }
271 ppn := &ic.IntType{Val: int64(parentPortNo)}
272 args[1] = &kafka.KVArg{
273 Key: "parent_port_no",
274 Value: ppn,
275 }
276 cdt := &ic.StrType{Val: childDeviceType}
277 args[2] = &kafka.KVArg{
278 Key: "child_device_type",
279 Value: cdt,
280 }
281 channel := &ic.IntType{Val: int64(channelId)}
282 args[3] = &kafka.KVArg{
283 Key: "channel_id",
284 Value: channel,
285 }
286 vId := &ic.StrType{Val: vendorId}
287 args[4] = &kafka.KVArg{
288 Key: "vendor_id",
289 Value: vId,
290 }
291 sNo := &ic.StrType{Val: serialNumber}
292 args[5] = &kafka.KVArg{
293 Key: "serial_number",
294 Value: sNo,
295 }
296 oId := &ic.IntType{Val: int64(onuId)}
297 args[6] = &kafka.KVArg{
298 Key: "onu_id",
299 Value: oId,
300 }
301
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000302 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000303 logger.Debugw(ctx, "ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
Mahir Gunyele77977b2019-06-27 05:36:22 -0700304
305 if success {
306 volthaDevice := &voltha.Device{}
307 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000308 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Matteo Scandolo945e4012019-12-12 14:16:11 -0800309 return nil, status.Error(codes.InvalidArgument, err.Error())
Mahir Gunyele77977b2019-06-27 05:36:22 -0700310 }
311 return volthaDevice, nil
312 } else {
313 unpackResult := &ic.Error{}
314 var err error
315 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000316 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Mahir Gunyele77977b2019-06-27 05:36:22 -0700317 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000318 logger.Debugw(ctx, "ChildDeviceDetected-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
Matteo Scandolo945e4012019-12-12 14:16:11 -0800319
Neha Sharma96b7bf22020-06-15 10:37:32 +0000320 return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
Mahir Gunyele77977b2019-06-27 05:36:22 -0700321 }
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400322
323}
324
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400325func (ap *CoreProxy) ChildDevicesLost(ctx context.Context, parentDeviceId string) error {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000326 logger.Debugw(ctx, "ChildDevicesLost", log.Fields{"pDeviceId": parentDeviceId})
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400327 rpc := "ChildDevicesLost"
328 // Use a device specific topic to send the request. The adapter handling the device creates a device
329 // specific topic
330 toTopic := ap.getCoreTopic(parentDeviceId)
331 replyToTopic := ap.getAdapterTopic()
332
333 args := make([]*kafka.KVArg, 1)
334 id := &voltha.ID{Id: parentDeviceId}
335 args[0] = &kafka.KVArg{
336 Key: "parent_device_id",
337 Value: id,
338 }
339
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000340 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000341 logger.Debugw(ctx, "ChildDevicesLost-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
342 return unPackResponse(ctx, rpc, parentDeviceId, success, result)
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400343}
344
345func (ap *CoreProxy) ChildDevicesDetected(ctx context.Context, parentDeviceId string) error {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000346 logger.Debugw(ctx, "ChildDevicesDetected", log.Fields{"pDeviceId": parentDeviceId})
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400347 rpc := "ChildDevicesDetected"
348 // Use a device specific topic to send the request. The adapter handling the device creates a device
349 // specific topic
350 toTopic := ap.getCoreTopic(parentDeviceId)
351 replyToTopic := ap.getAdapterTopic()
352
353 args := make([]*kafka.KVArg, 1)
354 id := &voltha.ID{Id: parentDeviceId}
355 args[0] = &kafka.KVArg{
356 Key: "parent_device_id",
357 Value: id,
358 }
359
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000360 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000361 logger.Debugw(ctx, "ChildDevicesDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
362 return unPackResponse(ctx, rpc, parentDeviceId, success, result)
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400363}
364
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400365func (ap *CoreProxy) GetDevice(ctx context.Context, parentDeviceId string, deviceId string) (*voltha.Device, error) {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000366 logger.Debugw(ctx, "GetDevice", log.Fields{"deviceId": deviceId})
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400367 rpc := "GetDevice"
368
369 toTopic := ap.getCoreTopic(parentDeviceId)
370 replyToTopic := ap.getAdapterTopic()
371
372 args := make([]*kafka.KVArg, 1)
373 id := &voltha.ID{Id: deviceId}
374 args[0] = &kafka.KVArg{
375 Key: "device_id",
376 Value: id,
377 }
378
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000379 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000380 logger.Debugw(ctx, "GetDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400381
382 if success {
383 volthaDevice := &voltha.Device{}
384 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000385 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Matteo Scandolo945e4012019-12-12 14:16:11 -0800386 return nil, status.Error(codes.InvalidArgument, err.Error())
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400387 }
388 return volthaDevice, nil
389 } else {
390 unpackResult := &ic.Error{}
391 var err error
392 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000393 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400394 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000395 logger.Debugw(ctx, "GetDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400396 // TODO: Need to get the real error code
Neha Sharma96b7bf22020-06-15 10:37:32 +0000397 return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400398 }
399}
400
401func (ap *CoreProxy) GetChildDevice(ctx context.Context, parentDeviceId string, kwargs map[string]interface{}) (*voltha.Device, error) {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000402 logger.Debugw(ctx, "GetChildDevice", log.Fields{"parentDeviceId": parentDeviceId, "kwargs": kwargs})
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400403 rpc := "GetChildDevice"
404
405 toTopic := ap.getCoreTopic(parentDeviceId)
406 replyToTopic := ap.getAdapterTopic()
407
408 args := make([]*kafka.KVArg, 4)
409 id := &voltha.ID{Id: parentDeviceId}
410 args[0] = &kafka.KVArg{
411 Key: "device_id",
412 Value: id,
413 }
414
415 var cnt uint8 = 0
416 for k, v := range kwargs {
417 cnt += 1
418 if k == "serial_number" {
419 val := &ic.StrType{Val: v.(string)}
420 args[cnt] = &kafka.KVArg{
421 Key: k,
422 Value: val,
423 }
424 } else if k == "onu_id" {
425 val := &ic.IntType{Val: int64(v.(uint32))}
426 args[cnt] = &kafka.KVArg{
427 Key: k,
428 Value: val,
429 }
430 } else if k == "parent_port_no" {
431 val := &ic.IntType{Val: int64(v.(uint32))}
432 args[cnt] = &kafka.KVArg{
433 Key: k,
434 Value: val,
435 }
436 }
437 }
438
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000439 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000440 logger.Debugw(ctx, "GetChildDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400441
442 if success {
443 volthaDevice := &voltha.Device{}
444 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000445 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Matteo Scandolo945e4012019-12-12 14:16:11 -0800446 return nil, status.Error(codes.InvalidArgument, err.Error())
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400447 }
448 return volthaDevice, nil
449 } else {
450 unpackResult := &ic.Error{}
451 var err error
452 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000453 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400454 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000455 logger.Debugw(ctx, "GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
npujarec5762e2020-01-01 14:08:48 +0530456
Neha Sharma96b7bf22020-06-15 10:37:32 +0000457 return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400458 }
William Kurkianea869482019-04-09 15:16:11 -0400459}
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400460
Chaitrashree G Sbe6ab942019-05-24 06:42:49 -0400461func (ap *CoreProxy) GetChildDevices(ctx context.Context, parentDeviceId string) (*voltha.Devices, error) {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000462 logger.Debugw(ctx, "GetChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
Chaitrashree G Sbe6ab942019-05-24 06:42:49 -0400463 rpc := "GetChildDevices"
464
465 toTopic := ap.getCoreTopic(parentDeviceId)
466 replyToTopic := ap.getAdapterTopic()
467
468 args := make([]*kafka.KVArg, 1)
469 id := &voltha.ID{Id: parentDeviceId}
470 args[0] = &kafka.KVArg{
471 Key: "device_id",
472 Value: id,
473 }
474
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000475 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000476 logger.Debugw(ctx, "GetChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
Chaitrashree G Sbe6ab942019-05-24 06:42:49 -0400477
478 if success {
479 volthaDevices := &voltha.Devices{}
480 if err := ptypes.UnmarshalAny(result, volthaDevices); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000481 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Matteo Scandolo945e4012019-12-12 14:16:11 -0800482 return nil, status.Error(codes.InvalidArgument, err.Error())
Chaitrashree G Sbe6ab942019-05-24 06:42:49 -0400483 }
484 return volthaDevices, nil
485 } else {
486 unpackResult := &ic.Error{}
487 var err error
488 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000489 logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
Chaitrashree G Sbe6ab942019-05-24 06:42:49 -0400490 }
Neha Sharma96b7bf22020-06-15 10:37:32 +0000491 logger.Debugw(ctx, "GetChildDevices-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
npujarec5762e2020-01-01 14:08:48 +0530492
Neha Sharma96b7bf22020-06-15 10:37:32 +0000493 return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
Chaitrashree G Sbe6ab942019-05-24 06:42:49 -0400494 }
495}
496
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400497func (ap *CoreProxy) SendPacketIn(ctx context.Context, deviceId string, port uint32, pktPayload []byte) error {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000498 logger.Debugw(ctx, "SendPacketIn", log.Fields{"deviceId": deviceId, "port": port, "pktPayload": pktPayload})
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400499 rpc := "PacketIn"
500 // Use a device specific topic to send the request. The adapter handling the device creates a device
501 // specific topic
502 toTopic := ap.getCoreTopic(deviceId)
503 replyToTopic := ap.getAdapterTopic()
504
505 args := make([]*kafka.KVArg, 3)
506 id := &voltha.ID{Id: deviceId}
507 args[0] = &kafka.KVArg{
508 Key: "device_id",
509 Value: id,
510 }
511 portNo := &ic.IntType{Val: int64(port)}
512 args[1] = &kafka.KVArg{
513 Key: "port",
514 Value: portNo,
515 }
516 pkt := &ic.Packet{Payload: pktPayload}
517 args[2] = &kafka.KVArg{
518 Key: "packet",
519 Value: pkt,
520 }
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000521 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000522 logger.Debugw(ctx, "SendPacketIn-response", log.Fields{"pDeviceId": deviceId, "success": success})
523 return unPackResponse(ctx, rpc, deviceId, success, result)
manikkaraj k9eb6cac2019-05-09 12:32:03 -0400524}
Manikkaraj kb1d51442019-07-23 10:41:02 -0400525
David Bainbridgebe7cac12019-10-23 19:53:07 +0000526func (ap *CoreProxy) DeviceReasonUpdate(ctx context.Context, deviceId string, deviceReason string) error {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000527 logger.Debugw(ctx, "DeviceReasonUpdate", log.Fields{"deviceId": deviceId, "deviceReason": deviceReason})
David Bainbridgebe7cac12019-10-23 19:53:07 +0000528 rpc := "DeviceReasonUpdate"
529 // Use a device specific topic to send the request. The adapter handling the device creates a device
530 // specific topic
531 toTopic := ap.getCoreTopic(deviceId)
532 replyToTopic := ap.getAdapterTopic()
533
534 args := make([]*kafka.KVArg, 2)
535 id := &voltha.ID{Id: deviceId}
536 args[0] = &kafka.KVArg{
537 Key: "device_id",
538 Value: id,
539 }
540 reason := &ic.StrType{Val: deviceReason}
541 args[1] = &kafka.KVArg{
542 Key: "device_reason",
543 Value: reason,
544 }
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000545 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000546 logger.Debugw(ctx, "DeviceReason-response", log.Fields{"pDeviceId": deviceId, "success": success})
547 return unPackResponse(ctx, rpc, deviceId, success, result)
David Bainbridgebe7cac12019-10-23 19:53:07 +0000548}
549
Manikkaraj kb1d51442019-07-23 10:41:02 -0400550func (ap *CoreProxy) DevicePMConfigUpdate(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000551 logger.Debugw(ctx, "DevicePMConfigUpdate", log.Fields{"pmConfigs": pmConfigs})
Manikkaraj kb1d51442019-07-23 10:41:02 -0400552 rpc := "DevicePMConfigUpdate"
553 // Use a device specific topic to send the request. The adapter handling the device creates a device
554 // specific topic
555 toTopic := ap.getCoreTopic(pmConfigs.Id)
556 replyToTopic := ap.getAdapterTopic()
557
558 args := make([]*kafka.KVArg, 1)
559 args[0] = &kafka.KVArg{
560 Key: "device_pm_config",
561 Value: pmConfigs,
562 }
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000563 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000564 logger.Debugw(ctx, "DevicePMConfigUpdate-response", log.Fields{"pDeviceId": pmConfigs.Id, "success": success})
565 return unPackResponse(ctx, rpc, pmConfigs.Id, success, result)
Manikkaraj kb1d51442019-07-23 10:41:02 -0400566}
567
568func (ap *CoreProxy) ReconcileChildDevices(ctx context.Context, parentDeviceId string) error {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000569 logger.Debugw(ctx, "ReconcileChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
Manikkaraj kb1d51442019-07-23 10:41:02 -0400570 rpc := "ReconcileChildDevices"
571 // Use a device specific topic to send the request. The adapter handling the device creates a device
572 // specific topic
573 toTopic := ap.getCoreTopic(parentDeviceId)
574 replyToTopic := ap.getAdapterTopic()
575
576 args := []*kafka.KVArg{
577 {Key: "parent_device_id", Value: &voltha.ID{Id: parentDeviceId}},
578 }
579
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000580 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000581 logger.Debugw(ctx, "ReconcileChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
582 return unPackResponse(ctx, rpc, parentDeviceId, success, result)
Manikkaraj kb1d51442019-07-23 10:41:02 -0400583}
Chaitrashree G Sded0a832020-01-09 20:21:48 -0500584
585func (ap *CoreProxy) PortStateUpdate(ctx context.Context, deviceId string, pType voltha.Port_PortType, portNum uint32,
Esin Karamanccb714b2019-11-29 15:02:06 +0000586 operStatus voltha.OperStatus_Types) error {
Neha Sharma96b7bf22020-06-15 10:37:32 +0000587 logger.Debugw(ctx, "PortStateUpdate", log.Fields{"deviceId": deviceId, "portType": pType, "portNo": portNum, "operation_status": operStatus})
Chaitrashree G Sded0a832020-01-09 20:21:48 -0500588 rpc := "PortStateUpdate"
589 // Use a device specific topic to send the request. The adapter handling the device creates a device
590 // specific topic
591 toTopic := ap.getCoreTopic(deviceId)
592 args := make([]*kafka.KVArg, 4)
593 deviceID := &voltha.ID{Id: deviceId}
594 portNo := &ic.IntType{Val: int64(portNum)}
595 portType := &ic.IntType{Val: int64(pType)}
596 oStatus := &ic.IntType{Val: int64(operStatus)}
597
598 args[0] = &kafka.KVArg{
599 Key: "device_id",
600 Value: deviceID,
601 }
602 args[1] = &kafka.KVArg{
603 Key: "oper_status",
604 Value: oStatus,
605 }
606 args[2] = &kafka.KVArg{
607 Key: "port_type",
608 Value: portType,
609 }
610 args[3] = &kafka.KVArg{
611 Key: "port_no",
612 Value: portNo,
613 }
614
615 // Use a device specific topic as we are the only adaptercore handling requests for this device
616 replyToTopic := ap.getAdapterTopic()
Rohan Agrawal02f784d2020-02-14 09:34:02 +0000617 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
Neha Sharma96b7bf22020-06-15 10:37:32 +0000618 logger.Debugw(ctx, "PortStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
619 return unPackResponse(ctx, rpc, deviceId, success, result)
Chaitrashree G Sded0a832020-01-09 20:21:48 -0500620}