blob: 5bbd176775f57969158bcf339b55b25774403526 [file] [log] [blame]
khenaidood2b6df92018-12-13 16:37:20 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package common
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
21 a "github.com/golang/protobuf/ptypes/any"
22 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
William Kurkiandaa6bb22019-03-07 12:26:28 -050024 ic "github.com/opencord/voltha-protos/go/inter_container"
25 "github.com/opencord/voltha-protos/go/voltha"
khenaidood2b6df92018-12-13 16:37:20 -050026 "google.golang.org/grpc/codes"
27 "google.golang.org/grpc/status"
khenaidoo54e0ddf2019-02-27 16:21:33 -050028 "sync"
khenaidood2b6df92018-12-13 16:37:20 -050029)
30
31type CoreProxy struct {
cuilin20186b6a9952019-04-03 22:37:11 -070032 kafkaICProxy *kafka.InterContainerProxy
33 adapterTopic string
34 coreTopic string
35 deviceIdCoreMap map[string]string
khenaidoo54e0ddf2019-02-27 16:21:33 -050036 lockDeviceIdCoreMap sync.RWMutex
khenaidood2b6df92018-12-13 16:37:20 -050037}
38
39func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
40 var proxy CoreProxy
41 proxy.kafkaICProxy = kafkaProxy
42 proxy.adapterTopic = adapterTopic
43 proxy.coreTopic = coreTopic
khenaidoo54e0ddf2019-02-27 16:21:33 -050044 proxy.deviceIdCoreMap = make(map[string]string)
45 proxy.lockDeviceIdCoreMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -050046 log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
47
48 return &proxy
49}
50
51func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
52 if success {
53 return nil
54 } else {
55 unpackResult := &ic.Error{}
56 var err error
57 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
58 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
59 }
60 log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
61 // TODO: Need to get the real error code
62 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
63 }
64}
65
khenaidoo54e0ddf2019-02-27 16:21:33 -050066// UpdateCoreReference adds or update a core reference (really the topic name) for a given device Id
67func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
68 ap.lockDeviceIdCoreMap.Lock()
69 defer ap.lockDeviceIdCoreMap.Unlock()
70 ap.deviceIdCoreMap[deviceId] = coreReference
71}
72
73// DeleteCoreReference removes a core reference (really the topic name) for a given device Id
74func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
75 ap.lockDeviceIdCoreMap.Lock()
76 defer ap.lockDeviceIdCoreMap.Unlock()
77 delete(ap.deviceIdCoreMap, deviceId)
78}
79
80func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
81 ap.lockDeviceIdCoreMap.Lock()
82 defer ap.lockDeviceIdCoreMap.Unlock()
83
84 if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
85 return kafka.Topic{Name: t}
86 }
87
88 return kafka.Topic{Name: ap.coreTopic}
89}
90
91func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
92 return kafka.Topic{Name: ap.adapterTopic}
93}
94
khenaidood2b6df92018-12-13 16:37:20 -050095func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
96 log.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
97 rpc := "Register"
98 topic := kafka.Topic{Name: ap.coreTopic}
khenaidoo54e0ddf2019-02-27 16:21:33 -050099 replyToTopic := ap.getAdapterTopic()
khenaidood2b6df92018-12-13 16:37:20 -0500100 args := make([]*kafka.KVArg, 2)
101 args[0] = &kafka.KVArg{
102 Key: "adapter",
103 Value: adapter,
104 }
105 args[1] = &kafka.KVArg{
106 Key: "deviceTypes",
107 Value: deviceTypes,
108 }
109
khenaidoobdcb8e02019-03-06 16:28:56 -0500110 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
khenaidood2b6df92018-12-13 16:37:20 -0500111 log.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
112 return unPackResponse(rpc, "", success, result)
113}
114
115func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
116 log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
117 rpc := "DeviceUpdate"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500118 toTopic := ap.getCoreTopic(device.Id)
khenaidood2b6df92018-12-13 16:37:20 -0500119 args := make([]*kafka.KVArg, 1)
120 args[0] = &kafka.KVArg{
121 Key: "device",
122 Value: device,
123 }
124 // Use a device specific topic as we are the only adaptercore handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500125 replyToTopic := ap.getAdapterTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500126 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidood2b6df92018-12-13 16:37:20 -0500127 log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
128 return unPackResponse(rpc, device.Id, success, result)
129}
130
131func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
132 log.Debugw("PortCreated", log.Fields{"portNo": port.PortNo})
133 rpc := "PortCreated"
134 // Use a device specific topic to send the request. The adapter handling the device creates a device
135 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -0500136 toTopic := ap.getCoreTopic(deviceId)
khenaidood2b6df92018-12-13 16:37:20 -0500137 args := make([]*kafka.KVArg, 2)
138 id := &voltha.ID{Id: deviceId}
139 args[0] = &kafka.KVArg{
140 Key: "device_id",
141 Value: id,
142 }
143 args[1] = &kafka.KVArg{
144 Key: "port",
145 Value: port,
146 }
147
148 // Use a device specific topic as we are the only adaptercore handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500149 replyToTopic := ap.getAdapterTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500150 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
khenaidood2b6df92018-12-13 16:37:20 -0500151 log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
152 return unPackResponse(rpc, deviceId, success, result)
153}
154
khenaidoo3ab34882019-05-02 21:33:30 -0400155func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, operStatus voltha.OperStatus_OperStatus) error {
156 log.Debugw("PortsStateUpdate", log.Fields{"deviceId": deviceId})
157 rpc := "PortsStateUpdate"
158 // Use a device specific topic to send the request. The adapter handling the device creates a device
159 // specific topic
160 toTopic := ap.getCoreTopic(deviceId)
161 args := make([]*kafka.KVArg, 2)
162 id := &voltha.ID{Id: deviceId}
163 oStatus := &ic.IntType{Val: int64(operStatus)}
164
165 args[0] = &kafka.KVArg{
166 Key: "device_id",
167 Value: id,
168 }
169 args[1] = &kafka.KVArg{
170 Key: "oper_status",
171 Value: oStatus,
172 }
173
174 // Use a device specific topic as we are the only adaptercore handling requests for this device
175 replyToTopic := ap.getAdapterTopic()
176 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
177 log.Debugw("PortsStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
178 return unPackResponse(rpc, deviceId, success, result)
179}
180
khenaidood2b6df92018-12-13 16:37:20 -0500181func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
182 connStatus voltha.ConnectStatus_ConnectStatus, operStatus voltha.OperStatus_OperStatus) error {
183 log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
184 rpc := "DeviceStateUpdate"
185 // Use a device specific topic to send the request. The adapter handling the device creates a device
186 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -0500187 toTopic := ap.getCoreTopic(deviceId)
khenaidood2b6df92018-12-13 16:37:20 -0500188 args := make([]*kafka.KVArg, 3)
189 id := &voltha.ID{Id: deviceId}
190 oStatus := &ic.IntType{Val: int64(operStatus)}
191 cStatus := &ic.IntType{Val: int64(connStatus)}
192
193 args[0] = &kafka.KVArg{
194 Key: "device_id",
195 Value: id,
196 }
197 args[1] = &kafka.KVArg{
198 Key: "oper_status",
199 Value: oStatus,
200 }
201 args[2] = &kafka.KVArg{
202 Key: "connect_status",
203 Value: cStatus,
204 }
205 // Use a device specific topic as we are the only adaptercore handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500206 replyToTopic := ap.getAdapterTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500207 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
khenaidood2b6df92018-12-13 16:37:20 -0500208 log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
209 return unPackResponse(rpc, deviceId, success, result)
210}
211
212func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
khenaidoo2c6a0992019-04-29 13:46:56 -0400213 childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) error {
khenaidood2b6df92018-12-13 16:37:20 -0500214 log.Debugw("ChildDeviceDetected", log.Fields{"pPeviceId": parentDeviceId, "channelId": channelId})
215 rpc := "ChildDeviceDetected"
216 // Use a device specific topic to send the request. The adapter handling the device creates a device
217 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -0500218 toTopic := ap.getCoreTopic(parentDeviceId)
219 replyToTopic := ap.getAdapterTopic()
khenaidood2b6df92018-12-13 16:37:20 -0500220
khenaidoobdcb8e02019-03-06 16:28:56 -0500221 args := make([]*kafka.KVArg, 7)
khenaidood2b6df92018-12-13 16:37:20 -0500222 id := &voltha.ID{Id: parentDeviceId}
223 args[0] = &kafka.KVArg{
224 Key: "parent_device_id",
225 Value: id,
226 }
227 ppn := &ic.IntType{Val: int64(parentPortNo)}
228 args[1] = &kafka.KVArg{
229 Key: "parent_port_no",
230 Value: ppn,
231 }
232 cdt := &ic.StrType{Val: childDeviceType}
233 args[2] = &kafka.KVArg{
234 Key: "child_device_type",
235 Value: cdt,
236 }
237 channel := &ic.IntType{Val: int64(channelId)}
238 args[3] = &kafka.KVArg{
239 Key: "channel_id",
240 Value: channel,
241 }
khenaidoobdcb8e02019-03-06 16:28:56 -0500242 vId := &ic.StrType{Val: vendorId}
243 args[4] = &kafka.KVArg{
244 Key: "vendor_id",
245 Value: vId,
246 }
247 sNo := &ic.StrType{Val: serialNumber}
248 args[5] = &kafka.KVArg{
249 Key: "serial_number",
250 Value: sNo,
251 }
252 oId := &ic.IntType{Val: int64(onuId)}
253 args[6] = &kafka.KVArg{
254 Key: "onu_id",
255 Value: oId,
256 }
khenaidood2b6df92018-12-13 16:37:20 -0500257
khenaidoobdcb8e02019-03-06 16:28:56 -0500258 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
khenaidood2b6df92018-12-13 16:37:20 -0500259 log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
260 return unPackResponse(rpc, parentDeviceId, success, result)
cuilin20186b6a9952019-04-03 22:37:11 -0700261
262}
263
264func (ap *CoreProxy) GetDevice(ctx context.Context, parentDeviceId string, deviceId string) (*voltha.Device, error) {
265 log.Debugw("GetDevice", log.Fields{"deviceId": deviceId})
266 rpc := "GetDevice"
267
268 toTopic := ap.getCoreTopic(parentDeviceId)
269 replyToTopic := ap.getAdapterTopic()
270
271 args := make([]*kafka.KVArg, 1)
272 id := &voltha.ID{Id: deviceId}
273 args[0] = &kafka.KVArg{
274 Key: "device_id",
275 Value: id,
276 }
277
278 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
279 log.Debugw("GetDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
280
281 if success {
282 volthaDevice := &voltha.Device{}
283 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
284 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
285 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
286 }
287 return volthaDevice, nil
288 } else {
289 unpackResult := &ic.Error{}
290 var err error
291 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
292 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
293 }
294 log.Debugw("GetDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
295 // TODO: Need to get the real error code
296 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
297 }
298}
299
300func (ap *CoreProxy) GetChildDevice(ctx context.Context, parentDeviceId string, kwargs map[string]interface{}) (*voltha.Device, error) {
301 log.Debugw("GetChildDevice", log.Fields{"parentDeviceId": parentDeviceId, "kwargs": kwargs})
302 rpc := "GetChildDevice"
303
304 toTopic := ap.getCoreTopic(parentDeviceId)
305 replyToTopic := ap.getAdapterTopic()
306
307 args := make([]*kafka.KVArg, 4)
308 id := &voltha.ID{Id: parentDeviceId}
309 args[0] = &kafka.KVArg{
310 Key: "device_id",
311 Value: id,
312 }
313
314 var cnt uint8 = 0
315 for k, v := range kwargs {
316 cnt += 1
317 if k == "serial_number" {
318 val := &ic.StrType{Val: v.(string)}
319 args[cnt] = &kafka.KVArg{
320 Key: k,
321 Value: val,
322 }
323 } else if k == "onu_id" {
324 val := &ic.IntType{Val: int64(v.(uint32))}
325 args[cnt] = &kafka.KVArg{
326 Key: k,
327 Value: val,
328 }
329 } else if k == "parent_port_no" {
330 val := &ic.IntType{Val: int64(v.(uint32))}
331 args[cnt] = &kafka.KVArg{
332 Key: k,
333 Value: val,
334 }
335 }
336 }
337
338 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
339 log.Debugw("GetChildDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
340
341 if success {
342 volthaDevice := &voltha.Device{}
343 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
344 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
345 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
346 }
347 return volthaDevice, nil
348 } else {
349 unpackResult := &ic.Error{}
350 var err error
351 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
352 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
353 }
354 log.Debugw("GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
355 // TODO: Need to get the real error code
356 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
357 }
khenaidood2b6df92018-12-13 16:37:20 -0500358}