blob: 87d4e82b1940bd65f866de95961790d78aea3e1e [file] [log] [blame]
Kent Hagerman0ab4cb22019-04-24 13:13:35 -04001// +build integration
2
khenaidoo297cd252019-02-07 22:10:23 -05003/*
4 * Copyright 2018-present Open Networking Foundation
5
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9
10 * http://www.apache.org/licenses/LICENSE-2.0
11
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package concurrency
19
20import (
21 "context"
22 "errors"
23 "fmt"
npujar12732342019-11-14 17:28:40 +053024 "os"
25 "os/exec"
26 "strings"
27 "testing"
28
khenaidoo297cd252019-02-07 22:10:23 -050029 "github.com/golang/protobuf/ptypes/empty"
30 "github.com/google/uuid"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080031 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
32 "github.com/opencord/voltha-lib-go/v3/pkg/log"
33 "github.com/opencord/voltha-protos/v3/go/common"
34 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidoo297cd252019-02-07 22:10:23 -050035 "github.com/stretchr/testify/assert"
36 "google.golang.org/grpc"
37 "google.golang.org/grpc/metadata"
khenaidoo297cd252019-02-07 22:10:23 -050038)
39
40var conns []*grpc.ClientConn
41var stubs []voltha.VolthaServiceClient
42var volthaSerialNumberKey string
43var grpcPorts []int
44
45/*
46 This series of tests are executed with two RW_Cores
47*/
48
49var devices map[string]*voltha.Device
50
51func setup() {
khenaidoo297cd252019-02-07 22:10:23 -050052 grpcPorts = []int{50057, 50058}
53 stubs = make([]voltha.VolthaServiceClient, 0)
54 conns = make([]*grpc.ClientConn, 0)
55
56 volthaSerialNumberKey = "voltha_serial_number"
57 devices = make(map[string]*voltha.Device)
58}
59
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040060func connectToCore(port int) (voltha.VolthaServiceClient, error) {
khenaidoo297cd252019-02-07 22:10:23 -050061 grpcHostIP := os.Getenv("DOCKER_HOST_IP")
62 grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, port)
63 conn, err := grpc.Dial(grpcHost, grpc.WithInsecure())
64 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +000065 logger.Fatalf(ctx, "did not connect: %s", err)
khenaidoo297cd252019-02-07 22:10:23 -050066 return nil, errors.New("failure-to-connect")
67 }
68 conns = append(conns, conn)
69 return voltha.NewVolthaServiceClient(conn), nil
70}
71
72func setupGrpcConnection() []voltha.VolthaServiceClient {
73 // We have 2 concurrent cores. Connect to them
74 for _, port := range grpcPorts {
75 if client, err := connectToCore(port); err == nil {
76 stubs = append(stubs, client)
Rohan Agrawal31f21802020-06-12 05:38:46 +000077 logger.Infow(ctx, "connected", log.Fields{"port": port})
khenaidoo297cd252019-02-07 22:10:23 -050078 }
79 }
80 return stubs
81}
82
83func clearAllDevices(clearMap bool) {
84 for key, _ := range devices {
85 ctx := context.Background()
86 response, err := stubs[1].DeleteDevice(ctx, &voltha.ID{Id: key})
Rohan Agrawal31f21802020-06-12 05:38:46 +000087 logger.Infow(ctx, "response", log.Fields{"res": response, "error": err})
khenaidoo297cd252019-02-07 22:10:23 -050088 if clearMap {
89 delete(devices, key)
90 }
91 }
92}
93
94// Verify if all ids are present in the global list of devices
95func hasAllIds(ids *voltha.IDs) bool {
96 if ids == nil && len(devices) == 0 {
97 return true
98 }
99 if ids == nil {
100 return false
101 }
102 for _, id := range ids.Items {
103 if _, exist := devices[id.Id]; !exist {
104 return false
105 }
106 }
107 return true
108}
109
110func startKafka() {
111 fmt.Println("Starting Kafka and Etcd ...")
112 command := "docker-compose"
113 cmd := exec.Command(command, "-f", "../../../compose/docker-compose-zk-kafka-test.yml", "up", "-d")
114 if err := cmd.Run(); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000115 logger.Fatal(ctx, err)
khenaidoo297cd252019-02-07 22:10:23 -0500116 }
117}
118
119func startEtcd() {
120 fmt.Println("Starting Etcd ...")
121 command := "docker-compose"
122 cmd := exec.Command(command, "-f", "../../../compose/docker-compose-etcd.yml", "up", "-d")
123 if err := cmd.Run(); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000124 logger.Fatal(ctx, err)
khenaidoo297cd252019-02-07 22:10:23 -0500125 }
126}
127
128func stopKafka() {
129 fmt.Println("Stopping Kafka and Etcd ...")
130 command := "docker-compose"
131 cmd := exec.Command(command, "-f", "../../../compose/docker-compose-zk-kafka-test.yml", "down")
132 if err := cmd.Run(); err != nil {
133 // ignore error - as this is mostly due network being left behind as its being used by other
134 // containers
Rohan Agrawal31f21802020-06-12 05:38:46 +0000135 logger.Warn(ctx, err)
khenaidoo297cd252019-02-07 22:10:23 -0500136 }
137}
138
139func stopEtcd() {
140 fmt.Println("Stopping Etcd ...")
141 command := "docker-compose"
142 cmd := exec.Command(command, "-f", "../../../compose/docker-compose-etcd.yml", "down")
143 if err := cmd.Run(); err != nil {
144 // ignore error - as this is mostly due network being left behind as its being used by other
145 // containers
Rohan Agrawal31f21802020-06-12 05:38:46 +0000146 logger.Warn(ctx, err)
khenaidoo297cd252019-02-07 22:10:23 -0500147 }
148}
149
150func startCores() {
151 fmt.Println("Starting voltha cores ...")
152 command := "docker-compose"
153 cmd := exec.Command(command, "-f", "../../../compose/rw_core_concurrency_test.yml", "up", "-d")
154 if err := cmd.Run(); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000155 logger.Fatal(ctx, err)
khenaidoo297cd252019-02-07 22:10:23 -0500156 }
157}
158
159func stopCores() {
160 fmt.Println("Stopping voltha cores ...")
161 command := "docker-compose"
162 cmd := exec.Command(command, "-f", "../../../compose/rw_core_concurrency_test.yml", "down")
163 if err := cmd.Run(); err != nil {
164 // ignore error - as this is mostly due network being left behind as its being used by other
165 // containers
Rohan Agrawal31f21802020-06-12 05:38:46 +0000166 logger.Warn(ctx, err)
khenaidoo297cd252019-02-07 22:10:23 -0500167 }
168}
169
170func startSimulatedOLTAndONUAdapters() {
171 fmt.Println("Starting simulated OLT and ONU adapters ...")
172 command := "docker-compose"
173 cmd := exec.Command(command, "-f", "../../../compose/adapters-simulated.yml", "up", "-d")
174 if err := cmd.Run(); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000175 logger.Fatal(ctx, err)
khenaidoo297cd252019-02-07 22:10:23 -0500176 }
177}
178
179func stopSimulatedOLTAndONUAdapters() {
180 fmt.Println("Stopping simulated OLT and ONU adapters ...")
181 command := "docker-compose"
182 cmd := exec.Command(command, "-f", "../../../compose/adapters-simulated.yml", "down")
183 if err := cmd.Run(); err != nil {
184 // ignore error - as this is mostly due network being left behind as its being used by other
185 // containers
Rohan Agrawal31f21802020-06-12 05:38:46 +0000186 logger.Warn(ctx, err)
khenaidoo297cd252019-02-07 22:10:23 -0500187 }
188}
189
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400190func sendCreateDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, device *voltha.Device, ch chan interface{}) {
khenaidoo297cd252019-02-07 22:10:23 -0500191 fmt.Println("Sending create device ...")
192 if response, err := stub.CreateDevice(ctx, device); err != nil {
193 ch <- err
194 } else {
195 ch <- response
196 }
197}
198
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400199func sendEnableDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, deviceId string, ch chan interface{}) {
khenaidoo297cd252019-02-07 22:10:23 -0500200 fmt.Println("Sending enable device ...")
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400201 if response, err := stub.EnableDevice(ctx, &common.ID{Id: deviceId}); err != nil {
khenaidoo297cd252019-02-07 22:10:23 -0500202 ch <- err
203 } else {
204 ch <- response
205 }
206}
207
208//// createPonsimDevice sends two requests to each core and waits for both responses
209//func createPonsimDevice(stubs []voltha.VolthaServiceClient) (*voltha.Device, error) {
210// ui := uuid.New()
211// ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
212// //preprovision_olt -t ponsim_olt -H 172.20.0.11:50060
213// device := &voltha.Device{Type: "ponsim_olt"}
214// device.Address = &voltha.Device_HostAndPort{HostAndPort:"172.20.0.11:50060"}
215// ch := make(chan interface{})
216// defer close(ch)
217// requestNum := 0
218// for _, stub := range stubs {
219// go sendCreateDeviceRequest(ctx, stub, device, ch)
220// requestNum += 1
221// }
222// fmt.Println("Waiting for create device response ...")
223// receivedResponse := 0
224// var err error
225// var returnedDevice *voltha.Device
226// select {
227// case res, ok := <-ch:
228// receivedResponse += 1
229// if !ok {
230// } else if er, ok := res.(error); ok {
231// err = er
232// } else if d, ok := res.(*voltha.Device); ok {
233// returnedDevice = d
234// }
235// if receivedResponse == requestNum {
236// break
237// }
238// }
239// if returnedDevice != nil {
240// return returnedDevice, nil
241// }
242// return nil, err
243//}
244
245// createDevice sends two requests to each core and waits for both responses
246func createDevice(stubs []voltha.VolthaServiceClient) (*voltha.Device, error) {
247 ui := uuid.New()
248 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
249 randomMacAddress := strings.ToUpper(com.GetRandomMacAddress())
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400250 device := &voltha.Device{Type: "simulated_olt", MacAddress: randomMacAddress}
khenaidoo297cd252019-02-07 22:10:23 -0500251 ch := make(chan interface{})
252 defer close(ch)
253 requestNum := 0
254 for _, stub := range stubs {
255 go sendCreateDeviceRequest(ctx, stub, device, ch)
256 requestNum += 1
257 }
258 fmt.Println("Waiting for create device response ...")
259 receivedResponse := 0
260 var err error
261 var returnedDevice *voltha.Device
262 select {
263 case res, ok := <-ch:
264 receivedResponse += 1
265 if !ok {
266 } else if er, ok := res.(error); ok {
267 err = er
268 } else if d, ok := res.(*voltha.Device); ok {
269 returnedDevice = d
270 }
271 if receivedResponse == requestNum {
272 break
273 }
274 }
275 if returnedDevice != nil {
276 return returnedDevice, nil
277 }
278 return nil, err
279}
280
281// enableDevices sends two requests to each core for each device and waits for both responses before sending another
282// enable request for a different device.
283func enableAllDevices(stubs []voltha.VolthaServiceClient) error {
284 for deviceId, val := range devices {
285 if val.AdminState == voltha.AdminState_PREPROVISIONED {
286 ui := uuid.New()
287 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
288 ch := make(chan interface{})
289 defer close(ch)
290 requestNum := 0
291 for _, stub := range stubs {
292 go sendEnableDeviceRequest(ctx, stub, deviceId, ch)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400293 requestNum += 1
khenaidoo297cd252019-02-07 22:10:23 -0500294 }
295 receivedResponse := 0
296 var err error
297 fmt.Println("Waiting for enable device response ...")
298 validResponseReceived := false
299 select {
300 case res, ok := <-ch:
301 receivedResponse += 1
302 if !ok {
303 } else if er, ok := res.(error); ok {
304 err = er
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400305 } else if _, ok := res.(*empty.Empty); ok {
khenaidoo297cd252019-02-07 22:10:23 -0500306 validResponseReceived = true
307 }
308 if receivedResponse == requestNum {
309 break
310 }
311 }
312 if validResponseReceived {
313 return nil
314 }
315 return err
316 }
317 }
318 return nil
319}
320
khenaidoo297cd252019-02-07 22:10:23 -0500321func TestConcurrentRequests(t *testing.T) {
322 fmt.Println("Testing Concurrent requests ...")
323
324 ////0. Start kafka and Ectd
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400325 startKafka()
326 defer stopKafka()
327 startEtcd()
328 defer stopKafka()
khenaidoo297cd252019-02-07 22:10:23 -0500329 //
330 ////1. Start the core
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400331 startCores()
332 defer stopCores()
khenaidoo297cd252019-02-07 22:10:23 -0500333 //
334 ////2. Start the simulated adapters
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400335 startSimulatedOLTAndONUAdapters()
336 defer stopSimulatedOLTAndONUAdapters()
khenaidoo297cd252019-02-07 22:10:23 -0500337 //
338 //// Wait until the core and adapters sync up
339 //time.Sleep(10 * time.Second)
340
341 stubs = setupGrpcConnection()
342
343 //3. Create the devices
344 response, err := createDevice(stubs)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000345 logger.Infow(ctx, "response", log.Fields{"res": response, "error": err})
khenaidoo297cd252019-02-07 22:10:23 -0500346 assert.Nil(t, err)
347 devices[response.Id] = response
348
349 //4. Enable all the devices
350 err = enableAllDevices(stubs)
351 assert.Nil(t, err)
352
353 ////5. Store simulated adapters
354 //stopSimulatedOLTAndONUAdapters()
355 //
356 ////6. Store the core
357 //stopCores()
358 //
359 ////7. Stop Kafka and Etcd
360 //stopKafka()
361 //stopEtcd()
362}
363
khenaidoo297cd252019-02-07 22:10:23 -0500364func shutdown() {
365 for _, conn := range conns {
366 conn.Close()
367 }
368}
369
370func TestMain(m *testing.M) {
371 setup()
372 code := m.Run()
373 shutdown()
374 os.Exit(code)
375}