blob: 11a22abb807f55737ca28ede503a7889a7a14664 [file] [log] [blame]
khenaidood2b6df92018-12-13 16:37:20 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package common
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
21 a "github.com/golang/protobuf/ptypes/any"
22 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
William Kurkiandaa6bb22019-03-07 12:26:28 -050024 ic "github.com/opencord/voltha-protos/go/inter_container"
25 "github.com/opencord/voltha-protos/go/voltha"
khenaidood2b6df92018-12-13 16:37:20 -050026 "google.golang.org/grpc/codes"
27 "google.golang.org/grpc/status"
khenaidoo54e0ddf2019-02-27 16:21:33 -050028 "sync"
khenaidood2b6df92018-12-13 16:37:20 -050029)
30
31type CoreProxy struct {
cuilin20186b6a9952019-04-03 22:37:11 -070032 kafkaICProxy *kafka.InterContainerProxy
33 adapterTopic string
34 coreTopic string
35 deviceIdCoreMap map[string]string
khenaidoo54e0ddf2019-02-27 16:21:33 -050036 lockDeviceIdCoreMap sync.RWMutex
khenaidood2b6df92018-12-13 16:37:20 -050037}
38
39func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
40 var proxy CoreProxy
41 proxy.kafkaICProxy = kafkaProxy
42 proxy.adapterTopic = adapterTopic
43 proxy.coreTopic = coreTopic
khenaidoo54e0ddf2019-02-27 16:21:33 -050044 proxy.deviceIdCoreMap = make(map[string]string)
45 proxy.lockDeviceIdCoreMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -050046 log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
47
48 return &proxy
49}
50
51func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
52 if success {
53 return nil
54 } else {
55 unpackResult := &ic.Error{}
56 var err error
57 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
58 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
59 }
60 log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
61 // TODO: Need to get the real error code
62 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
63 }
64}
65
khenaidoo54e0ddf2019-02-27 16:21:33 -050066// UpdateCoreReference adds or update a core reference (really the topic name) for a given device Id
67func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
68 ap.lockDeviceIdCoreMap.Lock()
69 defer ap.lockDeviceIdCoreMap.Unlock()
70 ap.deviceIdCoreMap[deviceId] = coreReference
71}
72
73// DeleteCoreReference removes a core reference (really the topic name) for a given device Id
74func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
75 ap.lockDeviceIdCoreMap.Lock()
76 defer ap.lockDeviceIdCoreMap.Unlock()
77 delete(ap.deviceIdCoreMap, deviceId)
78}
79
80func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
81 ap.lockDeviceIdCoreMap.Lock()
82 defer ap.lockDeviceIdCoreMap.Unlock()
83
84 if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
85 return kafka.Topic{Name: t}
86 }
87
88 return kafka.Topic{Name: ap.coreTopic}
89}
90
91func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
92 return kafka.Topic{Name: ap.adapterTopic}
93}
94
khenaidood2b6df92018-12-13 16:37:20 -050095func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
96 log.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
97 rpc := "Register"
98 topic := kafka.Topic{Name: ap.coreTopic}
khenaidoo54e0ddf2019-02-27 16:21:33 -050099 replyToTopic := ap.getAdapterTopic()
khenaidood2b6df92018-12-13 16:37:20 -0500100 args := make([]*kafka.KVArg, 2)
101 args[0] = &kafka.KVArg{
102 Key: "adapter",
103 Value: adapter,
104 }
105 args[1] = &kafka.KVArg{
106 Key: "deviceTypes",
107 Value: deviceTypes,
108 }
109
khenaidoobdcb8e02019-03-06 16:28:56 -0500110 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
khenaidood2b6df92018-12-13 16:37:20 -0500111 log.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
112 return unPackResponse(rpc, "", success, result)
113}
114
115func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
116 log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
117 rpc := "DeviceUpdate"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500118 toTopic := ap.getCoreTopic(device.Id)
khenaidood2b6df92018-12-13 16:37:20 -0500119 args := make([]*kafka.KVArg, 1)
120 args[0] = &kafka.KVArg{
121 Key: "device",
122 Value: device,
123 }
124 // Use a device specific topic as we are the only adaptercore handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500125 replyToTopic := ap.getAdapterTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500126 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidood2b6df92018-12-13 16:37:20 -0500127 log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
128 return unPackResponse(rpc, device.Id, success, result)
129}
130
131func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
132 log.Debugw("PortCreated", log.Fields{"portNo": port.PortNo})
133 rpc := "PortCreated"
134 // Use a device specific topic to send the request. The adapter handling the device creates a device
135 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -0500136 toTopic := ap.getCoreTopic(deviceId)
khenaidood2b6df92018-12-13 16:37:20 -0500137 args := make([]*kafka.KVArg, 2)
138 id := &voltha.ID{Id: deviceId}
139 args[0] = &kafka.KVArg{
140 Key: "device_id",
141 Value: id,
142 }
143 args[1] = &kafka.KVArg{
144 Key: "port",
145 Value: port,
146 }
147
148 // Use a device specific topic as we are the only adaptercore handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500149 replyToTopic := ap.getAdapterTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500150 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
khenaidood2b6df92018-12-13 16:37:20 -0500151 log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
152 return unPackResponse(rpc, deviceId, success, result)
153}
154
155func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
156 connStatus voltha.ConnectStatus_ConnectStatus, operStatus voltha.OperStatus_OperStatus) error {
157 log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
158 rpc := "DeviceStateUpdate"
159 // Use a device specific topic to send the request. The adapter handling the device creates a device
160 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -0500161 toTopic := ap.getCoreTopic(deviceId)
khenaidood2b6df92018-12-13 16:37:20 -0500162 args := make([]*kafka.KVArg, 3)
163 id := &voltha.ID{Id: deviceId}
164 oStatus := &ic.IntType{Val: int64(operStatus)}
165 cStatus := &ic.IntType{Val: int64(connStatus)}
166
167 args[0] = &kafka.KVArg{
168 Key: "device_id",
169 Value: id,
170 }
171 args[1] = &kafka.KVArg{
172 Key: "oper_status",
173 Value: oStatus,
174 }
175 args[2] = &kafka.KVArg{
176 Key: "connect_status",
177 Value: cStatus,
178 }
179 // Use a device specific topic as we are the only adaptercore handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500180 replyToTopic := ap.getAdapterTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500181 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
khenaidood2b6df92018-12-13 16:37:20 -0500182 log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
183 return unPackResponse(rpc, deviceId, success, result)
184}
185
186func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
khenaidoo2c6a0992019-04-29 13:46:56 -0400187 childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) error {
khenaidood2b6df92018-12-13 16:37:20 -0500188 log.Debugw("ChildDeviceDetected", log.Fields{"pPeviceId": parentDeviceId, "channelId": channelId})
189 rpc := "ChildDeviceDetected"
190 // Use a device specific topic to send the request. The adapter handling the device creates a device
191 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -0500192 toTopic := ap.getCoreTopic(parentDeviceId)
193 replyToTopic := ap.getAdapterTopic()
khenaidood2b6df92018-12-13 16:37:20 -0500194
khenaidoobdcb8e02019-03-06 16:28:56 -0500195 args := make([]*kafka.KVArg, 7)
khenaidood2b6df92018-12-13 16:37:20 -0500196 id := &voltha.ID{Id: parentDeviceId}
197 args[0] = &kafka.KVArg{
198 Key: "parent_device_id",
199 Value: id,
200 }
201 ppn := &ic.IntType{Val: int64(parentPortNo)}
202 args[1] = &kafka.KVArg{
203 Key: "parent_port_no",
204 Value: ppn,
205 }
206 cdt := &ic.StrType{Val: childDeviceType}
207 args[2] = &kafka.KVArg{
208 Key: "child_device_type",
209 Value: cdt,
210 }
211 channel := &ic.IntType{Val: int64(channelId)}
212 args[3] = &kafka.KVArg{
213 Key: "channel_id",
214 Value: channel,
215 }
khenaidoobdcb8e02019-03-06 16:28:56 -0500216 vId := &ic.StrType{Val: vendorId}
217 args[4] = &kafka.KVArg{
218 Key: "vendor_id",
219 Value: vId,
220 }
221 sNo := &ic.StrType{Val: serialNumber}
222 args[5] = &kafka.KVArg{
223 Key: "serial_number",
224 Value: sNo,
225 }
226 oId := &ic.IntType{Val: int64(onuId)}
227 args[6] = &kafka.KVArg{
228 Key: "onu_id",
229 Value: oId,
230 }
khenaidood2b6df92018-12-13 16:37:20 -0500231
khenaidoobdcb8e02019-03-06 16:28:56 -0500232 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
khenaidood2b6df92018-12-13 16:37:20 -0500233 log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
234 return unPackResponse(rpc, parentDeviceId, success, result)
cuilin20186b6a9952019-04-03 22:37:11 -0700235
236}
237
238func (ap *CoreProxy) GetDevice(ctx context.Context, parentDeviceId string, deviceId string) (*voltha.Device, error) {
239 log.Debugw("GetDevice", log.Fields{"deviceId": deviceId})
240 rpc := "GetDevice"
241
242 toTopic := ap.getCoreTopic(parentDeviceId)
243 replyToTopic := ap.getAdapterTopic()
244
245 args := make([]*kafka.KVArg, 1)
246 id := &voltha.ID{Id: deviceId}
247 args[0] = &kafka.KVArg{
248 Key: "device_id",
249 Value: id,
250 }
251
252 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
253 log.Debugw("GetDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
254
255 if success {
256 volthaDevice := &voltha.Device{}
257 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
258 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
259 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
260 }
261 return volthaDevice, nil
262 } else {
263 unpackResult := &ic.Error{}
264 var err error
265 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
266 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
267 }
268 log.Debugw("GetDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
269 // TODO: Need to get the real error code
270 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
271 }
272}
273
274func (ap *CoreProxy) GetChildDevice(ctx context.Context, parentDeviceId string, kwargs map[string]interface{}) (*voltha.Device, error) {
275 log.Debugw("GetChildDevice", log.Fields{"parentDeviceId": parentDeviceId, "kwargs": kwargs})
276 rpc := "GetChildDevice"
277
278 toTopic := ap.getCoreTopic(parentDeviceId)
279 replyToTopic := ap.getAdapterTopic()
280
281 args := make([]*kafka.KVArg, 4)
282 id := &voltha.ID{Id: parentDeviceId}
283 args[0] = &kafka.KVArg{
284 Key: "device_id",
285 Value: id,
286 }
287
288 var cnt uint8 = 0
289 for k, v := range kwargs {
290 cnt += 1
291 if k == "serial_number" {
292 val := &ic.StrType{Val: v.(string)}
293 args[cnt] = &kafka.KVArg{
294 Key: k,
295 Value: val,
296 }
297 } else if k == "onu_id" {
298 val := &ic.IntType{Val: int64(v.(uint32))}
299 args[cnt] = &kafka.KVArg{
300 Key: k,
301 Value: val,
302 }
303 } else if k == "parent_port_no" {
304 val := &ic.IntType{Val: int64(v.(uint32))}
305 args[cnt] = &kafka.KVArg{
306 Key: k,
307 Value: val,
308 }
309 }
310 }
311
312 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
313 log.Debugw("GetChildDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
314
315 if success {
316 volthaDevice := &voltha.Device{}
317 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
318 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
319 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
320 }
321 return volthaDevice, nil
322 } else {
323 unpackResult := &ic.Error{}
324 var err error
325 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
326 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
327 }
328 log.Debugw("GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
329 // TODO: Need to get the real error code
330 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
331 }
khenaidood2b6df92018-12-13 16:37:20 -0500332}