blob: 42f5de0a25e2aec31fc1735a8f2ecf349ca44e08 [file] [log] [blame]
khenaidoo297cd252019-02-07 22:10:23 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package concurrency
17
18import (
19 "context"
20 "errors"
21 "fmt"
22 "github.com/golang/protobuf/ptypes/empty"
23 "github.com/google/uuid"
24 com "github.com/opencord/voltha-go/adapters/common"
25 "github.com/opencord/voltha-go/common/log"
William Kurkiandaa6bb22019-03-07 12:26:28 -050026 "github.com/opencord/voltha-protos/go/common"
27 "github.com/opencord/voltha-protos/go/voltha"
khenaidoo297cd252019-02-07 22:10:23 -050028 "github.com/stretchr/testify/assert"
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/metadata"
31 "os"
32 "os/exec"
33 "strings"
34 "testing"
35)
36
37var conns []*grpc.ClientConn
38var stubs []voltha.VolthaServiceClient
39var volthaSerialNumberKey string
40var grpcPorts []int
41
42/*
43 This series of tests are executed with two RW_Cores
44*/
45
46var devices map[string]*voltha.Device
47
48func setup() {
49 var err error
50
51 if _, err = log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "testing"}); err != nil {
52 log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
53 }
54 log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
55 log.SetAllLogLevel(log.ErrorLevel)
56
57 grpcPorts = []int{50057, 50058}
58 stubs = make([]voltha.VolthaServiceClient, 0)
59 conns = make([]*grpc.ClientConn, 0)
60
61 volthaSerialNumberKey = "voltha_serial_number"
62 devices = make(map[string]*voltha.Device)
63}
64
65func connectToCore(port int) (voltha.VolthaServiceClient, error) {
66 grpcHostIP := os.Getenv("DOCKER_HOST_IP")
67 grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, port)
68 conn, err := grpc.Dial(grpcHost, grpc.WithInsecure())
69 if err != nil {
70 log.Fatalf("did not connect: %s", err)
71 return nil, errors.New("failure-to-connect")
72 }
73 conns = append(conns, conn)
74 return voltha.NewVolthaServiceClient(conn), nil
75}
76
77func setupGrpcConnection() []voltha.VolthaServiceClient {
78 // We have 2 concurrent cores. Connect to them
79 for _, port := range grpcPorts {
80 if client, err := connectToCore(port); err == nil {
81 stubs = append(stubs, client)
82 log.Infow("connected", log.Fields{"port": port})
83 }
84 }
85 return stubs
86}
87
88func clearAllDevices(clearMap bool) {
89 for key, _ := range devices {
90 ctx := context.Background()
91 response, err := stubs[1].DeleteDevice(ctx, &voltha.ID{Id: key})
92 log.Infow("response", log.Fields{"res": response, "error": err})
93 if clearMap {
94 delete(devices, key)
95 }
96 }
97}
98
99// Verify if all ids are present in the global list of devices
100func hasAllIds(ids *voltha.IDs) bool {
101 if ids == nil && len(devices) == 0 {
102 return true
103 }
104 if ids == nil {
105 return false
106 }
107 for _, id := range ids.Items {
108 if _, exist := devices[id.Id]; !exist {
109 return false
110 }
111 }
112 return true
113}
114
115func startKafka() {
116 fmt.Println("Starting Kafka and Etcd ...")
117 command := "docker-compose"
118 cmd := exec.Command(command, "-f", "../../../compose/docker-compose-zk-kafka-test.yml", "up", "-d")
119 if err := cmd.Run(); err != nil {
120 log.Fatal(err)
121 }
122}
123
124func startEtcd() {
125 fmt.Println("Starting Etcd ...")
126 command := "docker-compose"
127 cmd := exec.Command(command, "-f", "../../../compose/docker-compose-etcd.yml", "up", "-d")
128 if err := cmd.Run(); err != nil {
129 log.Fatal(err)
130 }
131}
132
133func stopKafka() {
134 fmt.Println("Stopping Kafka and Etcd ...")
135 command := "docker-compose"
136 cmd := exec.Command(command, "-f", "../../../compose/docker-compose-zk-kafka-test.yml", "down")
137 if err := cmd.Run(); err != nil {
138 // ignore error - as this is mostly due network being left behind as its being used by other
139 // containers
140 log.Warn(err)
141 }
142}
143
144func stopEtcd() {
145 fmt.Println("Stopping Etcd ...")
146 command := "docker-compose"
147 cmd := exec.Command(command, "-f", "../../../compose/docker-compose-etcd.yml", "down")
148 if err := cmd.Run(); err != nil {
149 // ignore error - as this is mostly due network being left behind as its being used by other
150 // containers
151 log.Warn(err)
152 }
153}
154
155func startCores() {
156 fmt.Println("Starting voltha cores ...")
157 command := "docker-compose"
158 cmd := exec.Command(command, "-f", "../../../compose/rw_core_concurrency_test.yml", "up", "-d")
159 if err := cmd.Run(); err != nil {
160 log.Fatal(err)
161 }
162}
163
164func stopCores() {
165 fmt.Println("Stopping voltha cores ...")
166 command := "docker-compose"
167 cmd := exec.Command(command, "-f", "../../../compose/rw_core_concurrency_test.yml", "down")
168 if err := cmd.Run(); err != nil {
169 // ignore error - as this is mostly due network being left behind as its being used by other
170 // containers
171 log.Warn(err)
172 }
173}
174
175func startSimulatedOLTAndONUAdapters() {
176 fmt.Println("Starting simulated OLT and ONU adapters ...")
177 command := "docker-compose"
178 cmd := exec.Command(command, "-f", "../../../compose/adapters-simulated.yml", "up", "-d")
179 if err := cmd.Run(); err != nil {
180 log.Fatal(err)
181 }
182}
183
184func stopSimulatedOLTAndONUAdapters() {
185 fmt.Println("Stopping simulated OLT and ONU adapters ...")
186 command := "docker-compose"
187 cmd := exec.Command(command, "-f", "../../../compose/adapters-simulated.yml", "down")
188 if err := cmd.Run(); err != nil {
189 // ignore error - as this is mostly due network being left behind as its being used by other
190 // containers
191 log.Warn(err)
192 }
193}
194
195
196func sendCreateDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, device *voltha.Device, ch chan interface{} ) {
197 fmt.Println("Sending create device ...")
198 if response, err := stub.CreateDevice(ctx, device); err != nil {
199 ch <- err
200 } else {
201 ch <- response
202 }
203}
204
205func sendEnableDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, deviceId string, ch chan interface{} ) {
206 fmt.Println("Sending enable device ...")
207 if response, err := stub.EnableDevice(ctx, &common.ID{Id:deviceId}); err != nil {
208 ch <- err
209 } else {
210 ch <- response
211 }
212}
213
214//// createPonsimDevice sends two requests to each core and waits for both responses
215//func createPonsimDevice(stubs []voltha.VolthaServiceClient) (*voltha.Device, error) {
216// ui := uuid.New()
217// ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
218// //preprovision_olt -t ponsim_olt -H 172.20.0.11:50060
219// device := &voltha.Device{Type: "ponsim_olt"}
220// device.Address = &voltha.Device_HostAndPort{HostAndPort:"172.20.0.11:50060"}
221// ch := make(chan interface{})
222// defer close(ch)
223// requestNum := 0
224// for _, stub := range stubs {
225// go sendCreateDeviceRequest(ctx, stub, device, ch)
226// requestNum += 1
227// }
228// fmt.Println("Waiting for create device response ...")
229// receivedResponse := 0
230// var err error
231// var returnedDevice *voltha.Device
232// select {
233// case res, ok := <-ch:
234// receivedResponse += 1
235// if !ok {
236// } else if er, ok := res.(error); ok {
237// err = er
238// } else if d, ok := res.(*voltha.Device); ok {
239// returnedDevice = d
240// }
241// if receivedResponse == requestNum {
242// break
243// }
244// }
245// if returnedDevice != nil {
246// return returnedDevice, nil
247// }
248// return nil, err
249//}
250
251// createDevice sends two requests to each core and waits for both responses
252func createDevice(stubs []voltha.VolthaServiceClient) (*voltha.Device, error) {
253 ui := uuid.New()
254 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
255 randomMacAddress := strings.ToUpper(com.GetRandomMacAddress())
256 device := &voltha.Device{Type: "simulated_olt", MacAddress:randomMacAddress}
257 ch := make(chan interface{})
258 defer close(ch)
259 requestNum := 0
260 for _, stub := range stubs {
261 go sendCreateDeviceRequest(ctx, stub, device, ch)
262 requestNum += 1
263 }
264 fmt.Println("Waiting for create device response ...")
265 receivedResponse := 0
266 var err error
267 var returnedDevice *voltha.Device
268 select {
269 case res, ok := <-ch:
270 receivedResponse += 1
271 if !ok {
272 } else if er, ok := res.(error); ok {
273 err = er
274 } else if d, ok := res.(*voltha.Device); ok {
275 returnedDevice = d
276 }
277 if receivedResponse == requestNum {
278 break
279 }
280 }
281 if returnedDevice != nil {
282 return returnedDevice, nil
283 }
284 return nil, err
285}
286
287// enableDevices sends two requests to each core for each device and waits for both responses before sending another
288// enable request for a different device.
289func enableAllDevices(stubs []voltha.VolthaServiceClient) error {
290 for deviceId, val := range devices {
291 if val.AdminState == voltha.AdminState_PREPROVISIONED {
292 ui := uuid.New()
293 ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
294 ch := make(chan interface{})
295 defer close(ch)
296 requestNum := 0
297 for _, stub := range stubs {
298 go sendEnableDeviceRequest(ctx, stub, deviceId, ch)
299 requestNum +=1
300 }
301 receivedResponse := 0
302 var err error
303 fmt.Println("Waiting for enable device response ...")
304 validResponseReceived := false
305 select {
306 case res, ok := <-ch:
307 receivedResponse += 1
308 if !ok {
309 } else if er, ok := res.(error); ok {
310 err = er
311 } else if _ , ok := res.(*empty.Empty); ok {
312 validResponseReceived = true
313 }
314 if receivedResponse == requestNum {
315 break
316 }
317 }
318 if validResponseReceived {
319 return nil
320 }
321 return err
322 }
323 }
324 return nil
325}
326
327
328func TestConcurrentRequests(t *testing.T) {
329 fmt.Println("Testing Concurrent requests ...")
330
331 ////0. Start kafka and Ectd
332 //startKafka()
333 //startEtcd()
334 //
335 ////1. Start the core
336 //startCores()
337 //
338 ////2. Start the simulated adapters
339 //startSimulatedOLTAndONUAdapters()
340 //
341 //// Wait until the core and adapters sync up
342 //time.Sleep(10 * time.Second)
343
344 stubs = setupGrpcConnection()
345
346 //3. Create the devices
347 response, err := createDevice(stubs)
348 log.Infow("response", log.Fields{"res": response, "error": err})
349 assert.Nil(t, err)
350 devices[response.Id] = response
351
352 //4. Enable all the devices
353 err = enableAllDevices(stubs)
354 assert.Nil(t, err)
355
356 ////5. Store simulated adapters
357 //stopSimulatedOLTAndONUAdapters()
358 //
359 ////6. Store the core
360 //stopCores()
361 //
362 ////7. Stop Kafka and Etcd
363 //stopKafka()
364 //stopEtcd()
365}
366
367
368func shutdown() {
369 for _, conn := range conns {
370 conn.Close()
371 }
372}
373
374func TestMain(m *testing.M) {
375 setup()
376 code := m.Run()
377 shutdown()
378 os.Exit(code)
379}