blob: f07691028aeaa42b12abdeab4e0f772daa278d00 [file] [log] [blame]
khenaidood2b6df92018-12-13 16:37:20 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package common
17
18import (
19 "context"
20 "github.com/golang/protobuf/ptypes"
21 a "github.com/golang/protobuf/ptypes/any"
22 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/kafka"
24 ic "github.com/opencord/voltha-go/protos/inter_container"
25 "github.com/opencord/voltha-go/protos/voltha"
26 "google.golang.org/grpc/codes"
27 "google.golang.org/grpc/status"
28)
29
30type CoreProxy struct {
31 kafkaICProxy *kafka.InterContainerProxy
32 adapterTopic string
33 coreTopic string
34}
35
36func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
37 var proxy CoreProxy
38 proxy.kafkaICProxy = kafkaProxy
39 proxy.adapterTopic = adapterTopic
40 proxy.coreTopic = coreTopic
41 log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
42
43 return &proxy
44}
45
46func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
47 if success {
48 return nil
49 } else {
50 unpackResult := &ic.Error{}
51 var err error
52 if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
53 log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
54 }
55 log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
56 // TODO: Need to get the real error code
57 return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
58 }
59}
60
61func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
62 log.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
63 rpc := "Register"
64 topic := kafka.Topic{Name: ap.coreTopic}
65 replyToTopic := kafka.Topic{Name: ap.adapterTopic}
66 args := make([]*kafka.KVArg, 2)
67 args[0] = &kafka.KVArg{
68 Key: "adapter",
69 Value: adapter,
70 }
71 args[1] = &kafka.KVArg{
72 Key: "deviceTypes",
73 Value: deviceTypes,
74 }
75
76 success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
77 log.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
78 return unPackResponse(rpc, "", success, result)
79}
80
81func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
82 log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
83 rpc := "DeviceUpdate"
84 // Use a device specific topic to send the request. The adapter handling the device creates a device
85 // specific topic
86 toTopic := kafka.CreateSubTopic(ap.coreTopic, device.Id)
87 args := make([]*kafka.KVArg, 1)
88 args[0] = &kafka.KVArg{
89 Key: "device",
90 Value: device,
91 }
92 // Use a device specific topic as we are the only adaptercore handling requests for this device
93 replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, device.Id)
94 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
95 log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
96 return unPackResponse(rpc, device.Id, success, result)
97}
98
99func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
100 log.Debugw("PortCreated", log.Fields{"portNo": port.PortNo})
101 rpc := "PortCreated"
102 // Use a device specific topic to send the request. The adapter handling the device creates a device
103 // specific topic
104 toTopic := kafka.CreateSubTopic(ap.coreTopic, deviceId)
105 args := make([]*kafka.KVArg, 2)
106 id := &voltha.ID{Id: deviceId}
107 args[0] = &kafka.KVArg{
108 Key: "device_id",
109 Value: id,
110 }
111 args[1] = &kafka.KVArg{
112 Key: "port",
113 Value: port,
114 }
115
116 // Use a device specific topic as we are the only adaptercore handling requests for this device
117 replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, deviceId)
118 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
119 log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
120 return unPackResponse(rpc, deviceId, success, result)
121}
122
123func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
124 connStatus voltha.ConnectStatus_ConnectStatus, operStatus voltha.OperStatus_OperStatus) error {
125 log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
126 rpc := "DeviceStateUpdate"
127 // Use a device specific topic to send the request. The adapter handling the device creates a device
128 // specific topic
129 toTopic := kafka.CreateSubTopic(ap.coreTopic, deviceId)
130 args := make([]*kafka.KVArg, 3)
131 id := &voltha.ID{Id: deviceId}
132 oStatus := &ic.IntType{Val: int64(operStatus)}
133 cStatus := &ic.IntType{Val: int64(connStatus)}
134
135 args[0] = &kafka.KVArg{
136 Key: "device_id",
137 Value: id,
138 }
139 args[1] = &kafka.KVArg{
140 Key: "oper_status",
141 Value: oStatus,
142 }
143 args[2] = &kafka.KVArg{
144 Key: "connect_status",
145 Value: cStatus,
146 }
147 // Use a device specific topic as we are the only adaptercore handling requests for this device
148 replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, deviceId)
149 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
150 log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
151 return unPackResponse(rpc, deviceId, success, result)
152}
153
154func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
155 childDeviceType string, channelId int) error {
156 log.Debugw("ChildDeviceDetected", log.Fields{"pPeviceId": parentDeviceId, "channelId": channelId})
157 rpc := "ChildDeviceDetected"
158 // Use a device specific topic to send the request. The adapter handling the device creates a device
159 // specific topic
160 toTopic := kafka.CreateSubTopic(ap.coreTopic, parentDeviceId)
161 replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, parentDeviceId)
162
163 args := make([]*kafka.KVArg, 4)
164 id := &voltha.ID{Id: parentDeviceId}
165 args[0] = &kafka.KVArg{
166 Key: "parent_device_id",
167 Value: id,
168 }
169 ppn := &ic.IntType{Val: int64(parentPortNo)}
170 args[1] = &kafka.KVArg{
171 Key: "parent_port_no",
172 Value: ppn,
173 }
174 cdt := &ic.StrType{Val: childDeviceType}
175 args[2] = &kafka.KVArg{
176 Key: "child_device_type",
177 Value: cdt,
178 }
179 channel := &ic.IntType{Val: int64(channelId)}
180 args[3] = &kafka.KVArg{
181 Key: "channel_id",
182 Value: channel,
183 }
184
185 success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
186 log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
187 return unPackResponse(rpc, parentDeviceId, success, result)
188}