blob: a503c97e6f41ff23d068e4a0fb60b532d445f01a [file] [log] [blame]
khenaidood2b6df92018-12-13 16:37:20 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package common
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
21 a "github.com/golang/protobuf/ptypes/any"
22 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
24 ic "github.com/opencord/voltha-go/protos/inter_container"
25 "github.com/opencord/voltha-go/protos/voltha"
26 "google.golang.org/grpc/codes"
27 "google.golang.org/grpc/status"
khenaidoo54e0ddf2019-02-27 16:21:33 -050028 "sync"
khenaidood2b6df92018-12-13 16:37:20 -050029)
30
31type CoreProxy struct {
32 kafkaICProxy *kafka.InterContainerProxy
33 adapterTopic string
34 coreTopic string
khenaidoo54e0ddf2019-02-27 16:21:33 -050035 deviceIdCoreMap map[string]string
36 lockDeviceIdCoreMap sync.RWMutex
37
khenaidood2b6df92018-12-13 16:37:20 -050038}
39
40func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
41 var proxy CoreProxy
42 proxy.kafkaICProxy = kafkaProxy
43 proxy.adapterTopic = adapterTopic
44 proxy.coreTopic = coreTopic
khenaidoo54e0ddf2019-02-27 16:21:33 -050045 proxy.deviceIdCoreMap = make(map[string]string)
46 proxy.lockDeviceIdCoreMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -050047 log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
48
49 return &proxy
50}
51
52func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
53 if success {
54 return nil
55 } else {
56 unpackResult := &ic.Error{}
57 var err error
58 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
59 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
60 }
61 log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
62 // TODO: Need to get the real error code
63 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
64 }
65}
66
khenaidoo54e0ddf2019-02-27 16:21:33 -050067// UpdateCoreReference adds or update a core reference (really the topic name) for a given device Id
68func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
69 ap.lockDeviceIdCoreMap.Lock()
70 defer ap.lockDeviceIdCoreMap.Unlock()
71 ap.deviceIdCoreMap[deviceId] = coreReference
72}
73
74// DeleteCoreReference removes a core reference (really the topic name) for a given device Id
75func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
76 ap.lockDeviceIdCoreMap.Lock()
77 defer ap.lockDeviceIdCoreMap.Unlock()
78 delete(ap.deviceIdCoreMap, deviceId)
79}
80
81func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
82 ap.lockDeviceIdCoreMap.Lock()
83 defer ap.lockDeviceIdCoreMap.Unlock()
84
85 if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
86 return kafka.Topic{Name: t}
87 }
88
89 return kafka.Topic{Name: ap.coreTopic}
90}
91
92func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
93 return kafka.Topic{Name: ap.adapterTopic}
94}
95
khenaidood2b6df92018-12-13 16:37:20 -050096func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
97 log.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
98 rpc := "Register"
99 topic := kafka.Topic{Name: ap.coreTopic}
khenaidoo54e0ddf2019-02-27 16:21:33 -0500100 replyToTopic := ap.getAdapterTopic()
khenaidood2b6df92018-12-13 16:37:20 -0500101 args := make([]*kafka.KVArg, 2)
102 args[0] = &kafka.KVArg{
103 Key: "adapter",
104 Value: adapter,
105 }
106 args[1] = &kafka.KVArg{
107 Key: "deviceTypes",
108 Value: deviceTypes,
109 }
110
111 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
112 log.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
113 return unPackResponse(rpc, "", success, result)
114}
115
116func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
117 log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
118 rpc := "DeviceUpdate"
khenaidoo54e0ddf2019-02-27 16:21:33 -0500119 toTopic := ap.getCoreTopic(device.Id)
khenaidood2b6df92018-12-13 16:37:20 -0500120 args := make([]*kafka.KVArg, 1)
121 args[0] = &kafka.KVArg{
122 Key: "device",
123 Value: device,
124 }
125 // Use a device specific topic as we are the only adaptercore handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500126 replyToTopic := ap.getAdapterTopic()
khenaidood2b6df92018-12-13 16:37:20 -0500127 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
128 log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
129 return unPackResponse(rpc, device.Id, success, result)
130}
131
132func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
133 log.Debugw("PortCreated", log.Fields{"portNo": port.PortNo})
134 rpc := "PortCreated"
135 // Use a device specific topic to send the request. The adapter handling the device creates a device
136 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -0500137 toTopic := ap.getCoreTopic(deviceId)
khenaidood2b6df92018-12-13 16:37:20 -0500138 args := make([]*kafka.KVArg, 2)
139 id := &voltha.ID{Id: deviceId}
140 args[0] = &kafka.KVArg{
141 Key: "device_id",
142 Value: id,
143 }
144 args[1] = &kafka.KVArg{
145 Key: "port",
146 Value: port,
147 }
148
149 // Use a device specific topic as we are the only adaptercore handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500150 replyToTopic := ap.getAdapterTopic()
khenaidood2b6df92018-12-13 16:37:20 -0500151 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
152 log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
153 return unPackResponse(rpc, deviceId, success, result)
154}
155
156func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
157 connStatus voltha.ConnectStatus_ConnectStatus, operStatus voltha.OperStatus_OperStatus) error {
158 log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
159 rpc := "DeviceStateUpdate"
160 // Use a device specific topic to send the request. The adapter handling the device creates a device
161 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -0500162 toTopic := ap.getCoreTopic(deviceId)
khenaidood2b6df92018-12-13 16:37:20 -0500163 args := make([]*kafka.KVArg, 3)
164 id := &voltha.ID{Id: deviceId}
165 oStatus := &ic.IntType{Val: int64(operStatus)}
166 cStatus := &ic.IntType{Val: int64(connStatus)}
167
168 args[0] = &kafka.KVArg{
169 Key: "device_id",
170 Value: id,
171 }
172 args[1] = &kafka.KVArg{
173 Key: "oper_status",
174 Value: oStatus,
175 }
176 args[2] = &kafka.KVArg{
177 Key: "connect_status",
178 Value: cStatus,
179 }
180 // Use a device specific topic as we are the only adaptercore handling requests for this device
khenaidoo54e0ddf2019-02-27 16:21:33 -0500181 replyToTopic := ap.getAdapterTopic()
khenaidood2b6df92018-12-13 16:37:20 -0500182 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
183 log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
184 return unPackResponse(rpc, deviceId, success, result)
185}
186
187func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
188 childDeviceType string, channelId int) error {
189 log.Debugw("ChildDeviceDetected", log.Fields{"pPeviceId": parentDeviceId, "channelId": channelId})
190 rpc := "ChildDeviceDetected"
191 // Use a device specific topic to send the request. The adapter handling the device creates a device
192 // specific topic
khenaidoo54e0ddf2019-02-27 16:21:33 -0500193 toTopic := ap.getCoreTopic(parentDeviceId)
194 replyToTopic := ap.getAdapterTopic()
khenaidood2b6df92018-12-13 16:37:20 -0500195
196 args := make([]*kafka.KVArg, 4)
197 id := &voltha.ID{Id: parentDeviceId}
198 args[0] = &kafka.KVArg{
199 Key: "parent_device_id",
200 Value: id,
201 }
202 ppn := &ic.IntType{Val: int64(parentPortNo)}
203 args[1] = &kafka.KVArg{
204 Key: "parent_port_no",
205 Value: ppn,
206 }
207 cdt := &ic.StrType{Val: childDeviceType}
208 args[2] = &kafka.KVArg{
209 Key: "child_device_type",
210 Value: cdt,
211 }
212 channel := &ic.IntType{Val: int64(channelId)}
213 args[3] = &kafka.KVArg{
214 Key: "channel_id",
215 Value: channel,
216 }
217
218 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
219 log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
220 return unPackResponse(rpc, parentDeviceId, success, result)
221}