blob: 9771619adad1d4fd9d48c6dc636cfcba40c140ad [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
khenaidoob64fc8a2019-11-27 15:08:19 -050019 "context"
20 "fmt"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080021 "strconv"
22 "time"
23
khenaidoob64fc8a2019-11-27 15:08:19 -050024 "github.com/golang/protobuf/ptypes/empty"
25 "github.com/google/uuid"
26 "github.com/opencord/voltha-go/rw_core/config"
27 cm "github.com/opencord/voltha-go/rw_core/mocks"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080028 "github.com/opencord/voltha-lib-go/v3/pkg/adapters"
29 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
30 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
31 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
32 "github.com/opencord/voltha-lib-go/v3/pkg/log"
33 lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
34 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidoob64fc8a2019-11-27 15:08:19 -050035 "github.com/phayes/freeport"
36 "google.golang.org/grpc/codes"
37 "google.golang.org/grpc/metadata"
38 "google.golang.org/grpc/status"
khenaidooab1f7bd2019-11-14 14:00:27 -050039)
40
41const (
khenaidoob64fc8a2019-11-27 15:08:19 -050042 volthaSerialNumberKey = "voltha_serial_number"
43 retryInterval = 50 * time.Millisecond
khenaidooab1f7bd2019-11-14 14:00:27 -050044)
45
khenaidoob64fc8a2019-11-27 15:08:19 -050046const (
47 OltAdapter = iota
48 OnuAdapter
49)
50
51var (
52 coreInCompeteMode bool
53)
54
55type isLogicalDeviceConditionSatisfied func(ld *voltha.LogicalDevice) bool
56type isDeviceConditionSatisfied func(ld *voltha.Device) bool
57type isDevicesConditionSatisfied func(ds *voltha.Devices) bool
khenaidoo93d5a3d2020-01-15 12:37:05 -050058type isLogicalDevicesConditionSatisfied func(lds *voltha.LogicalDevices) bool
khenaidoo67b22152020-03-02 16:01:25 -050059type isConditionSatisfied func() bool
khenaidoob64fc8a2019-11-27 15:08:19 -050060
khenaidooab1f7bd2019-11-14 14:00:27 -050061func init() {
khenaidoob64fc8a2019-11-27 15:08:19 -050062 //Default mode is two rw-core running in a pair of competing cores
63 coreInCompeteMode = true
64}
65
66func setCoreCompeteMode(mode bool) {
67 coreInCompeteMode = mode
68}
69
70func getContext() context.Context {
71 if coreInCompeteMode {
72 return metadata.NewIncomingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, uuid.New().String()))
73 }
74 return context.Background()
75}
76
77//startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
78func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*lm.EtcdServer, int, error) {
79 kvClientPort, err := freeport.GetFreePort()
80 if err != nil {
81 return nil, 0, err
82 }
83 peerPort, err := freeport.GetFreePort()
84 if err != nil {
85 return nil, 0, err
86 }
87 etcdServer := lm.StartEtcdServer(lm.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
88 if etcdServer == nil {
89 return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
90 }
91 return etcdServer, kvClientPort, nil
92}
93
94func stopEmbeddedEtcdServer(server *lm.EtcdServer) {
95 if server != nil {
96 server.Stop()
97 }
98}
99
100func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
101 addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
102 client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout)
103 if err != nil {
104 panic("no kv client")
105 }
khenaidoob64fc8a2019-11-27 15:08:19 -0500106 return client
107}
108
109func createMockAdapter(adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
110 var err error
111 var adapter adapters.IAdapter
npujar467fe752020-01-16 20:17:45 +0530112 adapterKafkaICProxy := kafka.NewInterContainerProxy(
khenaidoob64fc8a2019-11-27 15:08:19 -0500113 kafka.MsgClient(kafkaClient),
114 kafka.DefaultTopic(&kafka.Topic{Name: adapterName}))
khenaidoob64fc8a2019-11-27 15:08:19 -0500115 adapterCoreProxy := com.NewCoreProxy(adapterKafkaICProxy, adapterName, coreName)
116 var adapterReqHandler *com.RequestHandlerProxy
117 switch adapterType {
118 case OltAdapter:
119 adapter = cm.NewOLTAdapter(adapterCoreProxy)
120 case OnuAdapter:
121 adapter = cm.NewONUAdapter(adapterCoreProxy)
122 default:
Girish Kumarf56a4682020-03-20 20:07:46 +0000123 logger.Fatalf("invalid-adapter-type-%d", adapterType)
khenaidoob64fc8a2019-11-27 15:08:19 -0500124 }
125 adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
126
127 if err = adapterKafkaICProxy.Start(); err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000128 logger.Errorw("Failure-starting-adapter-intercontainerProxy", log.Fields{"error": err})
khenaidoob64fc8a2019-11-27 15:08:19 -0500129 return nil, err
130 }
131 if err = adapterKafkaICProxy.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: adapterName}, adapterReqHandler); err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000132 logger.Errorw("Failure-to-subscribe-onu-request-handler", log.Fields{"error": err})
khenaidoob64fc8a2019-11-27 15:08:19 -0500133 return nil, err
134 }
135 return adapter, nil
136}
137
138func waitUntilDeviceReadiness(deviceID string,
139 timeout time.Duration,
140 verificationFunction isDeviceConditionSatisfied,
141 nbi *APIHandler) error {
142 ch := make(chan int, 1)
143 done := false
144 go func() {
145 for {
146 device, _ := nbi.GetDevice(getContext(), &voltha.ID{Id: deviceID})
Chaitrashree G Se8ad0202020-02-27 18:48:00 -0500147 if verificationFunction(device) {
khenaidoob64fc8a2019-11-27 15:08:19 -0500148 ch <- 1
149 break
150 }
151 if done {
152 break
153 }
154 time.Sleep(retryInterval)
155 }
156 }()
157 timer := time.NewTimer(timeout)
158 defer timer.Stop()
159 select {
160 case <-ch:
161 return nil
162 case <-timer.C:
163 done = true
164 return fmt.Errorf("expected-states-not-reached-for-device%s", deviceID)
165 }
166}
167
168func waitUntilLogicalDeviceReadiness(oltDeviceID string,
169 timeout time.Duration,
170 nbi *APIHandler,
171 verificationFunction isLogicalDeviceConditionSatisfied,
172) error {
173 ch := make(chan int, 1)
174 done := false
175 go func() {
176 for {
177 // Get the logical device from the olt device
178 d, _ := nbi.GetDevice(getContext(), &voltha.ID{Id: oltDeviceID})
179 if d != nil && d.ParentId != "" {
180 ld, _ := nbi.GetLogicalDevice(getContext(), &voltha.ID{Id: d.ParentId})
khenaidoo67b22152020-03-02 16:01:25 -0500181 if verificationFunction(ld) {
khenaidoob64fc8a2019-11-27 15:08:19 -0500182 ch <- 1
183 break
184 }
185 if done {
186 break
187 }
Girish Gowdra408cd962020-03-11 14:31:31 -0700188 } else if d != nil && d.ParentId == "" { // case where logical device deleted
189 if verificationFunction(nil) {
190 ch <- 1
191 break
192 }
193 if done {
194 break
195 }
khenaidoob64fc8a2019-11-27 15:08:19 -0500196 }
197 time.Sleep(retryInterval)
198 }
199 }()
200 timer := time.NewTimer(timeout)
201 defer timer.Stop()
202 select {
203 case <-ch:
204 return nil
205 case <-timer.C:
206 done = true
207 return fmt.Errorf("timeout-waiting-for-logical-device-readiness%s", oltDeviceID)
208 }
209}
210
211func waitUntilConditionForDevices(timeout time.Duration, nbi *APIHandler, verificationFunction isDevicesConditionSatisfied) error {
212 ch := make(chan int, 1)
213 done := false
214 go func() {
215 for {
216 devices, _ := nbi.ListDevices(getContext(), &empty.Empty{})
217 if verificationFunction(devices) {
218 ch <- 1
219 break
220 }
221 if done {
222 break
223 }
224
225 time.Sleep(retryInterval)
226 }
227 }()
228 timer := time.NewTimer(timeout)
229 defer timer.Stop()
230 select {
231 case <-ch:
232 return nil
233 case <-timer.C:
234 done = true
235 return fmt.Errorf("timeout-waiting-devices")
236 }
khenaidooab1f7bd2019-11-14 14:00:27 -0500237}
khenaidoo93d5a3d2020-01-15 12:37:05 -0500238
239func waitUntilConditionForLogicalDevices(timeout time.Duration, nbi *APIHandler, verificationFunction isLogicalDevicesConditionSatisfied) error {
240 ch := make(chan int, 1)
241 done := false
242 go func() {
243 for {
244 lDevices, _ := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
245 if verificationFunction(lDevices) {
246 ch <- 1
247 break
248 }
249 if done {
250 break
251 }
252
253 time.Sleep(retryInterval)
254 }
255 }()
256 timer := time.NewTimer(timeout)
257 defer timer.Stop()
258 select {
259 case <-ch:
260 return nil
261 case <-timer.C:
262 done = true
263 return fmt.Errorf("timeout-waiting-logical-devices")
264 }
265}
khenaidoo67b22152020-03-02 16:01:25 -0500266
267func waitUntilCondition(timeout time.Duration, nbi *APIHandler, verificationFunction isConditionSatisfied) error {
268 ch := make(chan int, 1)
269 done := false
270 go func() {
271 for {
272 if verificationFunction() {
273 ch <- 1
274 break
275 }
276 if done {
277 break
278 }
279 time.Sleep(retryInterval)
280 }
281 }()
282 timer := time.NewTimer(timeout)
283 defer timer.Stop()
284 select {
285 case <-ch:
286 return nil
287 case <-timer.C:
288 done = true
289 return fmt.Errorf("timeout-waiting-for-condition")
290 }
291}