blob: 6e1b5315168dd26abb3bdaaa7f897ef1b2e6d085 [file] [log] [blame]
khenaidoo7ccedd52018-12-14 16:48:54 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
19 "context"
20 "fmt"
21 "github.com/golang/protobuf/ptypes/empty"
22 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/protos/voltha"
24 "github.com/stretchr/testify/assert"
25 "google.golang.org/grpc"
26 "os"
27 "os/exec"
28 "testing"
29 "time"
30)
31
32var conn *grpc.ClientConn
33var stub voltha.VolthaServiceClient
34var devices map[string]*voltha.Device
35
36func init() {
37 log.AddPackage(log.JSON, log.ErrorLevel, nil)
38 log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
39 log.SetAllLogLevel(log.ErrorLevel)
40
41 //Start kafka and Etcd
42 startKafkaEtcd()
43 time.Sleep(10 * time.Second) //TODO: Find a better way to ascertain they are up
44
45 stub = setupGrpcConnection()
46 stub = voltha.NewVolthaServiceClient(conn)
47 devices = make(map[string]*voltha.Device)
48}
49
50func setupGrpcConnection() voltha.VolthaServiceClient {
51 grpcHostIP := os.Getenv("DOCKER_HOST_IP")
52 grpcPort := 50057
53 grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, grpcPort)
54 var err error
55 conn, err = grpc.Dial(grpcHost, grpc.WithInsecure())
56 if err != nil {
57 log.Fatalf("did not connect: %s", err)
58 }
59 return voltha.NewVolthaServiceClient(conn)
60}
61
62func clearAllDevices(clearMap bool) {
63 for key, _ := range devices {
64 ctx := context.Background()
65 response, err := stub.DeleteDevice(ctx, &voltha.ID{Id: key})
66 log.Infow("response", log.Fields{"res": response, "error": err})
67 if clearMap {
68 delete(devices, key)
69 }
70 }
71}
72
73// Verify if all ids are present in the global list of devices
74func hasAllIds(ids *voltha.IDs) bool {
75 if ids == nil && len(devices) == 0 {
76 return true
77 }
78 if ids == nil {
79 return false
80 }
81 for _, id := range ids.Items {
82 if _, exist := devices[id.Id]; !exist {
83 return false
84 }
85 }
86 return true
87}
88
89func startKafkaEtcd() {
90 fmt.Println("Starting Kafka and Etcd ...")
91 command := "docker-compose"
92 cmd := exec.Command(command, "-f", "../../compose/docker-compose-zk-kafka-test.yml", "up", "-d")
93 if err := cmd.Run(); err != nil {
94 log.Fatal(err)
95 }
96 cmd = exec.Command(command, "-f", "../../compose/docker-compose-etcd.yml", "up", "-d")
97 if err := cmd.Run(); err != nil {
98 log.Fatal(err)
99 }
100}
101
102func stopKafkaEtcd() {
103 fmt.Println("Stopping Kafka and Etcd ...")
104 command := "docker-compose"
105 cmd := exec.Command(command, "-f", "../../compose/docker-compose-zk-kafka-test.yml", "down")
106 if err := cmd.Run(); err != nil {
107 // ignore error - as this is mostly due network being left behind as its being used by other
108 // containers
109 log.Warn(err)
110 }
111 cmd = exec.Command(command, "-f", "../../compose/docker-compose-etcd.yml", "down")
112 if err := cmd.Run(); err != nil {
113 // ignore error - as this is mostly due network being left behind as its being used by other
114 // containers
115 log.Warn(err)
116 }
117}
118
119func startCore() {
120 fmt.Println("Starting voltha core ...")
121 command := "docker-compose"
122 cmd := exec.Command(command, "-f", "../../compose/rw_core.yml", "up", "-d")
123 if err := cmd.Run(); err != nil {
124 log.Fatal(err)
125 }
126}
127
128func stopCore() {
129 fmt.Println("Stopping voltha core ...")
130 command := "docker-compose"
131 cmd := exec.Command(command, "-f", "../../compose/rw_core.yml", "down")
132 if err := cmd.Run(); err != nil {
133 // ignore error - as this is mostly due network being left behind as its being used by other
134 // containers
135 log.Warn(err)
136 }
137}
138
139func TestListDeviceIds(t *testing.T) {
140 //1. Start the core
141 startCore()
142
143 // Wait until it's up - TODO: find a better way to check
144 time.Sleep(10 * time.Second)
145
146 //2. Create a set of devices into the Core
147 for i := 0; i < 10; i++ {
148 ctx := context.Background()
149 device := &voltha.Device{Type: "simulated_olt"}
150 response, err := stub.CreateDevice(ctx, device)
151 log.Infow("response", log.Fields{"res": response, "error": err})
152 assert.Nil(t, err)
153 devices[response.Id] = response
154 }
155
156 //3. Verify devices have been added correctly
157 ctx := context.Background()
158 response, err := stub.ListDeviceIds(ctx, &empty.Empty{})
159 log.Infow("response", log.Fields{"res": response, "error": err})
160 assert.Nil(t, err)
161 assert.True(t, hasAllIds(response))
162
163 // 4. Stop the core
164 stopCore()
165}
166
167func TestReconcileDevices(t *testing.T) {
168 //1. Start the core
169 startCore()
170
171 // Wait until it's up - TODO: find a better way to check
172 time.Sleep(10 * time.Second)
173
174 //2. Create a set of devices into the Core
175 for i := 0; i < 10; i++ {
176 ctx := context.Background()
177 device := &voltha.Device{Type: "simulated_olt"}
178 response, err := stub.CreateDevice(ctx, device)
179 log.Infow("response", log.Fields{"res": response, "error": err})
180 assert.Nil(t, err)
181 devices[response.Id] = response
182 }
183 //3. Verify devices have been added correctly
184 ctx := context.Background()
185 response, err := stub.ListDeviceIds(ctx, &empty.Empty{})
186 log.Infow("response", log.Fields{"res": response, "error": err})
187 assert.Nil(t, err)
188 assert.True(t, hasAllIds(response))
189
190 //4. Stop the core and restart it. This will start the core with no data in memory but
191 // etcd will still have the data.
192 stopCore()
193 time.Sleep(5 * time.Second)
194 startCore()
195 time.Sleep(10 * time.Second)
196
197 //5. Setup the connection again
198 stub = setupGrpcConnection()
199
200 //6. Verify there are no devices left
201 ctx = context.Background()
202 response, err = stub.ListDeviceIds(ctx, &empty.Empty{})
203 log.Infow("response", log.Fields{"res": response, "error": err})
204 assert.Nil(t, err)
205 assert.Equal(t, len(response.Items), 0)
206
207 //7. Invoke reconcile with all stored list
208 toRestore := &voltha.IDs{Items: make([]*voltha.ID, 0)}
209 for key, _ := range devices {
210 toRestore.Items = append(toRestore.Items, &voltha.ID{Id: key})
211 }
212 ctx = context.Background()
213 _, err = stub.ReconcileDevices(ctx, toRestore)
214 assert.Nil(t, err)
215
216 //8. Verify all devices have been restored
217 ctx = context.Background()
218 response, err = stub.ListDeviceIds(ctx, &empty.Empty{})
219 log.Infow("response", log.Fields{"res": response, "error": err})
220 assert.Nil(t, err)
221 assert.True(t, hasAllIds(response))
222
223 for _, id := range response.Items {
224 fmt.Println("id", id.Id)
225 }
226
227 //9. Store the core
228 stopCore()
229}
230
231func shutdown() {
232 conn.Close()
233 stopKafkaEtcd()
234}
235
236func TestMain(m *testing.M) {
237 code := m.Run()
238 shutdown()
239 os.Exit(code)
240}