blob: 9098f0f145ddfe02e5ab8d4f5b70634f5e6a0ee3 [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
khenaidoob64fc8a2019-11-27 15:08:19 -050019 "context"
20 "fmt"
21 "github.com/golang/protobuf/ptypes/empty"
22 "github.com/google/uuid"
23 "github.com/opencord/voltha-go/rw_core/config"
24 cm "github.com/opencord/voltha-go/rw_core/mocks"
25 "github.com/opencord/voltha-lib-go/v2/pkg/adapters"
26 com "github.com/opencord/voltha-lib-go/v2/pkg/adapters/common"
27 "github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
28 "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
khenaidooab1f7bd2019-11-14 14:00:27 -050029 "github.com/opencord/voltha-lib-go/v2/pkg/log"
khenaidoob64fc8a2019-11-27 15:08:19 -050030 lm "github.com/opencord/voltha-lib-go/v2/pkg/mocks"
31 "github.com/opencord/voltha-protos/v2/go/voltha"
32 "github.com/phayes/freeport"
33 "google.golang.org/grpc/codes"
34 "google.golang.org/grpc/metadata"
35 "google.golang.org/grpc/status"
36 "strconv"
37 "time"
khenaidooab1f7bd2019-11-14 14:00:27 -050038)
39
40const (
khenaidoob64fc8a2019-11-27 15:08:19 -050041 logLevel = log.FatalLevel
42 volthaSerialNumberKey = "voltha_serial_number"
43 retryInterval = 50 * time.Millisecond
khenaidooab1f7bd2019-11-14 14:00:27 -050044)
45
// Adapter kinds accepted by createMockAdapter.
const (
	// OltAdapter selects the mock OLT adapter.
	OltAdapter = iota
	// OnuAdapter selects the mock ONU adapter.
	OnuAdapter
)
50
// coreInCompeteMode selects the kind of context getContext hands out.
// init sets it to true; setCoreCompeteMode overrides it.
var coreInCompeteMode bool
54
55type isLogicalDeviceConditionSatisfied func(ld *voltha.LogicalDevice) bool
56type isDeviceConditionSatisfied func(ld *voltha.Device) bool
57type isDevicesConditionSatisfied func(ds *voltha.Devices) bool
khenaidoo93d5a3d2020-01-15 12:37:05 -050058type isLogicalDevicesConditionSatisfied func(lds *voltha.LogicalDevices) bool
khenaidoob64fc8a2019-11-27 15:08:19 -050059
khenaidooab1f7bd2019-11-14 14:00:27 -050060func init() {
khenaidoob64fc8a2019-11-27 15:08:19 -050061 _, err := log.AddPackage(log.JSON, logLevel, log.Fields{"instanceId": "coreTests"})
khenaidooab1f7bd2019-11-14 14:00:27 -050062 if err != nil {
63 panic(err)
64 }
khenaidoob64fc8a2019-11-27 15:08:19 -050065 // Update all loggers to log level specified as input parameter
66 log.SetAllLogLevel(log.ErrorLevel)
67
68 //Default mode is two rw-core running in a pair of competing cores
69 coreInCompeteMode = true
70}
71
72func setCoreCompeteMode(mode bool) {
73 coreInCompeteMode = mode
74}
75
76func getContext() context.Context {
77 if coreInCompeteMode {
78 return metadata.NewIncomingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, uuid.New().String()))
79 }
80 return context.Background()
81}
82
83//startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
84func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*lm.EtcdServer, int, error) {
85 kvClientPort, err := freeport.GetFreePort()
86 if err != nil {
87 return nil, 0, err
88 }
89 peerPort, err := freeport.GetFreePort()
90 if err != nil {
91 return nil, 0, err
92 }
93 etcdServer := lm.StartEtcdServer(lm.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
94 if etcdServer == nil {
95 return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
96 }
97 return etcdServer, kvClientPort, nil
98}
99
100func stopEmbeddedEtcdServer(server *lm.EtcdServer) {
101 if server != nil {
102 server.Stop()
103 }
104}
105
106func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
107 addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
108 client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout)
109 if err != nil {
110 panic("no kv client")
111 }
112 // Setup KV transaction context
113 txnPrefix := cf.KVStoreDataPrefix + "/transactions/"
114 if err = SetTransactionContext(coreInstanceID,
115 txnPrefix,
116 client,
117 cf.KVStoreTimeout); err != nil {
118 log.Fatal("creating-transaction-context-failed")
119 }
120 return client
121}
122
123func createMockAdapter(adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
124 var err error
125 var adapter adapters.IAdapter
126 adapterKafkaICProxy, err := kafka.NewInterContainerProxy(
127 kafka.MsgClient(kafkaClient),
128 kafka.DefaultTopic(&kafka.Topic{Name: adapterName}))
129 if err != nil || adapterKafkaICProxy == nil {
130 log.Errorw("Failure-creating-adapter-intercontainerProxy", log.Fields{"error": err, "adapter": adapterName})
131 return nil, err
132 }
133 adapterCoreProxy := com.NewCoreProxy(adapterKafkaICProxy, adapterName, coreName)
134 var adapterReqHandler *com.RequestHandlerProxy
135 switch adapterType {
136 case OltAdapter:
137 adapter = cm.NewOLTAdapter(adapterCoreProxy)
138 case OnuAdapter:
139 adapter = cm.NewONUAdapter(adapterCoreProxy)
140 default:
141 log.Fatalf("invalid-adapter-type-%d", adapterType)
142 }
143 adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
144
145 if err = adapterKafkaICProxy.Start(); err != nil {
146 log.Errorw("Failure-starting-adapter-intercontainerProxy", log.Fields{"error": err})
147 return nil, err
148 }
149 if err = adapterKafkaICProxy.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: adapterName}, adapterReqHandler); err != nil {
150 log.Errorw("Failure-to-subscribe-onu-request-handler", log.Fields{"error": err})
151 return nil, err
152 }
153 return adapter, nil
154}
155
156func waitUntilDeviceReadiness(deviceID string,
157 timeout time.Duration,
158 verificationFunction isDeviceConditionSatisfied,
159 nbi *APIHandler) error {
160 ch := make(chan int, 1)
161 done := false
162 go func() {
163 for {
164 device, _ := nbi.GetDevice(getContext(), &voltha.ID{Id: deviceID})
165 if device != nil && verificationFunction(device) {
166 ch <- 1
167 break
168 }
169 if done {
170 break
171 }
172 time.Sleep(retryInterval)
173 }
174 }()
175 timer := time.NewTimer(timeout)
176 defer timer.Stop()
177 select {
178 case <-ch:
179 return nil
180 case <-timer.C:
181 done = true
182 return fmt.Errorf("expected-states-not-reached-for-device%s", deviceID)
183 }
184}
185
186func waitUntilLogicalDeviceReadiness(oltDeviceID string,
187 timeout time.Duration,
188 nbi *APIHandler,
189 verificationFunction isLogicalDeviceConditionSatisfied,
190) error {
191 ch := make(chan int, 1)
192 done := false
193 go func() {
194 for {
195 // Get the logical device from the olt device
196 d, _ := nbi.GetDevice(getContext(), &voltha.ID{Id: oltDeviceID})
197 if d != nil && d.ParentId != "" {
198 ld, _ := nbi.GetLogicalDevice(getContext(), &voltha.ID{Id: d.ParentId})
199 if ld != nil && verificationFunction(ld) {
200 ch <- 1
201 break
202 }
203 if done {
204 break
205 }
206 }
207 time.Sleep(retryInterval)
208 }
209 }()
210 timer := time.NewTimer(timeout)
211 defer timer.Stop()
212 select {
213 case <-ch:
214 return nil
215 case <-timer.C:
216 done = true
217 return fmt.Errorf("timeout-waiting-for-logical-device-readiness%s", oltDeviceID)
218 }
219}
220
221func waitUntilConditionForDevices(timeout time.Duration, nbi *APIHandler, verificationFunction isDevicesConditionSatisfied) error {
222 ch := make(chan int, 1)
223 done := false
224 go func() {
225 for {
226 devices, _ := nbi.ListDevices(getContext(), &empty.Empty{})
227 if verificationFunction(devices) {
228 ch <- 1
229 break
230 }
231 if done {
232 break
233 }
234
235 time.Sleep(retryInterval)
236 }
237 }()
238 timer := time.NewTimer(timeout)
239 defer timer.Stop()
240 select {
241 case <-ch:
242 return nil
243 case <-timer.C:
244 done = true
245 return fmt.Errorf("timeout-waiting-devices")
246 }
khenaidooab1f7bd2019-11-14 14:00:27 -0500247}
khenaidoo93d5a3d2020-01-15 12:37:05 -0500248
249func waitUntilConditionForLogicalDevices(timeout time.Duration, nbi *APIHandler, verificationFunction isLogicalDevicesConditionSatisfied) error {
250 ch := make(chan int, 1)
251 done := false
252 go func() {
253 for {
254 lDevices, _ := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
255 if verificationFunction(lDevices) {
256 ch <- 1
257 break
258 }
259 if done {
260 break
261 }
262
263 time.Sleep(retryInterval)
264 }
265 }()
266 timer := time.NewTimer(timeout)
267 defer timer.Stop()
268 select {
269 case <-ch:
270 return nil
271 case <-timer.C:
272 done = true
273 return fmt.Errorf("timeout-waiting-logical-devices")
274 }
275}