blob: 8375f70ce049de184251ff6ffbfa7031a6a4d443 [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
khenaidoob64fc8a2019-11-27 15:08:19 -050019 "context"
20 "fmt"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080021 "strconv"
22 "time"
23
khenaidoob64fc8a2019-11-27 15:08:19 -050024 "github.com/golang/protobuf/ptypes/empty"
25 "github.com/google/uuid"
26 "github.com/opencord/voltha-go/rw_core/config"
27 cm "github.com/opencord/voltha-go/rw_core/mocks"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080028 "github.com/opencord/voltha-lib-go/v3/pkg/adapters"
29 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
30 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
31 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
32 "github.com/opencord/voltha-lib-go/v3/pkg/log"
33 lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
34 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidoob64fc8a2019-11-27 15:08:19 -050035 "github.com/phayes/freeport"
36 "google.golang.org/grpc/codes"
37 "google.golang.org/grpc/metadata"
38 "google.golang.org/grpc/status"
khenaidooab1f7bd2019-11-14 14:00:27 -050039)
40
41const (
khenaidoob64fc8a2019-11-27 15:08:19 -050042 logLevel = log.FatalLevel
43 volthaSerialNumberKey = "voltha_serial_number"
44 retryInterval = 50 * time.Millisecond
khenaidooab1f7bd2019-11-14 14:00:27 -050045)
46
khenaidoob64fc8a2019-11-27 15:08:19 -050047const (
48 OltAdapter = iota
49 OnuAdapter
50)
51
52var (
53 coreInCompeteMode bool
54)
55
56type isLogicalDeviceConditionSatisfied func(ld *voltha.LogicalDevice) bool
57type isDeviceConditionSatisfied func(ld *voltha.Device) bool
58type isDevicesConditionSatisfied func(ds *voltha.Devices) bool
khenaidoo93d5a3d2020-01-15 12:37:05 -050059type isLogicalDevicesConditionSatisfied func(lds *voltha.LogicalDevices) bool
khenaidoo67b22152020-03-02 16:01:25 -050060type isConditionSatisfied func() bool
khenaidoob64fc8a2019-11-27 15:08:19 -050061
khenaidooab1f7bd2019-11-14 14:00:27 -050062func init() {
khenaidoob64fc8a2019-11-27 15:08:19 -050063 _, err := log.AddPackage(log.JSON, logLevel, log.Fields{"instanceId": "coreTests"})
khenaidooab1f7bd2019-11-14 14:00:27 -050064 if err != nil {
65 panic(err)
66 }
khenaidoob64fc8a2019-11-27 15:08:19 -050067 // Update all loggers to log level specified as input parameter
68 log.SetAllLogLevel(log.ErrorLevel)
69
70 //Default mode is two rw-core running in a pair of competing cores
71 coreInCompeteMode = true
72}
73
74func setCoreCompeteMode(mode bool) {
75 coreInCompeteMode = mode
76}
77
78func getContext() context.Context {
79 if coreInCompeteMode {
80 return metadata.NewIncomingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, uuid.New().String()))
81 }
82 return context.Background()
83}
84
85//startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
86func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*lm.EtcdServer, int, error) {
87 kvClientPort, err := freeport.GetFreePort()
88 if err != nil {
89 return nil, 0, err
90 }
91 peerPort, err := freeport.GetFreePort()
92 if err != nil {
93 return nil, 0, err
94 }
95 etcdServer := lm.StartEtcdServer(lm.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
96 if etcdServer == nil {
97 return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
98 }
99 return etcdServer, kvClientPort, nil
100}
101
102func stopEmbeddedEtcdServer(server *lm.EtcdServer) {
103 if server != nil {
104 server.Stop()
105 }
106}
107
108func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
109 addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
110 client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout)
111 if err != nil {
112 panic("no kv client")
113 }
114 // Setup KV transaction context
115 txnPrefix := cf.KVStoreDataPrefix + "/transactions/"
116 if err = SetTransactionContext(coreInstanceID,
117 txnPrefix,
118 client,
119 cf.KVStoreTimeout); err != nil {
120 log.Fatal("creating-transaction-context-failed")
121 }
122 return client
123}
124
125func createMockAdapter(adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
126 var err error
127 var adapter adapters.IAdapter
npujar467fe752020-01-16 20:17:45 +0530128 adapterKafkaICProxy := kafka.NewInterContainerProxy(
khenaidoob64fc8a2019-11-27 15:08:19 -0500129 kafka.MsgClient(kafkaClient),
130 kafka.DefaultTopic(&kafka.Topic{Name: adapterName}))
khenaidoob64fc8a2019-11-27 15:08:19 -0500131 adapterCoreProxy := com.NewCoreProxy(adapterKafkaICProxy, adapterName, coreName)
132 var adapterReqHandler *com.RequestHandlerProxy
133 switch adapterType {
134 case OltAdapter:
135 adapter = cm.NewOLTAdapter(adapterCoreProxy)
136 case OnuAdapter:
137 adapter = cm.NewONUAdapter(adapterCoreProxy)
138 default:
139 log.Fatalf("invalid-adapter-type-%d", adapterType)
140 }
141 adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
142
143 if err = adapterKafkaICProxy.Start(); err != nil {
144 log.Errorw("Failure-starting-adapter-intercontainerProxy", log.Fields{"error": err})
145 return nil, err
146 }
147 if err = adapterKafkaICProxy.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: adapterName}, adapterReqHandler); err != nil {
148 log.Errorw("Failure-to-subscribe-onu-request-handler", log.Fields{"error": err})
149 return nil, err
150 }
151 return adapter, nil
152}
153
154func waitUntilDeviceReadiness(deviceID string,
155 timeout time.Duration,
156 verificationFunction isDeviceConditionSatisfied,
157 nbi *APIHandler) error {
158 ch := make(chan int, 1)
159 done := false
160 go func() {
161 for {
162 device, _ := nbi.GetDevice(getContext(), &voltha.ID{Id: deviceID})
Chaitrashree G Se8ad0202020-02-27 18:48:00 -0500163 if verificationFunction(device) {
khenaidoob64fc8a2019-11-27 15:08:19 -0500164 ch <- 1
165 break
166 }
167 if done {
168 break
169 }
170 time.Sleep(retryInterval)
171 }
172 }()
173 timer := time.NewTimer(timeout)
174 defer timer.Stop()
175 select {
176 case <-ch:
177 return nil
178 case <-timer.C:
179 done = true
180 return fmt.Errorf("expected-states-not-reached-for-device%s", deviceID)
181 }
182}
183
184func waitUntilLogicalDeviceReadiness(oltDeviceID string,
185 timeout time.Duration,
186 nbi *APIHandler,
187 verificationFunction isLogicalDeviceConditionSatisfied,
188) error {
189 ch := make(chan int, 1)
190 done := false
191 go func() {
192 for {
193 // Get the logical device from the olt device
194 d, _ := nbi.GetDevice(getContext(), &voltha.ID{Id: oltDeviceID})
195 if d != nil && d.ParentId != "" {
196 ld, _ := nbi.GetLogicalDevice(getContext(), &voltha.ID{Id: d.ParentId})
khenaidoo67b22152020-03-02 16:01:25 -0500197 if verificationFunction(ld) {
khenaidoob64fc8a2019-11-27 15:08:19 -0500198 ch <- 1
199 break
200 }
201 if done {
202 break
203 }
Girish Gowdra408cd962020-03-11 14:31:31 -0700204 } else if d != nil && d.ParentId == "" { // case where logical device deleted
205 if verificationFunction(nil) {
206 ch <- 1
207 break
208 }
209 if done {
210 break
211 }
khenaidoob64fc8a2019-11-27 15:08:19 -0500212 }
213 time.Sleep(retryInterval)
214 }
215 }()
216 timer := time.NewTimer(timeout)
217 defer timer.Stop()
218 select {
219 case <-ch:
220 return nil
221 case <-timer.C:
222 done = true
223 return fmt.Errorf("timeout-waiting-for-logical-device-readiness%s", oltDeviceID)
224 }
225}
226
227func waitUntilConditionForDevices(timeout time.Duration, nbi *APIHandler, verificationFunction isDevicesConditionSatisfied) error {
228 ch := make(chan int, 1)
229 done := false
230 go func() {
231 for {
232 devices, _ := nbi.ListDevices(getContext(), &empty.Empty{})
233 if verificationFunction(devices) {
234 ch <- 1
235 break
236 }
237 if done {
238 break
239 }
240
241 time.Sleep(retryInterval)
242 }
243 }()
244 timer := time.NewTimer(timeout)
245 defer timer.Stop()
246 select {
247 case <-ch:
248 return nil
249 case <-timer.C:
250 done = true
251 return fmt.Errorf("timeout-waiting-devices")
252 }
khenaidooab1f7bd2019-11-14 14:00:27 -0500253}
khenaidoo93d5a3d2020-01-15 12:37:05 -0500254
255func waitUntilConditionForLogicalDevices(timeout time.Duration, nbi *APIHandler, verificationFunction isLogicalDevicesConditionSatisfied) error {
256 ch := make(chan int, 1)
257 done := false
258 go func() {
259 for {
260 lDevices, _ := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
261 if verificationFunction(lDevices) {
262 ch <- 1
263 break
264 }
265 if done {
266 break
267 }
268
269 time.Sleep(retryInterval)
270 }
271 }()
272 timer := time.NewTimer(timeout)
273 defer timer.Stop()
274 select {
275 case <-ch:
276 return nil
277 case <-timer.C:
278 done = true
279 return fmt.Errorf("timeout-waiting-logical-devices")
280 }
281}
khenaidoo67b22152020-03-02 16:01:25 -0500282
283func waitUntilCondition(timeout time.Duration, nbi *APIHandler, verificationFunction isConditionSatisfied) error {
284 ch := make(chan int, 1)
285 done := false
286 go func() {
287 for {
288 if verificationFunction() {
289 ch <- 1
290 break
291 }
292 if done {
293 break
294 }
295 time.Sleep(retryInterval)
296 }
297 }()
298 timer := time.NewTimer(timeout)
299 defer timer.Stop()
300 select {
301 case <-ch:
302 return nil
303 case <-timer.C:
304 done = true
305 return fmt.Errorf("timeout-waiting-for-condition")
306 }
307}