/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package core
17
18import (
khenaidoob64fc8a2019-11-27 15:08:19 -050019 "context"
20 "fmt"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080021 "strconv"
22 "time"
23
khenaidoob64fc8a2019-11-27 15:08:19 -050024 "github.com/golang/protobuf/ptypes/empty"
25 "github.com/google/uuid"
26 "github.com/opencord/voltha-go/rw_core/config"
27 cm "github.com/opencord/voltha-go/rw_core/mocks"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080028 "github.com/opencord/voltha-lib-go/v3/pkg/adapters"
29 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
30 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
31 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
32 "github.com/opencord/voltha-lib-go/v3/pkg/log"
33 lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
34 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidoob64fc8a2019-11-27 15:08:19 -050035 "github.com/phayes/freeport"
36 "google.golang.org/grpc/codes"
37 "google.golang.org/grpc/metadata"
38 "google.golang.org/grpc/status"
khenaidooab1f7bd2019-11-14 14:00:27 -050039)
40
41const (
khenaidoob64fc8a2019-11-27 15:08:19 -050042 logLevel = log.FatalLevel
43 volthaSerialNumberKey = "voltha_serial_number"
44 retryInterval = 50 * time.Millisecond
khenaidooab1f7bd2019-11-14 14:00:27 -050045)
46
khenaidoob64fc8a2019-11-27 15:08:19 -050047const (
48 OltAdapter = iota
49 OnuAdapter
50)
51
52var (
53 coreInCompeteMode bool
54)
55
56type isLogicalDeviceConditionSatisfied func(ld *voltha.LogicalDevice) bool
57type isDeviceConditionSatisfied func(ld *voltha.Device) bool
58type isDevicesConditionSatisfied func(ds *voltha.Devices) bool
khenaidoo93d5a3d2020-01-15 12:37:05 -050059type isLogicalDevicesConditionSatisfied func(lds *voltha.LogicalDevices) bool
khenaidoob64fc8a2019-11-27 15:08:19 -050060
khenaidooab1f7bd2019-11-14 14:00:27 -050061func init() {
khenaidoob64fc8a2019-11-27 15:08:19 -050062 _, err := log.AddPackage(log.JSON, logLevel, log.Fields{"instanceId": "coreTests"})
khenaidooab1f7bd2019-11-14 14:00:27 -050063 if err != nil {
64 panic(err)
65 }
khenaidoob64fc8a2019-11-27 15:08:19 -050066 // Update all loggers to log level specified as input parameter
67 log.SetAllLogLevel(log.ErrorLevel)
68
69 //Default mode is two rw-core running in a pair of competing cores
70 coreInCompeteMode = true
71}
72
73func setCoreCompeteMode(mode bool) {
74 coreInCompeteMode = mode
75}
76
77func getContext() context.Context {
78 if coreInCompeteMode {
79 return metadata.NewIncomingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, uuid.New().String()))
80 }
81 return context.Background()
82}
83
84//startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
85func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*lm.EtcdServer, int, error) {
86 kvClientPort, err := freeport.GetFreePort()
87 if err != nil {
88 return nil, 0, err
89 }
90 peerPort, err := freeport.GetFreePort()
91 if err != nil {
92 return nil, 0, err
93 }
94 etcdServer := lm.StartEtcdServer(lm.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
95 if etcdServer == nil {
96 return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
97 }
98 return etcdServer, kvClientPort, nil
99}
100
101func stopEmbeddedEtcdServer(server *lm.EtcdServer) {
102 if server != nil {
103 server.Stop()
104 }
105}
106
107func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
108 addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
109 client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout)
110 if err != nil {
111 panic("no kv client")
112 }
113 // Setup KV transaction context
114 txnPrefix := cf.KVStoreDataPrefix + "/transactions/"
115 if err = SetTransactionContext(coreInstanceID,
116 txnPrefix,
117 client,
118 cf.KVStoreTimeout); err != nil {
119 log.Fatal("creating-transaction-context-failed")
120 }
121 return client
122}
123
124func createMockAdapter(adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
125 var err error
126 var adapter adapters.IAdapter
npujar467fe752020-01-16 20:17:45 +0530127 adapterKafkaICProxy := kafka.NewInterContainerProxy(
khenaidoob64fc8a2019-11-27 15:08:19 -0500128 kafka.MsgClient(kafkaClient),
129 kafka.DefaultTopic(&kafka.Topic{Name: adapterName}))
khenaidoob64fc8a2019-11-27 15:08:19 -0500130 adapterCoreProxy := com.NewCoreProxy(adapterKafkaICProxy, adapterName, coreName)
131 var adapterReqHandler *com.RequestHandlerProxy
132 switch adapterType {
133 case OltAdapter:
134 adapter = cm.NewOLTAdapter(adapterCoreProxy)
135 case OnuAdapter:
136 adapter = cm.NewONUAdapter(adapterCoreProxy)
137 default:
138 log.Fatalf("invalid-adapter-type-%d", adapterType)
139 }
140 adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
141
142 if err = adapterKafkaICProxy.Start(); err != nil {
143 log.Errorw("Failure-starting-adapter-intercontainerProxy", log.Fields{"error": err})
144 return nil, err
145 }
146 if err = adapterKafkaICProxy.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: adapterName}, adapterReqHandler); err != nil {
147 log.Errorw("Failure-to-subscribe-onu-request-handler", log.Fields{"error": err})
148 return nil, err
149 }
150 return adapter, nil
151}
152
153func waitUntilDeviceReadiness(deviceID string,
154 timeout time.Duration,
155 verificationFunction isDeviceConditionSatisfied,
156 nbi *APIHandler) error {
157 ch := make(chan int, 1)
158 done := false
159 go func() {
160 for {
161 device, _ := nbi.GetDevice(getContext(), &voltha.ID{Id: deviceID})
162 if device != nil && verificationFunction(device) {
163 ch <- 1
164 break
165 }
166 if done {
167 break
168 }
169 time.Sleep(retryInterval)
170 }
171 }()
172 timer := time.NewTimer(timeout)
173 defer timer.Stop()
174 select {
175 case <-ch:
176 return nil
177 case <-timer.C:
178 done = true
179 return fmt.Errorf("expected-states-not-reached-for-device%s", deviceID)
180 }
181}
182
183func waitUntilLogicalDeviceReadiness(oltDeviceID string,
184 timeout time.Duration,
185 nbi *APIHandler,
186 verificationFunction isLogicalDeviceConditionSatisfied,
187) error {
188 ch := make(chan int, 1)
189 done := false
190 go func() {
191 for {
192 // Get the logical device from the olt device
193 d, _ := nbi.GetDevice(getContext(), &voltha.ID{Id: oltDeviceID})
194 if d != nil && d.ParentId != "" {
195 ld, _ := nbi.GetLogicalDevice(getContext(), &voltha.ID{Id: d.ParentId})
196 if ld != nil && verificationFunction(ld) {
197 ch <- 1
198 break
199 }
200 if done {
201 break
202 }
203 }
204 time.Sleep(retryInterval)
205 }
206 }()
207 timer := time.NewTimer(timeout)
208 defer timer.Stop()
209 select {
210 case <-ch:
211 return nil
212 case <-timer.C:
213 done = true
214 return fmt.Errorf("timeout-waiting-for-logical-device-readiness%s", oltDeviceID)
215 }
216}
217
218func waitUntilConditionForDevices(timeout time.Duration, nbi *APIHandler, verificationFunction isDevicesConditionSatisfied) error {
219 ch := make(chan int, 1)
220 done := false
221 go func() {
222 for {
223 devices, _ := nbi.ListDevices(getContext(), &empty.Empty{})
224 if verificationFunction(devices) {
225 ch <- 1
226 break
227 }
228 if done {
229 break
230 }
231
232 time.Sleep(retryInterval)
233 }
234 }()
235 timer := time.NewTimer(timeout)
236 defer timer.Stop()
237 select {
238 case <-ch:
239 return nil
240 case <-timer.C:
241 done = true
242 return fmt.Errorf("timeout-waiting-devices")
243 }
khenaidooab1f7bd2019-11-14 14:00:27 -0500244}
khenaidoo93d5a3d2020-01-15 12:37:05 -0500245
246func waitUntilConditionForLogicalDevices(timeout time.Duration, nbi *APIHandler, verificationFunction isLogicalDevicesConditionSatisfied) error {
247 ch := make(chan int, 1)
248 done := false
249 go func() {
250 for {
251 lDevices, _ := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
252 if verificationFunction(lDevices) {
253 ch <- 1
254 break
255 }
256 if done {
257 break
258 }
259
260 time.Sleep(retryInterval)
261 }
262 }()
263 timer := time.NewTimer(timeout)
264 defer timer.Stop()
265 select {
266 case <-ch:
267 return nil
268 case <-timer.C:
269 done = true
270 return fmt.Errorf("timeout-waiting-logical-devices")
271 }
272}