blob: 137877fff5928b7b80024364fdb9c1223221b08f [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package common
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
21 a "github.com/golang/protobuf/ptypes/any"
22 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
24 ic "github.com/opencord/voltha-protos/go/inter_container"
25 "github.com/opencord/voltha-protos/go/voltha"
26 "google.golang.org/grpc/codes"
27 "google.golang.org/grpc/status"
28 "sync"
29)
30
31type CoreProxy struct {
32 kafkaICProxy *kafka.InterContainerProxy
33 adapterTopic string
34 coreTopic string
35 deviceIdCoreMap map[string]string
36 lockDeviceIdCoreMap sync.RWMutex
37
38}
39
40func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
41 var proxy CoreProxy
42 proxy.kafkaICProxy = kafkaProxy
43 proxy.adapterTopic = adapterTopic
44 proxy.coreTopic = coreTopic
45 proxy.deviceIdCoreMap = make(map[string]string)
46 proxy.lockDeviceIdCoreMap = sync.RWMutex{}
47 log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
48
49 return &proxy
50}
51
52func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
53 if success {
54 return nil
55 } else {
56 unpackResult := &ic.Error{}
57 var err error
58 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
59 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
60 }
61 log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
62 // TODO: Need to get the real error code
63 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
64 }
65}
66
67// UpdateCoreReference adds or update a core reference (really the topic name) for a given device Id
68func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
69 ap.lockDeviceIdCoreMap.Lock()
70 defer ap.lockDeviceIdCoreMap.Unlock()
71 ap.deviceIdCoreMap[deviceId] = coreReference
72}
73
74// DeleteCoreReference removes a core reference (really the topic name) for a given device Id
75func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
76 ap.lockDeviceIdCoreMap.Lock()
77 defer ap.lockDeviceIdCoreMap.Unlock()
78 delete(ap.deviceIdCoreMap, deviceId)
79}
80
81func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
82 ap.lockDeviceIdCoreMap.Lock()
83 defer ap.lockDeviceIdCoreMap.Unlock()
84
85 if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
86 return kafka.Topic{Name: t}
87 }
88
89 return kafka.Topic{Name: ap.coreTopic}
90}
91
92func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
93 return kafka.Topic{Name: ap.adapterTopic}
94}
95
96func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
97 log.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
98 rpc := "Register"
99 topic := kafka.Topic{Name: ap.coreTopic}
100 replyToTopic := ap.getAdapterTopic()
101 args := make([]*kafka.KVArg, 2)
102 args[0] = &kafka.KVArg{
103 Key: "adapter",
104 Value: adapter,
105 }
106 args[1] = &kafka.KVArg{
107 Key: "deviceTypes",
108 Value: deviceTypes,
109 }
110
111 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
112 log.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
113 return unPackResponse(rpc, "", success, result)
114}
115
116func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
117 log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
118 rpc := "DeviceUpdate"
119 toTopic := ap.getCoreTopic(device.Id)
120 args := make([]*kafka.KVArg, 1)
121 args[0] = &kafka.KVArg{
122 Key: "device",
123 Value: device,
124 }
125 // Use a device specific topic as we are the only adaptercore handling requests for this device
126 replyToTopic := ap.getAdapterTopic()
127 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
128 log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
129 return unPackResponse(rpc, device.Id, success, result)
130}
131
132func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
133 log.Debugw("PortCreated", log.Fields{"portNo": port.PortNo})
134 rpc := "PortCreated"
135 // Use a device specific topic to send the request. The adapter handling the device creates a device
136 // specific topic
137 toTopic := ap.getCoreTopic(deviceId)
138 args := make([]*kafka.KVArg, 2)
139 id := &voltha.ID{Id: deviceId}
140 args[0] = &kafka.KVArg{
141 Key: "device_id",
142 Value: id,
143 }
144 args[1] = &kafka.KVArg{
145 Key: "port",
146 Value: port,
147 }
148
149 // Use a device specific topic as we are the only adaptercore handling requests for this device
150 replyToTopic := ap.getAdapterTopic()
151 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
152 log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
153 return unPackResponse(rpc, deviceId, success, result)
154}
155
156func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
157 connStatus voltha.ConnectStatus_ConnectStatus, operStatus voltha.OperStatus_OperStatus) error {
158 log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
159 rpc := "DeviceStateUpdate"
160 // Use a device specific topic to send the request. The adapter handling the device creates a device
161 // specific topic
162 toTopic := ap.getCoreTopic(deviceId)
163 args := make([]*kafka.KVArg, 3)
164 id := &voltha.ID{Id: deviceId}
165 oStatus := &ic.IntType{Val: int64(operStatus)}
166 cStatus := &ic.IntType{Val: int64(connStatus)}
167
168 args[0] = &kafka.KVArg{
169 Key: "device_id",
170 Value: id,
171 }
172 args[1] = &kafka.KVArg{
173 Key: "oper_status",
174 Value: oStatus,
175 }
176 args[2] = &kafka.KVArg{
177 Key: "connect_status",
178 Value: cStatus,
179 }
180 // Use a device specific topic as we are the only adaptercore handling requests for this device
181 replyToTopic := ap.getAdapterTopic()
182 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
183 log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
184 return unPackResponse(rpc, deviceId, success, result)
185}
186
187func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
188 childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64 ) error {
189 log.Debugw("ChildDeviceDetected", log.Fields{"pPeviceId": parentDeviceId, "channelId": channelId})
190 rpc := "ChildDeviceDetected"
191 // Use a device specific topic to send the request. The adapter handling the device creates a device
192 // specific topic
193 toTopic := ap.getCoreTopic(parentDeviceId)
194 replyToTopic := ap.getAdapterTopic()
195
196 args := make([]*kafka.KVArg, 7)
197 id := &voltha.ID{Id: parentDeviceId}
198 args[0] = &kafka.KVArg{
199 Key: "parent_device_id",
200 Value: id,
201 }
202 ppn := &ic.IntType{Val: int64(parentPortNo)}
203 args[1] = &kafka.KVArg{
204 Key: "parent_port_no",
205 Value: ppn,
206 }
207 cdt := &ic.StrType{Val: childDeviceType}
208 args[2] = &kafka.KVArg{
209 Key: "child_device_type",
210 Value: cdt,
211 }
212 channel := &ic.IntType{Val: int64(channelId)}
213 args[3] = &kafka.KVArg{
214 Key: "channel_id",
215 Value: channel,
216 }
217 vId := &ic.StrType{Val: vendorId}
218 args[4] = &kafka.KVArg{
219 Key: "vendor_id",
220 Value: vId,
221 }
222 sNo := &ic.StrType{Val: serialNumber}
223 args[5] = &kafka.KVArg{
224 Key: "serial_number",
225 Value: sNo,
226 }
227 oId := &ic.IntType{Val: int64(onuId)}
228 args[6] = &kafka.KVArg{
229 Key: "onu_id",
230 Value: oId,
231 }
232
233 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
234 log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
235 return unPackResponse(rpc, parentDeviceId, success, result)
236}