blob: 5fbda49873641595505b49cc75c41b455afcd100 [file] [log] [blame]
Kent Hagerman0ab4cb22019-04-24 13:13:35 -04001// +build integration
2
khenaidoo297cd252019-02-07 22:10:23 -05003/*
4 * Copyright 2018-present Open Networking Foundation
5
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9
10 * http://www.apache.org/licenses/LICENSE-2.0
11
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18package concurrency
19
20import (
21 "context"
22 "errors"
23 "fmt"
24 "github.com/golang/protobuf/ptypes/empty"
25 "github.com/google/uuid"
26 com "github.com/opencord/voltha-go/adapters/common"
27 "github.com/opencord/voltha-go/common/log"
William Kurkiandaa6bb22019-03-07 12:26:28 -050028 "github.com/opencord/voltha-protos/go/common"
29 "github.com/opencord/voltha-protos/go/voltha"
khenaidoo297cd252019-02-07 22:10:23 -050030 "github.com/stretchr/testify/assert"
31 "google.golang.org/grpc"
32 "google.golang.org/grpc/metadata"
33 "os"
34 "os/exec"
35 "strings"
36 "testing"
37)
38
39var conns []*grpc.ClientConn
40var stubs []voltha.VolthaServiceClient
41var volthaSerialNumberKey string
42var grpcPorts []int
43
44/*
45 This series of tests are executed with two RW_Cores
46*/
47
48var devices map[string]*voltha.Device
49
50func setup() {
51 var err error
52
53 if _, err = log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "testing"}); err != nil {
54 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
55 }
56 log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
57 log.SetAllLogLevel(log.ErrorLevel)
58
59 grpcPorts = []int{50057, 50058}
60 stubs = make([]voltha.VolthaServiceClient, 0)
61 conns = make([]*grpc.ClientConn, 0)
62
63 volthaSerialNumberKey = "voltha_serial_number"
64 devices = make(map[string]*voltha.Device)
65}
66
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040067func connectToCore(port int) (voltha.VolthaServiceClient, error) {
khenaidoo297cd252019-02-07 22:10:23 -050068 grpcHostIP := os.Getenv("DOCKER_HOST_IP")
69 grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, port)
70 conn, err := grpc.Dial(grpcHost, grpc.WithInsecure())
71 if err != nil {
72 log.Fatalf("did not connect: %s", err)
73 return nil, errors.New("failure-to-connect")
74 }
75 conns = append(conns, conn)
76 return voltha.NewVolthaServiceClient(conn), nil
77}
78
79func setupGrpcConnection() []voltha.VolthaServiceClient {
80 // We have 2 concurrent cores. Connect to them
81 for _, port := range grpcPorts {
82 if client, err := connectToCore(port); err == nil {
83 stubs = append(stubs, client)
84 log.Infow("connected", log.Fields{"port": port})
85 }
86 }
87 return stubs
88}
89
90func clearAllDevices(clearMap bool) {
91 for key, _ := range devices {
92 ctx := context.Background()
93 response, err := stubs[1].DeleteDevice(ctx, &voltha.ID{Id: key})
94 log.Infow("response", log.Fields{"res": response, "error": err})
95 if clearMap {
96 delete(devices, key)
97 }
98 }
99}
100
101// Verify if all ids are present in the global list of devices
102func hasAllIds(ids *voltha.IDs) bool {
103 if ids == nil && len(devices) == 0 {
104 return true
105 }
106 if ids == nil {
107 return false
108 }
109 for _, id := range ids.Items {
110 if _, exist := devices[id.Id]; !exist {
111 return false
112 }
113 }
114 return true
115}
116
117func startKafka() {
118 fmt.Println("Starting Kafka and Etcd ...")
119 command := "docker-compose"
120 cmd := exec.Command(command, "-f", "../../../compose/docker-compose-zk-kafka-test.yml", "up", "-d")
121 if err := cmd.Run(); err != nil {
122 log.Fatal(err)
123 }
124}
125
126func startEtcd() {
127 fmt.Println("Starting Etcd ...")
128 command := "docker-compose"
129 cmd := exec.Command(command, "-f", "../../../compose/docker-compose-etcd.yml", "up", "-d")
130 if err := cmd.Run(); err != nil {
131 log.Fatal(err)
132 }
133}
134
135func stopKafka() {
136 fmt.Println("Stopping Kafka and Etcd ...")
137 command := "docker-compose"
138 cmd := exec.Command(command, "-f", "../../../compose/docker-compose-zk-kafka-test.yml", "down")
139 if err := cmd.Run(); err != nil {
140 // ignore error - as this is mostly due network being left behind as its being used by other
141 // containers
142 log.Warn(err)
143 }
144}
145
146func stopEtcd() {
147 fmt.Println("Stopping Etcd ...")
148 command := "docker-compose"
149 cmd := exec.Command(command, "-f", "../../../compose/docker-compose-etcd.yml", "down")
150 if err := cmd.Run(); err != nil {
151 // ignore error - as this is mostly due network being left behind as its being used by other
152 // containers
153 log.Warn(err)
154 }
155}
156
157func startCores() {
158 fmt.Println("Starting voltha cores ...")
159 command := "docker-compose"
160 cmd := exec.Command(command, "-f", "../../../compose/rw_core_concurrency_test.yml", "up", "-d")
161 if err := cmd.Run(); err != nil {
162 log.Fatal(err)
163 }
164}
165
166func stopCores() {
167 fmt.Println("Stopping voltha cores ...")
168 command := "docker-compose"
169 cmd := exec.Command(command, "-f", "../../../compose/rw_core_concurrency_test.yml", "down")
170 if err := cmd.Run(); err != nil {
171 // ignore error - as this is mostly due network being left behind as its being used by other
172 // containers
173 log.Warn(err)
174 }
175}
176
177func startSimulatedOLTAndONUAdapters() {
178 fmt.Println("Starting simulated OLT and ONU adapters ...")
179 command := "docker-compose"
180 cmd := exec.Command(command, "-f", "../../../compose/adapters-simulated.yml", "up", "-d")
181 if err := cmd.Run(); err != nil {
182 log.Fatal(err)
183 }
184}
185
186func stopSimulatedOLTAndONUAdapters() {
187 fmt.Println("Stopping simulated OLT and ONU adapters ...")
188 command := "docker-compose"
189 cmd := exec.Command(command, "-f", "../../../compose/adapters-simulated.yml", "down")
190 if err := cmd.Run(); err != nil {
191 // ignore error - as this is mostly due network being left behind as its being used by other
192 // containers
193 log.Warn(err)
194 }
195}
196
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400197func sendCreateDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, device *voltha.Device, ch chan interface{}) {
khenaidoo297cd252019-02-07 22:10:23 -0500198 fmt.Println("Sending create device ...")
199 if response, err := stub.CreateDevice(ctx, device); err != nil {
200 ch <- err
201 } else {
202 ch <- response
203 }
204}
205
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400206func sendEnableDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, deviceId string, ch chan interface{}) {
khenaidoo297cd252019-02-07 22:10:23 -0500207 fmt.Println("Sending enable device ...")
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400208 if response, err := stub.EnableDevice(ctx, &common.ID{Id: deviceId}); err != nil {
khenaidoo297cd252019-02-07 22:10:23 -0500209 ch <- err
210 } else {
211 ch <- response
212 }
213}
214
215//// createPonsimDevice sends two requests to each core and waits for both responses
216//func createPonsimDevice(stubs []voltha.VolthaServiceClient) (*voltha.Device, error) {
217// ui := uuid.New()
218// ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
219// //preprovision_olt -t ponsim_olt -H 172.20.0.11:50060
220// device := &voltha.Device{Type: "ponsim_olt"}
221// device.Address = &voltha.Device_HostAndPort{HostAndPort:"172.20.0.11:50060"}
222// ch := make(chan interface{})
223// defer close(ch)
224// requestNum := 0
225// for _, stub := range stubs {
226// go sendCreateDeviceRequest(ctx, stub, device, ch)
227// requestNum += 1
228// }
229// fmt.Println("Waiting for create device response ...")
230// receivedResponse := 0
231// var err error
232// var returnedDevice *voltha.Device
233// select {
234// case res, ok := <-ch:
235// receivedResponse += 1
236// if !ok {
237// } else if er, ok := res.(error); ok {
238// err = er
239// } else if d, ok := res.(*voltha.Device); ok {
240// returnedDevice = d
241// }
242// if receivedResponse == requestNum {
243// break
244// }
245// }
246// if returnedDevice != nil {
247// return returnedDevice, nil
248// }
249// return nil, err
250//}
251
252// createDevice sends two requests to each core and waits for both responses
253func createDevice(stubs []voltha.VolthaServiceClient) (*voltha.Device, error) {
254 ui := uuid.New()
255 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
256 randomMacAddress := strings.ToUpper(com.GetRandomMacAddress())
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400257 device := &voltha.Device{Type: "simulated_olt", MacAddress: randomMacAddress}
khenaidoo297cd252019-02-07 22:10:23 -0500258 ch := make(chan interface{})
259 defer close(ch)
260 requestNum := 0
261 for _, stub := range stubs {
262 go sendCreateDeviceRequest(ctx, stub, device, ch)
263 requestNum += 1
264 }
265 fmt.Println("Waiting for create device response ...")
266 receivedResponse := 0
267 var err error
268 var returnedDevice *voltha.Device
269 select {
270 case res, ok := <-ch:
271 receivedResponse += 1
272 if !ok {
273 } else if er, ok := res.(error); ok {
274 err = er
275 } else if d, ok := res.(*voltha.Device); ok {
276 returnedDevice = d
277 }
278 if receivedResponse == requestNum {
279 break
280 }
281 }
282 if returnedDevice != nil {
283 return returnedDevice, nil
284 }
285 return nil, err
286}
287
288// enableDevices sends two requests to each core for each device and waits for both responses before sending another
289// enable request for a different device.
290func enableAllDevices(stubs []voltha.VolthaServiceClient) error {
291 for deviceId, val := range devices {
292 if val.AdminState == voltha.AdminState_PREPROVISIONED {
293 ui := uuid.New()
294 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
295 ch := make(chan interface{})
296 defer close(ch)
297 requestNum := 0
298 for _, stub := range stubs {
299 go sendEnableDeviceRequest(ctx, stub, deviceId, ch)
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400300 requestNum += 1
khenaidoo297cd252019-02-07 22:10:23 -0500301 }
302 receivedResponse := 0
303 var err error
304 fmt.Println("Waiting for enable device response ...")
305 validResponseReceived := false
306 select {
307 case res, ok := <-ch:
308 receivedResponse += 1
309 if !ok {
310 } else if er, ok := res.(error); ok {
311 err = er
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400312 } else if _, ok := res.(*empty.Empty); ok {
khenaidoo297cd252019-02-07 22:10:23 -0500313 validResponseReceived = true
314 }
315 if receivedResponse == requestNum {
316 break
317 }
318 }
319 if validResponseReceived {
320 return nil
321 }
322 return err
323 }
324 }
325 return nil
326}
327
khenaidoo297cd252019-02-07 22:10:23 -0500328func TestConcurrentRequests(t *testing.T) {
329 fmt.Println("Testing Concurrent requests ...")
330
331 ////0. Start kafka and Ectd
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400332 startKafka()
333 defer stopKafka()
334 startEtcd()
335 defer stopKafka()
khenaidoo297cd252019-02-07 22:10:23 -0500336 //
337 ////1. Start the core
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400338 startCores()
339 defer stopCores()
khenaidoo297cd252019-02-07 22:10:23 -0500340 //
341 ////2. Start the simulated adapters
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400342 startSimulatedOLTAndONUAdapters()
343 defer stopSimulatedOLTAndONUAdapters()
khenaidoo297cd252019-02-07 22:10:23 -0500344 //
345 //// Wait until the core and adapters sync up
346 //time.Sleep(10 * time.Second)
347
348 stubs = setupGrpcConnection()
349
350 //3. Create the devices
351 response, err := createDevice(stubs)
352 log.Infow("response", log.Fields{"res": response, "error": err})
353 assert.Nil(t, err)
354 devices[response.Id] = response
355
356 //4. Enable all the devices
357 err = enableAllDevices(stubs)
358 assert.Nil(t, err)
359
360 ////5. Store simulated adapters
361 //stopSimulatedOLTAndONUAdapters()
362 //
363 ////6. Store the core
364 //stopCores()
365 //
366 ////7. Stop Kafka and Etcd
367 //stopKafka()
368 //stopEtcd()
369}
370
khenaidoo297cd252019-02-07 22:10:23 -0500371func shutdown() {
372 for _, conn := range conns {
373 conn.Close()
374 }
375}
376
377func TestMain(m *testing.M) {
378 setup()
379 code := m.Run()
380 shutdown()
381 os.Exit(code)
382}