blob: 86f186d270200b8b1b98a09f0e61d1e30e9d49eb [file] [log] [blame]
khenaidood2b6df92018-12-13 16:37:20 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package common
17
18import (
19 "context"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080020 "sync"
21
khenaidood2b6df92018-12-13 16:37:20 -050022 "github.com/golang/protobuf/ptypes"
23 a "github.com/golang/protobuf/ptypes/any"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080024 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
25 "github.com/opencord/voltha-lib-go/v3/pkg/log"
26 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
27 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidood2b6df92018-12-13 16:37:20 -050028 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
30)
31
32type CoreProxy struct {
npujar467fe752020-01-16 20:17:45 +053033 kafkaICProxy kafka.InterContainerProxy
cuilin20186b6a9952019-04-03 22:37:11 -070034 adapterTopic string
35 coreTopic string
36 deviceIdCoreMap map[string]string
khenaidoo54e0ddf2019-02-27 16:21:33 -050037 lockDeviceIdCoreMap sync.RWMutex
khenaidood2b6df92018-12-13 16:37:20 -050038}
39
npujar467fe752020-01-16 20:17:45 +053040func NewCoreProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
khenaidood2b6df92018-12-13 16:37:20 -050041 var proxy CoreProxy
42 proxy.kafkaICProxy = kafkaProxy
43 proxy.adapterTopic = adapterTopic
44 proxy.coreTopic = coreTopic
khenaidoo54e0ddf2019-02-27 16:21:33 -050045 proxy.deviceIdCoreMap = make(map[string]string)
46 proxy.lockDeviceIdCoreMap = sync.RWMutex{}
serkant.uluderya2ae470f2020-01-21 11:13:09 -080047 logger.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
khenaidood2b6df92018-12-13 16:37:20 -050048
49 return &proxy
50}
51
52func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
53 if success {
54 return nil
55 } else {
56 unpackResult := &ic.Error{}
57 var err error
58 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -080059 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
khenaidood2b6df92018-12-13 16:37:20 -050060 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -080061 logger.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
khenaidood2b6df92018-12-13 16:37:20 -050062 // TODO: Need to get the real error code
63 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
64 }
65}
66
khenaidoo54e0ddf2019-02-27 16:21:33 -050067// UpdateCoreReference adds or update a core reference (really the topic name) for a given device Id
68func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
69 ap.lockDeviceIdCoreMap.Lock()
70 defer ap.lockDeviceIdCoreMap.Unlock()
71 ap.deviceIdCoreMap[deviceId] = coreReference
72}
73
74// DeleteCoreReference removes a core reference (really the topic name) for a given device Id
75func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
76 ap.lockDeviceIdCoreMap.Lock()
77 defer ap.lockDeviceIdCoreMap.Unlock()
78 delete(ap.deviceIdCoreMap, deviceId)
79}
80
81func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
82 ap.lockDeviceIdCoreMap.Lock()
83 defer ap.lockDeviceIdCoreMap.Unlock()
84
85 if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
86 return kafka.Topic{Name: t}
87 }
88
89 return kafka.Topic{Name: ap.coreTopic}
90}
91
92func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
93 return kafka.Topic{Name: ap.adapterTopic}
94}
95
khenaidood2b6df92018-12-13 16:37:20 -050096func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -080097 logger.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
khenaidood2b6df92018-12-13 16:37:20 -050098 rpc := "Register"
99 topic := kafka.Topic{Name: ap.coreTopic}
khenaidoo54e0ddf2019-02-27 16:21:33 -0500100 replyToTopic := ap.getAdapterTopic()
khenaidood2b6df92018-12-13 16:37:20 -0500101 args := make([]*kafka.KVArg, 2)
102 args[0] = &kafka.KVArg{
103 Key: "adapter",
104 Value: adapter,
105 }
106 args[1] = &kafka.KVArg{
107 Key: "deviceTypes",
108 Value: deviceTypes,
109 }
110
khenaidoobdcb8e02019-03-06 16:28:56 -0500111 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800112 logger.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
khenaidood2b6df92018-12-13 16:37:20 -0500113 return unPackResponse(rpc, "", success, result)
114}
115
116func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800117 logger.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
khenaidood2b6df92018-12-13 16:37:20 -0500118 rpc := "DeviceUpdate"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500119 toTopic := ap.getCoreTopic(device.Id)
khenaidood2b6df92018-12-13 16:37:20 -0500120 args := make([]*kafka.KVArg, 1)
121 args[0] = &kafka.KVArg{
122 Key: "device",
123 Value: device,
124 }
125 // Use a device specific topic as we are the only adaptercore handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500126 replyToTopic := ap.getAdapterTopic()
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000127 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800128 logger.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
khenaidood2b6df92018-12-13 16:37:20 -0500129 return unPackResponse(rpc, device.Id, success, result)
130}
131
132func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800133 logger.Debugw("PortCreated", log.Fields{"portNo": port.PortNo})
khenaidood2b6df92018-12-13 16:37:20 -0500134 rpc := "PortCreated"
135 // Use a device specific topic to send the request. The adapter handling the device creates a device
136 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -0500137 toTopic := ap.getCoreTopic(deviceId)
khenaidood2b6df92018-12-13 16:37:20 -0500138 args := make([]*kafka.KVArg, 2)
139 id := &voltha.ID{Id: deviceId}
140 args[0] = &kafka.KVArg{
141 Key: "device_id",
142 Value: id,
143 }
144 args[1] = &kafka.KVArg{
145 Key: "port",
146 Value: port,
147 }
148
149 // Use a device specific topic as we are the only adaptercore handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500150 replyToTopic := ap.getAdapterTopic()
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000151 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800152 logger.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
khenaidood2b6df92018-12-13 16:37:20 -0500153 return unPackResponse(rpc, deviceId, success, result)
154}
155
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800156func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, operStatus voltha.OperStatus_Types) error {
khenaidoo3ab34882019-05-02 21:33:30 -0400157 log.Debugw("PortsStateUpdate", log.Fields{"deviceId": deviceId})
158 rpc := "PortsStateUpdate"
159 // Use a device specific topic to send the request. The adapter handling the device creates a device
160 // specific topic
161 toTopic := ap.getCoreTopic(deviceId)
162 args := make([]*kafka.KVArg, 2)
163 id := &voltha.ID{Id: deviceId}
164 oStatus := &ic.IntType{Val: int64(operStatus)}
165
166 args[0] = &kafka.KVArg{
167 Key: "device_id",
168 Value: id,
169 }
170 args[1] = &kafka.KVArg{
171 Key: "oper_status",
172 Value: oStatus,
173 }
174
175 // Use a device specific topic as we are the only adaptercore handling requests for this device
176 replyToTopic := ap.getAdapterTopic()
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000177 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800178 logger.Debugw("PortsStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
khenaidoo3ab34882019-05-02 21:33:30 -0400179 return unPackResponse(rpc, deviceId, success, result)
180}
181
khenaidoo0a822f92019-05-08 15:15:57 -0400182func (ap *CoreProxy) DeleteAllPorts(ctx context.Context, deviceId string) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800183 logger.Debugw("DeleteAllPorts", log.Fields{"deviceId": deviceId})
khenaidoo0a822f92019-05-08 15:15:57 -0400184 rpc := "DeleteAllPorts"
185 // Use a device specific topic to send the request. The adapter handling the device creates a device
186 // specific topic
187 toTopic := ap.getCoreTopic(deviceId)
188 args := make([]*kafka.KVArg, 2)
189 id := &voltha.ID{Id: deviceId}
190
191 args[0] = &kafka.KVArg{
192 Key: "device_id",
193 Value: id,
194 }
195
196 // Use a device specific topic as we are the only adaptercore handling requests for this device
197 replyToTopic := ap.getAdapterTopic()
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000198 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800199 logger.Debugw("DeleteAllPorts-response", log.Fields{"deviceId": deviceId, "success": success})
khenaidoo0a822f92019-05-08 15:15:57 -0400200 return unPackResponse(rpc, deviceId, success, result)
201}
202
khenaidood2b6df92018-12-13 16:37:20 -0500203func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800204 connStatus voltha.ConnectStatus_Types, operStatus voltha.OperStatus_Types) error {
khenaidood2b6df92018-12-13 16:37:20 -0500205 log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
206 rpc := "DeviceStateUpdate"
207 // Use a device specific topic to send the request. The adapter handling the device creates a device
208 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -0500209 toTopic := ap.getCoreTopic(deviceId)
khenaidood2b6df92018-12-13 16:37:20 -0500210 args := make([]*kafka.KVArg, 3)
211 id := &voltha.ID{Id: deviceId}
212 oStatus := &ic.IntType{Val: int64(operStatus)}
213 cStatus := &ic.IntType{Val: int64(connStatus)}
214
215 args[0] = &kafka.KVArg{
216 Key: "device_id",
217 Value: id,
218 }
219 args[1] = &kafka.KVArg{
220 Key: "oper_status",
221 Value: oStatus,
222 }
223 args[2] = &kafka.KVArg{
224 Key: "connect_status",
225 Value: cStatus,
226 }
227 // Use a device specific topic as we are the only adaptercore handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500228 replyToTopic := ap.getAdapterTopic()
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000229 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800230 logger.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
khenaidood2b6df92018-12-13 16:37:20 -0500231 return unPackResponse(rpc, deviceId, success, result)
232}
233
234func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
Mahir Gunyel6deaa242019-06-27 04:53:33 -0700235 childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) (*voltha.Device, error) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800236 logger.Debugw("ChildDeviceDetected", log.Fields{"pDeviceId": parentDeviceId, "channelId": channelId})
khenaidood2b6df92018-12-13 16:37:20 -0500237 rpc := "ChildDeviceDetected"
238 // Use a device specific topic to send the request. The adapter handling the device creates a device
239 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -0500240 toTopic := ap.getCoreTopic(parentDeviceId)
241 replyToTopic := ap.getAdapterTopic()
khenaidood2b6df92018-12-13 16:37:20 -0500242
khenaidoobdcb8e02019-03-06 16:28:56 -0500243 args := make([]*kafka.KVArg, 7)
khenaidood2b6df92018-12-13 16:37:20 -0500244 id := &voltha.ID{Id: parentDeviceId}
245 args[0] = &kafka.KVArg{
246 Key: "parent_device_id",
247 Value: id,
248 }
249 ppn := &ic.IntType{Val: int64(parentPortNo)}
250 args[1] = &kafka.KVArg{
251 Key: "parent_port_no",
252 Value: ppn,
253 }
254 cdt := &ic.StrType{Val: childDeviceType}
255 args[2] = &kafka.KVArg{
256 Key: "child_device_type",
257 Value: cdt,
258 }
259 channel := &ic.IntType{Val: int64(channelId)}
260 args[3] = &kafka.KVArg{
261 Key: "channel_id",
262 Value: channel,
263 }
khenaidoobdcb8e02019-03-06 16:28:56 -0500264 vId := &ic.StrType{Val: vendorId}
265 args[4] = &kafka.KVArg{
266 Key: "vendor_id",
267 Value: vId,
268 }
269 sNo := &ic.StrType{Val: serialNumber}
270 args[5] = &kafka.KVArg{
271 Key: "serial_number",
272 Value: sNo,
273 }
274 oId := &ic.IntType{Val: int64(onuId)}
275 args[6] = &kafka.KVArg{
276 Key: "onu_id",
277 Value: oId,
278 }
khenaidood2b6df92018-12-13 16:37:20 -0500279
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000280 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800281 logger.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
Mahir Gunyel6deaa242019-06-27 04:53:33 -0700282
283 if success {
284 volthaDevice := &voltha.Device{}
285 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800286 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800287 return nil, status.Error(codes.InvalidArgument, err.Error())
Mahir Gunyel6deaa242019-06-27 04:53:33 -0700288 }
289 return volthaDevice, nil
290 } else {
291 unpackResult := &ic.Error{}
292 var err error
293 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800294 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
Mahir Gunyel6deaa242019-06-27 04:53:33 -0700295 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800296 logger.Debugw("ChildDeviceDetected-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800297
298 return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
Mahir Gunyel6deaa242019-06-27 04:53:33 -0700299 }
cuilin20186b6a9952019-04-03 22:37:11 -0700300
301}
302
khenaidoo0a822f92019-05-08 15:15:57 -0400303func (ap *CoreProxy) ChildDevicesLost(ctx context.Context, parentDeviceId string) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800304 logger.Debugw("ChildDevicesLost", log.Fields{"pDeviceId": parentDeviceId})
khenaidoo0a822f92019-05-08 15:15:57 -0400305 rpc := "ChildDevicesLost"
306 // Use a device specific topic to send the request. The adapter handling the device creates a device
307 // specific topic
308 toTopic := ap.getCoreTopic(parentDeviceId)
309 replyToTopic := ap.getAdapterTopic()
310
311 args := make([]*kafka.KVArg, 1)
312 id := &voltha.ID{Id: parentDeviceId}
313 args[0] = &kafka.KVArg{
314 Key: "parent_device_id",
315 Value: id,
316 }
317
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000318 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800319 logger.Debugw("ChildDevicesLost-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
khenaidoo0a822f92019-05-08 15:15:57 -0400320 return unPackResponse(rpc, parentDeviceId, success, result)
321}
322
323func (ap *CoreProxy) ChildDevicesDetected(ctx context.Context, parentDeviceId string) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800324 logger.Debugw("ChildDevicesDetected", log.Fields{"pDeviceId": parentDeviceId})
khenaidoo0a822f92019-05-08 15:15:57 -0400325 rpc := "ChildDevicesDetected"
326 // Use a device specific topic to send the request. The adapter handling the device creates a device
327 // specific topic
328 toTopic := ap.getCoreTopic(parentDeviceId)
329 replyToTopic := ap.getAdapterTopic()
330
331 args := make([]*kafka.KVArg, 1)
332 id := &voltha.ID{Id: parentDeviceId}
333 args[0] = &kafka.KVArg{
334 Key: "parent_device_id",
335 Value: id,
336 }
337
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000338 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800339 logger.Debugw("ChildDevicesDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
khenaidoo0a822f92019-05-08 15:15:57 -0400340 return unPackResponse(rpc, parentDeviceId, success, result)
341}
342
cuilin20186b6a9952019-04-03 22:37:11 -0700343func (ap *CoreProxy) GetDevice(ctx context.Context, parentDeviceId string, deviceId string) (*voltha.Device, error) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800344 logger.Debugw("GetDevice", log.Fields{"deviceId": deviceId})
cuilin20186b6a9952019-04-03 22:37:11 -0700345 rpc := "GetDevice"
346
347 toTopic := ap.getCoreTopic(parentDeviceId)
348 replyToTopic := ap.getAdapterTopic()
349
350 args := make([]*kafka.KVArg, 1)
351 id := &voltha.ID{Id: deviceId}
352 args[0] = &kafka.KVArg{
353 Key: "device_id",
354 Value: id,
355 }
356
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000357 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800358 logger.Debugw("GetDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
cuilin20186b6a9952019-04-03 22:37:11 -0700359
360 if success {
361 volthaDevice := &voltha.Device{}
362 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800363 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800364 return nil, status.Error(codes.InvalidArgument, err.Error())
cuilin20186b6a9952019-04-03 22:37:11 -0700365 }
366 return volthaDevice, nil
367 } else {
368 unpackResult := &ic.Error{}
369 var err error
370 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800371 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
cuilin20186b6a9952019-04-03 22:37:11 -0700372 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800373 logger.Debugw("GetDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
cuilin20186b6a9952019-04-03 22:37:11 -0700374 // TODO: Need to get the real error code
Scott Baker0e78ba22020-02-24 17:58:47 -0800375 return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
cuilin20186b6a9952019-04-03 22:37:11 -0700376 }
377}
378
379func (ap *CoreProxy) GetChildDevice(ctx context.Context, parentDeviceId string, kwargs map[string]interface{}) (*voltha.Device, error) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800380 logger.Debugw("GetChildDevice", log.Fields{"parentDeviceId": parentDeviceId, "kwargs": kwargs})
cuilin20186b6a9952019-04-03 22:37:11 -0700381 rpc := "GetChildDevice"
382
383 toTopic := ap.getCoreTopic(parentDeviceId)
384 replyToTopic := ap.getAdapterTopic()
385
386 args := make([]*kafka.KVArg, 4)
387 id := &voltha.ID{Id: parentDeviceId}
388 args[0] = &kafka.KVArg{
389 Key: "device_id",
390 Value: id,
391 }
392
393 var cnt uint8 = 0
394 for k, v := range kwargs {
395 cnt += 1
396 if k == "serial_number" {
397 val := &ic.StrType{Val: v.(string)}
398 args[cnt] = &kafka.KVArg{
399 Key: k,
400 Value: val,
401 }
402 } else if k == "onu_id" {
403 val := &ic.IntType{Val: int64(v.(uint32))}
404 args[cnt] = &kafka.KVArg{
405 Key: k,
406 Value: val,
407 }
408 } else if k == "parent_port_no" {
409 val := &ic.IntType{Val: int64(v.(uint32))}
410 args[cnt] = &kafka.KVArg{
411 Key: k,
412 Value: val,
413 }
414 }
415 }
416
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000417 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800418 logger.Debugw("GetChildDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
cuilin20186b6a9952019-04-03 22:37:11 -0700419
420 if success {
421 volthaDevice := &voltha.Device{}
422 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800423 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800424 return nil, status.Error(codes.InvalidArgument, err.Error())
cuilin20186b6a9952019-04-03 22:37:11 -0700425 }
426 return volthaDevice, nil
427 } else {
428 unpackResult := &ic.Error{}
429 var err error
430 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800431 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
cuilin20186b6a9952019-04-03 22:37:11 -0700432 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800433 logger.Debugw("GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
npujar467fe752020-01-16 20:17:45 +0530434
Scott Baker0e78ba22020-02-24 17:58:47 -0800435 return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
cuilin20186b6a9952019-04-03 22:37:11 -0700436 }
khenaidood2b6df92018-12-13 16:37:20 -0500437}
manikkaraj k6c9689d2019-05-09 12:59:52 -0400438
Chaitrashree G S89176ab2019-05-24 06:31:46 -0400439func (ap *CoreProxy) GetChildDevices(ctx context.Context, parentDeviceId string) (*voltha.Devices, error) {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800440 logger.Debugw("GetChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
Chaitrashree G S89176ab2019-05-24 06:31:46 -0400441 rpc := "GetChildDevices"
442
443 toTopic := ap.getCoreTopic(parentDeviceId)
444 replyToTopic := ap.getAdapterTopic()
445
446 args := make([]*kafka.KVArg, 1)
447 id := &voltha.ID{Id: parentDeviceId}
448 args[0] = &kafka.KVArg{
449 Key: "device_id",
450 Value: id,
451 }
452
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000453 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800454 logger.Debugw("GetChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
Chaitrashree G S89176ab2019-05-24 06:31:46 -0400455
456 if success {
457 volthaDevices := &voltha.Devices{}
458 if err := ptypes.UnmarshalAny(result, volthaDevices); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800459 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
Scott Baker0e78ba22020-02-24 17:58:47 -0800460 return nil, status.Error(codes.InvalidArgument, err.Error())
Chaitrashree G S89176ab2019-05-24 06:31:46 -0400461 }
462 return volthaDevices, nil
463 } else {
464 unpackResult := &ic.Error{}
465 var err error
466 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800467 logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
Chaitrashree G S89176ab2019-05-24 06:31:46 -0400468 }
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800469 logger.Debugw("GetChildDevices-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
npujar467fe752020-01-16 20:17:45 +0530470
Scott Baker0e78ba22020-02-24 17:58:47 -0800471 return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
Chaitrashree G S89176ab2019-05-24 06:31:46 -0400472 }
473}
474
manikkaraj k6c9689d2019-05-09 12:59:52 -0400475func (ap *CoreProxy) SendPacketIn(ctx context.Context, deviceId string, port uint32, pktPayload []byte) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800476 logger.Debugw("SendPacketIn", log.Fields{"deviceId": deviceId, "port": port, "pktPayload": pktPayload})
manikkaraj k6c9689d2019-05-09 12:59:52 -0400477 rpc := "PacketIn"
478 // Use a device specific topic to send the request. The adapter handling the device creates a device
479 // specific topic
480 toTopic := ap.getCoreTopic(deviceId)
481 replyToTopic := ap.getAdapterTopic()
482
483 args := make([]*kafka.KVArg, 3)
484 id := &voltha.ID{Id: deviceId}
485 args[0] = &kafka.KVArg{
486 Key: "device_id",
487 Value: id,
488 }
489 portNo := &ic.IntType{Val: int64(port)}
490 args[1] = &kafka.KVArg{
491 Key: "port",
492 Value: portNo,
493 }
494 pkt := &ic.Packet{Payload: pktPayload}
495 args[2] = &kafka.KVArg{
496 Key: "packet",
497 Value: pkt,
498 }
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000499 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800500 logger.Debugw("SendPacketIn-response", log.Fields{"pDeviceId": deviceId, "success": success})
manikkaraj k6c9689d2019-05-09 12:59:52 -0400501 return unPackResponse(rpc, deviceId, success, result)
502}
khenaidoob3127472019-07-24 21:04:55 -0400503
David Bainbridgebdae73c2019-10-23 17:05:41 +0000504func (ap *CoreProxy) DeviceReasonUpdate(ctx context.Context, deviceId string, deviceReason string) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800505 logger.Debugw("DeviceReasonUpdate", log.Fields{"deviceId": deviceId, "deviceReason": deviceReason})
David Bainbridgebdae73c2019-10-23 17:05:41 +0000506 rpc := "DeviceReasonUpdate"
507 // Use a device specific topic to send the request. The adapter handling the device creates a device
508 // specific topic
509 toTopic := ap.getCoreTopic(deviceId)
510 replyToTopic := ap.getAdapterTopic()
511
512 args := make([]*kafka.KVArg, 2)
513 id := &voltha.ID{Id: deviceId}
514 args[0] = &kafka.KVArg{
515 Key: "device_id",
516 Value: id,
517 }
518 reason := &ic.StrType{Val: deviceReason}
519 args[1] = &kafka.KVArg{
520 Key: "device_reason",
521 Value: reason,
522 }
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000523 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800524 logger.Debugw("DeviceReason-response", log.Fields{"pDeviceId": deviceId, "success": success})
David Bainbridgebdae73c2019-10-23 17:05:41 +0000525 return unPackResponse(rpc, deviceId, success, result)
526}
527
khenaidoob3127472019-07-24 21:04:55 -0400528func (ap *CoreProxy) DevicePMConfigUpdate(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800529 logger.Debugw("DevicePMConfigUpdate", log.Fields{"pmConfigs": pmConfigs})
khenaidoob3127472019-07-24 21:04:55 -0400530 rpc := "DevicePMConfigUpdate"
531 // Use a device specific topic to send the request. The adapter handling the device creates a device
532 // specific topic
533 toTopic := ap.getCoreTopic(pmConfigs.Id)
534 replyToTopic := ap.getAdapterTopic()
535
536 args := make([]*kafka.KVArg, 1)
537 args[0] = &kafka.KVArg{
538 Key: "device_pm_config",
539 Value: pmConfigs,
540 }
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000541 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800542 logger.Debugw("DevicePMConfigUpdate-response", log.Fields{"pDeviceId": pmConfigs.Id, "success": success})
khenaidoob3127472019-07-24 21:04:55 -0400543 return unPackResponse(rpc, pmConfigs.Id, success, result)
544}
khenaidooba6b6c42019-08-02 09:11:56 -0400545
546func (ap *CoreProxy) ReconcileChildDevices(ctx context.Context, parentDeviceId string) error {
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800547 logger.Debugw("ReconcileChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
khenaidooba6b6c42019-08-02 09:11:56 -0400548 rpc := "ReconcileChildDevices"
549 // Use a device specific topic to send the request. The adapter handling the device creates a device
550 // specific topic
551 toTopic := ap.getCoreTopic(parentDeviceId)
552 replyToTopic := ap.getAdapterTopic()
553
554 args := []*kafka.KVArg{
555 {Key: "parent_device_id", Value: &voltha.ID{Id: parentDeviceId}},
556 }
557
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000558 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800559 logger.Debugw("ReconcileChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
khenaidooba6b6c42019-08-02 09:11:56 -0400560 return unPackResponse(rpc, parentDeviceId, success, result)
561}
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800562
563func (ap *CoreProxy) PortStateUpdate(ctx context.Context, deviceId string, pType voltha.Port_PortType, portNum uint32,
564 operStatus voltha.OperStatus_Types) error {
565 logger.Debugw("PortStateUpdate", log.Fields{"deviceId": deviceId, "portType": pType, "portNo": portNum, "operation_status": operStatus})
566 rpc := "PortStateUpdate"
567 // Use a device specific topic to send the request. The adapter handling the device creates a device
568 // specific topic
569 toTopic := ap.getCoreTopic(deviceId)
570 args := make([]*kafka.KVArg, 4)
571 deviceID := &voltha.ID{Id: deviceId}
572 portNo := &ic.IntType{Val: int64(portNum)}
573 portType := &ic.IntType{Val: int64(pType)}
574 oStatus := &ic.IntType{Val: int64(operStatus)}
575
576 args[0] = &kafka.KVArg{
577 Key: "device_id",
578 Value: deviceID,
579 }
580 args[1] = &kafka.KVArg{
581 Key: "oper_status",
582 Value: oStatus,
583 }
584 args[2] = &kafka.KVArg{
585 Key: "port_type",
586 Value: portType,
587 }
588 args[3] = &kafka.KVArg{
589 Key: "port_no",
590 Value: portNo,
591 }
592
593 // Use a device specific topic as we are the only adaptercore handling requests for this device
594 replyToTopic := ap.getAdapterTopic()
Rohan Agrawal7f72f0c2020-01-14 12:05:51 +0000595 success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800596 logger.Debugw("PortStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
597 return unPackResponse(rpc, deviceId, success, result)
598}