blob: 738a77ad7737259d7a33d2e94946b46b7835a2de [file] [log] [blame]
khenaidood2b6df92018-12-13 16:37:20 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package common
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
21 a "github.com/golang/protobuf/ptypes/any"
22 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
William Kurkiandaa6bb22019-03-07 12:26:28 -050024 ic "github.com/opencord/voltha-protos/go/inter_container"
25 "github.com/opencord/voltha-protos/go/voltha"
khenaidood2b6df92018-12-13 16:37:20 -050026 "google.golang.org/grpc/codes"
27 "google.golang.org/grpc/status"
khenaidoo54e0ddf2019-02-27 16:21:33 -050028 "sync"
khenaidood2b6df92018-12-13 16:37:20 -050029)
30
31type CoreProxy struct {
cuilin20186b6a9952019-04-03 22:37:11 -070032 kafkaICProxy *kafka.InterContainerProxy
33 adapterTopic string
34 coreTopic string
35 deviceIdCoreMap map[string]string
khenaidoo54e0ddf2019-02-27 16:21:33 -050036 lockDeviceIdCoreMap sync.RWMutex
37
khenaidood2b6df92018-12-13 16:37:20 -050038}
39
40func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
41 var proxy CoreProxy
42 proxy.kafkaICProxy = kafkaProxy
43 proxy.adapterTopic = adapterTopic
44 proxy.coreTopic = coreTopic
khenaidoo54e0ddf2019-02-27 16:21:33 -050045 proxy.deviceIdCoreMap = make(map[string]string)
46 proxy.lockDeviceIdCoreMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -050047 log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
48
49 return &proxy
50}
51
52func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
53 if success {
54 return nil
55 } else {
56 unpackResult := &ic.Error{}
57 var err error
58 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
59 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
60 }
61 log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
62 // TODO: Need to get the real error code
63 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
64 }
65}
66
khenaidoo54e0ddf2019-02-27 16:21:33 -050067// UpdateCoreReference adds or update a core reference (really the topic name) for a given device Id
68func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
69 ap.lockDeviceIdCoreMap.Lock()
70 defer ap.lockDeviceIdCoreMap.Unlock()
71 ap.deviceIdCoreMap[deviceId] = coreReference
72}
73
74// DeleteCoreReference removes a core reference (really the topic name) for a given device Id
75func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
76 ap.lockDeviceIdCoreMap.Lock()
77 defer ap.lockDeviceIdCoreMap.Unlock()
78 delete(ap.deviceIdCoreMap, deviceId)
79}
80
81func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
82 ap.lockDeviceIdCoreMap.Lock()
83 defer ap.lockDeviceIdCoreMap.Unlock()
84
85 if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
86 return kafka.Topic{Name: t}
87 }
88
89 return kafka.Topic{Name: ap.coreTopic}
90}
91
92func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
93 return kafka.Topic{Name: ap.adapterTopic}
94}
95
khenaidood2b6df92018-12-13 16:37:20 -050096func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
97 log.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
98 rpc := "Register"
99 topic := kafka.Topic{Name: ap.coreTopic}
khenaidoo54e0ddf2019-02-27 16:21:33 -0500100 replyToTopic := ap.getAdapterTopic()
khenaidood2b6df92018-12-13 16:37:20 -0500101 args := make([]*kafka.KVArg, 2)
102 args[0] = &kafka.KVArg{
103 Key: "adapter",
104 Value: adapter,
105 }
106 args[1] = &kafka.KVArg{
107 Key: "deviceTypes",
108 Value: deviceTypes,
109 }
110
khenaidoobdcb8e02019-03-06 16:28:56 -0500111 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
khenaidood2b6df92018-12-13 16:37:20 -0500112 log.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
113 return unPackResponse(rpc, "", success, result)
114}
115
116func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
117 log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
118 rpc := "DeviceUpdate"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500119 toTopic := ap.getCoreTopic(device.Id)
khenaidood2b6df92018-12-13 16:37:20 -0500120 args := make([]*kafka.KVArg, 1)
121 args[0] = &kafka.KVArg{
122 Key: "device",
123 Value: device,
124 }
125 // Use a device specific topic as we are the only adaptercore handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500126 replyToTopic := ap.getAdapterTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500127 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
khenaidood2b6df92018-12-13 16:37:20 -0500128 log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
129 return unPackResponse(rpc, device.Id, success, result)
130}
131
132func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
133 log.Debugw("PortCreated", log.Fields{"portNo": port.PortNo})
134 rpc := "PortCreated"
135 // Use a device specific topic to send the request. The adapter handling the device creates a device
136 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -0500137 toTopic := ap.getCoreTopic(deviceId)
khenaidood2b6df92018-12-13 16:37:20 -0500138 args := make([]*kafka.KVArg, 2)
139 id := &voltha.ID{Id: deviceId}
140 args[0] = &kafka.KVArg{
141 Key: "device_id",
142 Value: id,
143 }
144 args[1] = &kafka.KVArg{
145 Key: "port",
146 Value: port,
147 }
148
149 // Use a device specific topic as we are the only adaptercore handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500150 replyToTopic := ap.getAdapterTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500151 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
khenaidood2b6df92018-12-13 16:37:20 -0500152 log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
153 return unPackResponse(rpc, deviceId, success, result)
154}
155
156func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
157 connStatus voltha.ConnectStatus_ConnectStatus, operStatus voltha.OperStatus_OperStatus) error {
158 log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
159 rpc := "DeviceStateUpdate"
160 // Use a device specific topic to send the request. The adapter handling the device creates a device
161 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -0500162 toTopic := ap.getCoreTopic(deviceId)
khenaidood2b6df92018-12-13 16:37:20 -0500163 args := make([]*kafka.KVArg, 3)
164 id := &voltha.ID{Id: deviceId}
165 oStatus := &ic.IntType{Val: int64(operStatus)}
166 cStatus := &ic.IntType{Val: int64(connStatus)}
167
168 args[0] = &kafka.KVArg{
169 Key: "device_id",
170 Value: id,
171 }
172 args[1] = &kafka.KVArg{
173 Key: "oper_status",
174 Value: oStatus,
175 }
176 args[2] = &kafka.KVArg{
177 Key: "connect_status",
178 Value: cStatus,
179 }
180 // Use a device specific topic as we are the only adaptercore handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500181 replyToTopic := ap.getAdapterTopic()
khenaidoobdcb8e02019-03-06 16:28:56 -0500182 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
khenaidood2b6df92018-12-13 16:37:20 -0500183 log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
184 return unPackResponse(rpc, deviceId, success, result)
185}
186
187func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
khenaidoobdcb8e02019-03-06 16:28:56 -0500188 childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64 ) error {
khenaidood2b6df92018-12-13 16:37:20 -0500189 log.Debugw("ChildDeviceDetected", log.Fields{"pPeviceId": parentDeviceId, "channelId": channelId})
190 rpc := "ChildDeviceDetected"
191 // Use a device specific topic to send the request. The adapter handling the device creates a device
192 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -0500193 toTopic := ap.getCoreTopic(parentDeviceId)
194 replyToTopic := ap.getAdapterTopic()
khenaidood2b6df92018-12-13 16:37:20 -0500195
khenaidoobdcb8e02019-03-06 16:28:56 -0500196 args := make([]*kafka.KVArg, 7)
khenaidood2b6df92018-12-13 16:37:20 -0500197 id := &voltha.ID{Id: parentDeviceId}
198 args[0] = &kafka.KVArg{
199 Key: "parent_device_id",
200 Value: id,
201 }
202 ppn := &ic.IntType{Val: int64(parentPortNo)}
203 args[1] = &kafka.KVArg{
204 Key: "parent_port_no",
205 Value: ppn,
206 }
207 cdt := &ic.StrType{Val: childDeviceType}
208 args[2] = &kafka.KVArg{
209 Key: "child_device_type",
210 Value: cdt,
211 }
212 channel := &ic.IntType{Val: int64(channelId)}
213 args[3] = &kafka.KVArg{
214 Key: "channel_id",
215 Value: channel,
216 }
khenaidoobdcb8e02019-03-06 16:28:56 -0500217 vId := &ic.StrType{Val: vendorId}
218 args[4] = &kafka.KVArg{
219 Key: "vendor_id",
220 Value: vId,
221 }
222 sNo := &ic.StrType{Val: serialNumber}
223 args[5] = &kafka.KVArg{
224 Key: "serial_number",
225 Value: sNo,
226 }
227 oId := &ic.IntType{Val: int64(onuId)}
228 args[6] = &kafka.KVArg{
229 Key: "onu_id",
230 Value: oId,
231 }
khenaidood2b6df92018-12-13 16:37:20 -0500232
khenaidoobdcb8e02019-03-06 16:28:56 -0500233 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
khenaidood2b6df92018-12-13 16:37:20 -0500234 log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
235 return unPackResponse(rpc, parentDeviceId, success, result)
cuilin20186b6a9952019-04-03 22:37:11 -0700236
237}
238
239func (ap *CoreProxy) GetDevice(ctx context.Context, parentDeviceId string, deviceId string) (*voltha.Device, error) {
240 log.Debugw("GetDevice", log.Fields{"deviceId": deviceId})
241 rpc := "GetDevice"
242
243 toTopic := ap.getCoreTopic(parentDeviceId)
244 replyToTopic := ap.getAdapterTopic()
245
246 args := make([]*kafka.KVArg, 1)
247 id := &voltha.ID{Id: deviceId}
248 args[0] = &kafka.KVArg{
249 Key: "device_id",
250 Value: id,
251 }
252
253 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
254 log.Debugw("GetDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
255
256 if success {
257 volthaDevice := &voltha.Device{}
258 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
259 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
260 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
261 }
262 return volthaDevice, nil
263 } else {
264 unpackResult := &ic.Error{}
265 var err error
266 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
267 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
268 }
269 log.Debugw("GetDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
270 // TODO: Need to get the real error code
271 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
272 }
273}
274
275func (ap *CoreProxy) GetChildDevice(ctx context.Context, parentDeviceId string, kwargs map[string]interface{}) (*voltha.Device, error) {
276 log.Debugw("GetChildDevice", log.Fields{"parentDeviceId": parentDeviceId, "kwargs": kwargs})
277 rpc := "GetChildDevice"
278
279 toTopic := ap.getCoreTopic(parentDeviceId)
280 replyToTopic := ap.getAdapterTopic()
281
282 args := make([]*kafka.KVArg, 4)
283 id := &voltha.ID{Id: parentDeviceId}
284 args[0] = &kafka.KVArg{
285 Key: "device_id",
286 Value: id,
287 }
288
289 var cnt uint8 = 0
290 for k, v := range kwargs {
291 cnt += 1
292 if k == "serial_number" {
293 val := &ic.StrType{Val: v.(string)}
294 args[cnt] = &kafka.KVArg{
295 Key: k,
296 Value: val,
297 }
298 } else if k == "onu_id" {
299 val := &ic.IntType{Val: int64(v.(uint32))}
300 args[cnt] = &kafka.KVArg{
301 Key: k,
302 Value: val,
303 }
304 } else if k == "parent_port_no" {
305 val := &ic.IntType{Val: int64(v.(uint32))}
306 args[cnt] = &kafka.KVArg{
307 Key: k,
308 Value: val,
309 }
310 }
311 }
312
313 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
314 log.Debugw("GetChildDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
315
316 if success {
317 volthaDevice := &voltha.Device{}
318 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
319 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
320 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
321 }
322 return volthaDevice, nil
323 } else {
324 unpackResult := &ic.Error{}
325 var err error
326 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
327 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
328 }
329 log.Debugw("GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
330 // TODO: Need to get the real error code
331 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
332 }
khenaidood2b6df92018-12-13 16:37:20 -0500333}