blob: 7f6412de8958ead65a386664b6599fbd8076dc6e [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
khenaidoob64fc8a2019-11-27 15:08:19 -050019 "context"
20 "fmt"
21 "github.com/golang/protobuf/ptypes/empty"
22 "github.com/google/uuid"
23 "github.com/opencord/voltha-go/rw_core/config"
24 cm "github.com/opencord/voltha-go/rw_core/mocks"
25 "github.com/opencord/voltha-lib-go/v2/pkg/adapters"
26 com "github.com/opencord/voltha-lib-go/v2/pkg/adapters/common"
27 "github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
28 "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
khenaidooab1f7bd2019-11-14 14:00:27 -050029 "github.com/opencord/voltha-lib-go/v2/pkg/log"
khenaidoob64fc8a2019-11-27 15:08:19 -050030 lm "github.com/opencord/voltha-lib-go/v2/pkg/mocks"
31 "github.com/opencord/voltha-protos/v2/go/voltha"
32 "github.com/phayes/freeport"
33 "google.golang.org/grpc/codes"
34 "google.golang.org/grpc/metadata"
35 "google.golang.org/grpc/status"
36 "strconv"
37 "time"
khenaidooab1f7bd2019-11-14 14:00:27 -050038)
39
40const (
khenaidoob64fc8a2019-11-27 15:08:19 -050041 logLevel = log.FatalLevel
42 volthaSerialNumberKey = "voltha_serial_number"
43 retryInterval = 50 * time.Millisecond
khenaidooab1f7bd2019-11-14 14:00:27 -050044)
45
khenaidoob64fc8a2019-11-27 15:08:19 -050046const (
47 OltAdapter = iota
48 OnuAdapter
49)
50
51var (
52 coreInCompeteMode bool
53)
54
55type isLogicalDeviceConditionSatisfied func(ld *voltha.LogicalDevice) bool
56type isDeviceConditionSatisfied func(ld *voltha.Device) bool
57type isDevicesConditionSatisfied func(ds *voltha.Devices) bool
58
khenaidooab1f7bd2019-11-14 14:00:27 -050059func init() {
khenaidoob64fc8a2019-11-27 15:08:19 -050060 _, err := log.AddPackage(log.JSON, logLevel, log.Fields{"instanceId": "coreTests"})
khenaidooab1f7bd2019-11-14 14:00:27 -050061 if err != nil {
62 panic(err)
63 }
khenaidoob64fc8a2019-11-27 15:08:19 -050064 // Update all loggers to log level specified as input parameter
65 log.SetAllLogLevel(log.ErrorLevel)
66
67 //Default mode is two rw-core running in a pair of competing cores
68 coreInCompeteMode = true
69}
70
71func setCoreCompeteMode(mode bool) {
72 coreInCompeteMode = mode
73}
74
75func getContext() context.Context {
76 if coreInCompeteMode {
77 return metadata.NewIncomingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, uuid.New().String()))
78 }
79 return context.Background()
80}
81
82//startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
83func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*lm.EtcdServer, int, error) {
84 kvClientPort, err := freeport.GetFreePort()
85 if err != nil {
86 return nil, 0, err
87 }
88 peerPort, err := freeport.GetFreePort()
89 if err != nil {
90 return nil, 0, err
91 }
92 etcdServer := lm.StartEtcdServer(lm.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
93 if etcdServer == nil {
94 return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
95 }
96 return etcdServer, kvClientPort, nil
97}
98
99func stopEmbeddedEtcdServer(server *lm.EtcdServer) {
100 if server != nil {
101 server.Stop()
102 }
103}
104
105func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
106 addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
107 client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout)
108 if err != nil {
109 panic("no kv client")
110 }
111 // Setup KV transaction context
112 txnPrefix := cf.KVStoreDataPrefix + "/transactions/"
113 if err = SetTransactionContext(coreInstanceID,
114 txnPrefix,
115 client,
116 cf.KVStoreTimeout); err != nil {
117 log.Fatal("creating-transaction-context-failed")
118 }
119 return client
120}
121
122func createMockAdapter(adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
123 var err error
124 var adapter adapters.IAdapter
125 adapterKafkaICProxy, err := kafka.NewInterContainerProxy(
126 kafka.MsgClient(kafkaClient),
127 kafka.DefaultTopic(&kafka.Topic{Name: adapterName}))
128 if err != nil || adapterKafkaICProxy == nil {
129 log.Errorw("Failure-creating-adapter-intercontainerProxy", log.Fields{"error": err, "adapter": adapterName})
130 return nil, err
131 }
132 adapterCoreProxy := com.NewCoreProxy(adapterKafkaICProxy, adapterName, coreName)
133 var adapterReqHandler *com.RequestHandlerProxy
134 switch adapterType {
135 case OltAdapter:
136 adapter = cm.NewOLTAdapter(adapterCoreProxy)
137 case OnuAdapter:
138 adapter = cm.NewONUAdapter(adapterCoreProxy)
139 default:
140 log.Fatalf("invalid-adapter-type-%d", adapterType)
141 }
142 adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
143
144 if err = adapterKafkaICProxy.Start(); err != nil {
145 log.Errorw("Failure-starting-adapter-intercontainerProxy", log.Fields{"error": err})
146 return nil, err
147 }
148 if err = adapterKafkaICProxy.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: adapterName}, adapterReqHandler); err != nil {
149 log.Errorw("Failure-to-subscribe-onu-request-handler", log.Fields{"error": err})
150 return nil, err
151 }
152 return adapter, nil
153}
154
155func waitUntilDeviceReadiness(deviceID string,
156 timeout time.Duration,
157 verificationFunction isDeviceConditionSatisfied,
158 nbi *APIHandler) error {
159 ch := make(chan int, 1)
160 done := false
161 go func() {
162 for {
163 device, _ := nbi.GetDevice(getContext(), &voltha.ID{Id: deviceID})
164 if device != nil && verificationFunction(device) {
165 ch <- 1
166 break
167 }
168 if done {
169 break
170 }
171 time.Sleep(retryInterval)
172 }
173 }()
174 timer := time.NewTimer(timeout)
175 defer timer.Stop()
176 select {
177 case <-ch:
178 return nil
179 case <-timer.C:
180 done = true
181 return fmt.Errorf("expected-states-not-reached-for-device%s", deviceID)
182 }
183}
184
185func waitUntilLogicalDeviceReadiness(oltDeviceID string,
186 timeout time.Duration,
187 nbi *APIHandler,
188 verificationFunction isLogicalDeviceConditionSatisfied,
189) error {
190 ch := make(chan int, 1)
191 done := false
192 go func() {
193 for {
194 // Get the logical device from the olt device
195 d, _ := nbi.GetDevice(getContext(), &voltha.ID{Id: oltDeviceID})
196 if d != nil && d.ParentId != "" {
197 ld, _ := nbi.GetLogicalDevice(getContext(), &voltha.ID{Id: d.ParentId})
198 if ld != nil && verificationFunction(ld) {
199 ch <- 1
200 break
201 }
202 if done {
203 break
204 }
205 }
206 time.Sleep(retryInterval)
207 }
208 }()
209 timer := time.NewTimer(timeout)
210 defer timer.Stop()
211 select {
212 case <-ch:
213 return nil
214 case <-timer.C:
215 done = true
216 return fmt.Errorf("timeout-waiting-for-logical-device-readiness%s", oltDeviceID)
217 }
218}
219
220func waitUntilConditionForDevices(timeout time.Duration, nbi *APIHandler, verificationFunction isDevicesConditionSatisfied) error {
221 ch := make(chan int, 1)
222 done := false
223 go func() {
224 for {
225 devices, _ := nbi.ListDevices(getContext(), &empty.Empty{})
226 if verificationFunction(devices) {
227 ch <- 1
228 break
229 }
230 if done {
231 break
232 }
233
234 time.Sleep(retryInterval)
235 }
236 }()
237 timer := time.NewTimer(timeout)
238 defer timer.Stop()
239 select {
240 case <-ch:
241 return nil
242 case <-timer.C:
243 done = true
244 return fmt.Errorf("timeout-waiting-devices")
245 }
khenaidooab1f7bd2019-11-14 14:00:27 -0500246}