blob: 5bbd176775f57969158bcf339b55b25774403526 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package common
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
21 a "github.com/golang/protobuf/ptypes/any"
22 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
24 ic "github.com/opencord/voltha-protos/go/inter_container"
25 "github.com/opencord/voltha-protos/go/voltha"
26 "google.golang.org/grpc/codes"
27 "google.golang.org/grpc/status"
28 "sync"
29)
30
31type CoreProxy struct {
Matt Jeanneretcab955f2019-04-10 15:45:57 -040032 kafkaICProxy *kafka.InterContainerProxy
33 adapterTopic string
34 coreTopic string
35 deviceIdCoreMap map[string]string
William Kurkianea869482019-04-09 15:16:11 -040036 lockDeviceIdCoreMap sync.RWMutex
William Kurkianea869482019-04-09 15:16:11 -040037}
38
39func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
40 var proxy CoreProxy
41 proxy.kafkaICProxy = kafkaProxy
42 proxy.adapterTopic = adapterTopic
43 proxy.coreTopic = coreTopic
44 proxy.deviceIdCoreMap = make(map[string]string)
45 proxy.lockDeviceIdCoreMap = sync.RWMutex{}
46 log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
47
48 return &proxy
49}
50
51func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
52 if success {
53 return nil
54 } else {
55 unpackResult := &ic.Error{}
56 var err error
57 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
58 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
59 }
60 log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
61 // TODO: Need to get the real error code
62 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
63 }
64}
65
66// UpdateCoreReference adds or update a core reference (really the topic name) for a given device Id
67func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
68 ap.lockDeviceIdCoreMap.Lock()
69 defer ap.lockDeviceIdCoreMap.Unlock()
70 ap.deviceIdCoreMap[deviceId] = coreReference
71}
72
73// DeleteCoreReference removes a core reference (really the topic name) for a given device Id
74func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
75 ap.lockDeviceIdCoreMap.Lock()
76 defer ap.lockDeviceIdCoreMap.Unlock()
77 delete(ap.deviceIdCoreMap, deviceId)
78}
79
80func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
81 ap.lockDeviceIdCoreMap.Lock()
82 defer ap.lockDeviceIdCoreMap.Unlock()
83
84 if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
85 return kafka.Topic{Name: t}
86 }
87
88 return kafka.Topic{Name: ap.coreTopic}
89}
90
91func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
92 return kafka.Topic{Name: ap.adapterTopic}
93}
94
95func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
96 log.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
97 rpc := "Register"
98 topic := kafka.Topic{Name: ap.coreTopic}
99 replyToTopic := ap.getAdapterTopic()
100 args := make([]*kafka.KVArg, 2)
101 args[0] = &kafka.KVArg{
102 Key: "adapter",
103 Value: adapter,
104 }
105 args[1] = &kafka.KVArg{
106 Key: "deviceTypes",
107 Value: deviceTypes,
108 }
109
110 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
111 log.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
112 return unPackResponse(rpc, "", success, result)
113}
114
115func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
116 log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
117 rpc := "DeviceUpdate"
118 toTopic := ap.getCoreTopic(device.Id)
119 args := make([]*kafka.KVArg, 1)
120 args[0] = &kafka.KVArg{
121 Key: "device",
122 Value: device,
123 }
124 // Use a device specific topic as we are the only adaptercore handling requests for this device
125 replyToTopic := ap.getAdapterTopic()
126 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
127 log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
128 return unPackResponse(rpc, device.Id, success, result)
129}
130
131func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
132 log.Debugw("PortCreated", log.Fields{"portNo": port.PortNo})
133 rpc := "PortCreated"
134 // Use a device specific topic to send the request. The adapter handling the device creates a device
135 // specific topic
136 toTopic := ap.getCoreTopic(deviceId)
137 args := make([]*kafka.KVArg, 2)
138 id := &voltha.ID{Id: deviceId}
139 args[0] = &kafka.KVArg{
140 Key: "device_id",
141 Value: id,
142 }
143 args[1] = &kafka.KVArg{
144 Key: "port",
145 Value: port,
146 }
147
148 // Use a device specific topic as we are the only adaptercore handling requests for this device
149 replyToTopic := ap.getAdapterTopic()
150 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
151 log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
152 return unPackResponse(rpc, deviceId, success, result)
153}
154
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400155func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, operStatus voltha.OperStatus_OperStatus) error {
156 log.Debugw("PortsStateUpdate", log.Fields{"deviceId": deviceId})
157 rpc := "PortsStateUpdate"
158 // Use a device specific topic to send the request. The adapter handling the device creates a device
159 // specific topic
160 toTopic := ap.getCoreTopic(deviceId)
161 args := make([]*kafka.KVArg, 2)
162 id := &voltha.ID{Id: deviceId}
163 oStatus := &ic.IntType{Val: int64(operStatus)}
164
165 args[0] = &kafka.KVArg{
166 Key: "device_id",
167 Value: id,
168 }
169 args[1] = &kafka.KVArg{
170 Key: "oper_status",
171 Value: oStatus,
172 }
173
174 // Use a device specific topic as we are the only adaptercore handling requests for this device
175 replyToTopic := ap.getAdapterTopic()
176 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
177 log.Debugw("PortsStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
178 return unPackResponse(rpc, deviceId, success, result)
179}
180
William Kurkianea869482019-04-09 15:16:11 -0400181func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
182 connStatus voltha.ConnectStatus_ConnectStatus, operStatus voltha.OperStatus_OperStatus) error {
183 log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
184 rpc := "DeviceStateUpdate"
185 // Use a device specific topic to send the request. The adapter handling the device creates a device
186 // specific topic
187 toTopic := ap.getCoreTopic(deviceId)
188 args := make([]*kafka.KVArg, 3)
189 id := &voltha.ID{Id: deviceId}
190 oStatus := &ic.IntType{Val: int64(operStatus)}
191 cStatus := &ic.IntType{Val: int64(connStatus)}
192
193 args[0] = &kafka.KVArg{
194 Key: "device_id",
195 Value: id,
196 }
197 args[1] = &kafka.KVArg{
198 Key: "oper_status",
199 Value: oStatus,
200 }
201 args[2] = &kafka.KVArg{
202 Key: "connect_status",
203 Value: cStatus,
204 }
205 // Use a device specific topic as we are the only adaptercore handling requests for this device
206 replyToTopic := ap.getAdapterTopic()
207 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
208 log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
209 return unPackResponse(rpc, deviceId, success, result)
210}
211
212func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400213 childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) error {
William Kurkianea869482019-04-09 15:16:11 -0400214 log.Debugw("ChildDeviceDetected", log.Fields{"pPeviceId": parentDeviceId, "channelId": channelId})
215 rpc := "ChildDeviceDetected"
216 // Use a device specific topic to send the request. The adapter handling the device creates a device
217 // specific topic
218 toTopic := ap.getCoreTopic(parentDeviceId)
219 replyToTopic := ap.getAdapterTopic()
220
221 args := make([]*kafka.KVArg, 7)
222 id := &voltha.ID{Id: parentDeviceId}
223 args[0] = &kafka.KVArg{
224 Key: "parent_device_id",
225 Value: id,
226 }
227 ppn := &ic.IntType{Val: int64(parentPortNo)}
228 args[1] = &kafka.KVArg{
229 Key: "parent_port_no",
230 Value: ppn,
231 }
232 cdt := &ic.StrType{Val: childDeviceType}
233 args[2] = &kafka.KVArg{
234 Key: "child_device_type",
235 Value: cdt,
236 }
237 channel := &ic.IntType{Val: int64(channelId)}
238 args[3] = &kafka.KVArg{
239 Key: "channel_id",
240 Value: channel,
241 }
242 vId := &ic.StrType{Val: vendorId}
243 args[4] = &kafka.KVArg{
244 Key: "vendor_id",
245 Value: vId,
246 }
247 sNo := &ic.StrType{Val: serialNumber}
248 args[5] = &kafka.KVArg{
249 Key: "serial_number",
250 Value: sNo,
251 }
252 oId := &ic.IntType{Val: int64(onuId)}
253 args[6] = &kafka.KVArg{
254 Key: "onu_id",
255 Value: oId,
256 }
257
258 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
259 log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
260 return unPackResponse(rpc, parentDeviceId, success, result)
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400261
262}
263
264func (ap *CoreProxy) GetDevice(ctx context.Context, parentDeviceId string, deviceId string) (*voltha.Device, error) {
265 log.Debugw("GetDevice", log.Fields{"deviceId": deviceId})
266 rpc := "GetDevice"
267
268 toTopic := ap.getCoreTopic(parentDeviceId)
269 replyToTopic := ap.getAdapterTopic()
270
271 args := make([]*kafka.KVArg, 1)
272 id := &voltha.ID{Id: deviceId}
273 args[0] = &kafka.KVArg{
274 Key: "device_id",
275 Value: id,
276 }
277
278 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
279 log.Debugw("GetDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
280
281 if success {
282 volthaDevice := &voltha.Device{}
283 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
284 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
285 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
286 }
287 return volthaDevice, nil
288 } else {
289 unpackResult := &ic.Error{}
290 var err error
291 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
292 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
293 }
294 log.Debugw("GetDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
295 // TODO: Need to get the real error code
296 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
297 }
298}
299
300func (ap *CoreProxy) GetChildDevice(ctx context.Context, parentDeviceId string, kwargs map[string]interface{}) (*voltha.Device, error) {
301 log.Debugw("GetChildDevice", log.Fields{"parentDeviceId": parentDeviceId, "kwargs": kwargs})
302 rpc := "GetChildDevice"
303
304 toTopic := ap.getCoreTopic(parentDeviceId)
305 replyToTopic := ap.getAdapterTopic()
306
307 args := make([]*kafka.KVArg, 4)
308 id := &voltha.ID{Id: parentDeviceId}
309 args[0] = &kafka.KVArg{
310 Key: "device_id",
311 Value: id,
312 }
313
314 var cnt uint8 = 0
315 for k, v := range kwargs {
316 cnt += 1
317 if k == "serial_number" {
318 val := &ic.StrType{Val: v.(string)}
319 args[cnt] = &kafka.KVArg{
320 Key: k,
321 Value: val,
322 }
323 } else if k == "onu_id" {
324 val := &ic.IntType{Val: int64(v.(uint32))}
325 args[cnt] = &kafka.KVArg{
326 Key: k,
327 Value: val,
328 }
329 } else if k == "parent_port_no" {
330 val := &ic.IntType{Val: int64(v.(uint32))}
331 args[cnt] = &kafka.KVArg{
332 Key: k,
333 Value: val,
334 }
335 }
336 }
337
338 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
339 log.Debugw("GetChildDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
340
341 if success {
342 volthaDevice := &voltha.Device{}
343 if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
344 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
345 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
346 }
347 return volthaDevice, nil
348 } else {
349 unpackResult := &ic.Error{}
350 var err error
351 if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
352 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
353 }
354 log.Debugw("GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
355 // TODO: Need to get the real error code
356 return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
357 }
William Kurkianea869482019-04-09 15:16:11 -0400358}