blob: c8ddff2fb4b300d541acf89b10fef01b2ee45b75 [file] [log] [blame]
Kent Hagerman0ab4cb22019-04-24 13:13:35 -04001// +build integration
2
khenaidoo297cd252019-02-07 22:10:23 -05003/*
4 * Copyright 2018-present Open Networking Foundation
5
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9
10 * http://www.apache.org/licenses/LICENSE-2.0
11
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package concurrency
19
20import (
21 "context"
22 "errors"
23 "fmt"
npujar12732342019-11-14 17:28:40 +053024 "os"
25 "os/exec"
26 "strings"
27 "testing"
28
khenaidoo297cd252019-02-07 22:10:23 -050029 "github.com/golang/protobuf/ptypes/empty"
30 "github.com/google/uuid"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080031 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
32 "github.com/opencord/voltha-lib-go/v3/pkg/log"
33 "github.com/opencord/voltha-protos/v3/go/common"
34 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidoo297cd252019-02-07 22:10:23 -050035 "github.com/stretchr/testify/assert"
36 "google.golang.org/grpc"
37 "google.golang.org/grpc/metadata"
khenaidoo297cd252019-02-07 22:10:23 -050038)
39
40var conns []*grpc.ClientConn
41var stubs []voltha.VolthaServiceClient
42var volthaSerialNumberKey string
43var grpcPorts []int
44
45/*
46 This series of tests are executed with two RW_Cores
47*/
48
49var devices map[string]*voltha.Device
50
51func setup() {
52 var err error
53
54 if _, err = log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "testing"}); err != nil {
55 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
56 }
57 log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
58 log.SetAllLogLevel(log.ErrorLevel)
59
60 grpcPorts = []int{50057, 50058}
61 stubs = make([]voltha.VolthaServiceClient, 0)
62 conns = make([]*grpc.ClientConn, 0)
63
64 volthaSerialNumberKey = "voltha_serial_number"
65 devices = make(map[string]*voltha.Device)
66}
67
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040068func connectToCore(port int) (voltha.VolthaServiceClient, error) {
khenaidoo297cd252019-02-07 22:10:23 -050069 grpcHostIP := os.Getenv("DOCKER_HOST_IP")
70 grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, port)
71 conn, err := grpc.Dial(grpcHost, grpc.WithInsecure())
72 if err != nil {
73 log.Fatalf("did not connect: %s", err)
74 return nil, errors.New("failure-to-connect")
75 }
76 conns = append(conns, conn)
77 return voltha.NewVolthaServiceClient(conn), nil
78}
79
80func setupGrpcConnection() []voltha.VolthaServiceClient {
81 // We have 2 concurrent cores. Connect to them
82 for _, port := range grpcPorts {
83 if client, err := connectToCore(port); err == nil {
84 stubs = append(stubs, client)
85 log.Infow("connected", log.Fields{"port": port})
86 }
87 }
88 return stubs
89}
90
91func clearAllDevices(clearMap bool) {
92 for key, _ := range devices {
93 ctx := context.Background()
94 response, err := stubs[1].DeleteDevice(ctx, &voltha.ID{Id: key})
95 log.Infow("response", log.Fields{"res": response, "error": err})
96 if clearMap {
97 delete(devices, key)
98 }
99 }
100}
101
102// Verify if all ids are present in the global list of devices
103func hasAllIds(ids *voltha.IDs) bool {
104 if ids == nil && len(devices) == 0 {
105 return true
106 }
107 if ids == nil {
108 return false
109 }
110 for _, id := range ids.Items {
111 if _, exist := devices[id.Id]; !exist {
112 return false
113 }
114 }
115 return true
116}
117
118func startKafka() {
119 fmt.Println("Starting Kafka and Etcd ...")
120 command := "docker-compose"
121 cmd := exec.Command(command, "-f", "../../../compose/docker-compose-zk-kafka-test.yml", "up", "-d")
122 if err := cmd.Run(); err != nil {
123 log.Fatal(err)
124 }
125}
126
127func startEtcd() {
128 fmt.Println("Starting Etcd ...")
129 command := "docker-compose"
130 cmd := exec.Command(command, "-f", "../../../compose/docker-compose-etcd.yml", "up", "-d")
131 if err := cmd.Run(); err != nil {
132 log.Fatal(err)
133 }
134}
135
136func stopKafka() {
137 fmt.Println("Stopping Kafka and Etcd ...")
138 command := "docker-compose"
139 cmd := exec.Command(command, "-f", "../../../compose/docker-compose-zk-kafka-test.yml", "down")
140 if err := cmd.Run(); err != nil {
141 // ignore error - as this is mostly due network being left behind as its being used by other
142 // containers
143 log.Warn(err)
144 }
145}
146
147func stopEtcd() {
148 fmt.Println("Stopping Etcd ...")
149 command := "docker-compose"
150 cmd := exec.Command(command, "-f", "../../../compose/docker-compose-etcd.yml", "down")
151 if err := cmd.Run(); err != nil {
152 // ignore error - as this is mostly due network being left behind as its being used by other
153 // containers
154 log.Warn(err)
155 }
156}
157
158func startCores() {
159 fmt.Println("Starting voltha cores ...")
160 command := "docker-compose"
161 cmd := exec.Command(command, "-f", "../../../compose/rw_core_concurrency_test.yml", "up", "-d")
162 if err := cmd.Run(); err != nil {
163 log.Fatal(err)
164 }
165}
166
167func stopCores() {
168 fmt.Println("Stopping voltha cores ...")
169 command := "docker-compose"
170 cmd := exec.Command(command, "-f", "../../../compose/rw_core_concurrency_test.yml", "down")
171 if err := cmd.Run(); err != nil {
172 // ignore error - as this is mostly due network being left behind as its being used by other
173 // containers
174 log.Warn(err)
175 }
176}
177
178func startSimulatedOLTAndONUAdapters() {
179 fmt.Println("Starting simulated OLT and ONU adapters ...")
180 command := "docker-compose"
181 cmd := exec.Command(command, "-f", "../../../compose/adapters-simulated.yml", "up", "-d")
182 if err := cmd.Run(); err != nil {
183 log.Fatal(err)
184 }
185}
186
187func stopSimulatedOLTAndONUAdapters() {
188 fmt.Println("Stopping simulated OLT and ONU adapters ...")
189 command := "docker-compose"
190 cmd := exec.Command(command, "-f", "../../../compose/adapters-simulated.yml", "down")
191 if err := cmd.Run(); err != nil {
192 // ignore error - as this is mostly due network being left behind as its being used by other
193 // containers
194 log.Warn(err)
195 }
196}
197
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400198func sendCreateDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, device *voltha.Device, ch chan interface{}) {
khenaidoo297cd252019-02-07 22:10:23 -0500199 fmt.Println("Sending create device ...")
200 if response, err := stub.CreateDevice(ctx, device); err != nil {
201 ch <- err
202 } else {
203 ch <- response
204 }
205}
206
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400207func sendEnableDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, deviceId string, ch chan interface{}) {
khenaidoo297cd252019-02-07 22:10:23 -0500208 fmt.Println("Sending enable device ...")
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400209 if response, err := stub.EnableDevice(ctx, &common.ID{Id: deviceId}); err != nil {
khenaidoo297cd252019-02-07 22:10:23 -0500210 ch <- err
211 } else {
212 ch <- response
213 }
214}
215
216//// createPonsimDevice sends two requests to each core and waits for both responses
217//func createPonsimDevice(stubs []voltha.VolthaServiceClient) (*voltha.Device, error) {
218// ui := uuid.New()
219// ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
220// //preprovision_olt -t ponsim_olt -H 172.20.0.11:50060
221// device := &voltha.Device{Type: "ponsim_olt"}
222// device.Address = &voltha.Device_HostAndPort{HostAndPort:"172.20.0.11:50060"}
223// ch := make(chan interface{})
224// defer close(ch)
225// requestNum := 0
226// for _, stub := range stubs {
227// go sendCreateDeviceRequest(ctx, stub, device, ch)
228// requestNum += 1
229// }
230// fmt.Println("Waiting for create device response ...")
231// receivedResponse := 0
232// var err error
233// var returnedDevice *voltha.Device
234// select {
235// case res, ok := <-ch:
236// receivedResponse += 1
237// if !ok {
238// } else if er, ok := res.(error); ok {
239// err = er
240// } else if d, ok := res.(*voltha.Device); ok {
241// returnedDevice = d
242// }
243// if receivedResponse == requestNum {
244// break
245// }
246// }
247// if returnedDevice != nil {
248// return returnedDevice, nil
249// }
250// return nil, err
251//}
252
253// createDevice sends two requests to each core and waits for both responses
254func createDevice(stubs []voltha.VolthaServiceClient) (*voltha.Device, error) {
255 ui := uuid.New()
256 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
257 randomMacAddress := strings.ToUpper(com.GetRandomMacAddress())
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400258 device := &voltha.Device{Type: "simulated_olt", MacAddress: randomMacAddress}
khenaidoo297cd252019-02-07 22:10:23 -0500259 ch := make(chan interface{})
260 defer close(ch)
261 requestNum := 0
262 for _, stub := range stubs {
263 go sendCreateDeviceRequest(ctx, stub, device, ch)
264 requestNum += 1
265 }
266 fmt.Println("Waiting for create device response ...")
267 receivedResponse := 0
268 var err error
269 var returnedDevice *voltha.Device
270 select {
271 case res, ok := <-ch:
272 receivedResponse += 1
273 if !ok {
274 } else if er, ok := res.(error); ok {
275 err = er
276 } else if d, ok := res.(*voltha.Device); ok {
277 returnedDevice = d
278 }
279 if receivedResponse == requestNum {
280 break
281 }
282 }
283 if returnedDevice != nil {
284 return returnedDevice, nil
285 }
286 return nil, err
287}
288
289// enableDevices sends two requests to each core for each device and waits for both responses before sending another
290// enable request for a different device.
291func enableAllDevices(stubs []voltha.VolthaServiceClient) error {
292 for deviceId, val := range devices {
293 if val.AdminState == voltha.AdminState_PREPROVISIONED {
294 ui := uuid.New()
295 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
296 ch := make(chan interface{})
297 defer close(ch)
298 requestNum := 0
299 for _, stub := range stubs {
300 go sendEnableDeviceRequest(ctx, stub, deviceId, ch)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400301 requestNum += 1
khenaidoo297cd252019-02-07 22:10:23 -0500302 }
303 receivedResponse := 0
304 var err error
305 fmt.Println("Waiting for enable device response ...")
306 validResponseReceived := false
307 select {
308 case res, ok := <-ch:
309 receivedResponse += 1
310 if !ok {
311 } else if er, ok := res.(error); ok {
312 err = er
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400313 } else if _, ok := res.(*empty.Empty); ok {
khenaidoo297cd252019-02-07 22:10:23 -0500314 validResponseReceived = true
315 }
316 if receivedResponse == requestNum {
317 break
318 }
319 }
320 if validResponseReceived {
321 return nil
322 }
323 return err
324 }
325 }
326 return nil
327}
328
khenaidoo297cd252019-02-07 22:10:23 -0500329func TestConcurrentRequests(t *testing.T) {
330 fmt.Println("Testing Concurrent requests ...")
331
332 ////0. Start kafka and Ectd
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400333 startKafka()
334 defer stopKafka()
335 startEtcd()
336 defer stopKafka()
khenaidoo297cd252019-02-07 22:10:23 -0500337 //
338 ////1. Start the core
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400339 startCores()
340 defer stopCores()
khenaidoo297cd252019-02-07 22:10:23 -0500341 //
342 ////2. Start the simulated adapters
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400343 startSimulatedOLTAndONUAdapters()
344 defer stopSimulatedOLTAndONUAdapters()
khenaidoo297cd252019-02-07 22:10:23 -0500345 //
346 //// Wait until the core and adapters sync up
347 //time.Sleep(10 * time.Second)
348
349 stubs = setupGrpcConnection()
350
351 //3. Create the devices
352 response, err := createDevice(stubs)
353 log.Infow("response", log.Fields{"res": response, "error": err})
354 assert.Nil(t, err)
355 devices[response.Id] = response
356
357 //4. Enable all the devices
358 err = enableAllDevices(stubs)
359 assert.Nil(t, err)
360
361 ////5. Store simulated adapters
362 //stopSimulatedOLTAndONUAdapters()
363 //
364 ////6. Store the core
365 //stopCores()
366 //
367 ////7. Stop Kafka and Etcd
368 //stopKafka()
369 //stopEtcd()
370}
371
khenaidoo297cd252019-02-07 22:10:23 -0500372func shutdown() {
373 for _, conn := range conns {
374 conn.Close()
375 }
376}
377
378func TestMain(m *testing.M) {
379 setup()
380 code := m.Run()
381 shutdown()
382 os.Exit(code)
383}