blob: 3825290eb017c1d59c7fcd880654c42537188e42 [file] [log] [blame]
khenaidoo6f415b22021-06-22 18:08:53 -04001/*
2 * Copyright 2021-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kvstore
17
18import (
19 "context"
Girish Gowdra4c60c672021-07-26 13:30:57 -070020 "github.com/opencord/voltha-lib-go/v6/pkg/log"
21 mocks "github.com/opencord/voltha-lib-go/v6/pkg/mocks/etcd"
khenaidoo6f415b22021-06-22 18:08:53 -040022 "github.com/phayes/freeport"
23 "github.com/stretchr/testify/assert"
24 "os"
25 "strconv"
26 "sync"
27 "testing"
28 "time"
29)
30
const (
	// embedEtcdServerHost is the host on which the embedded etcd server
	// started by TestMain listens.
	embedEtcdServerHost = "localhost"
	// defaultTimeout is the per-client timeout handed to
	// NewRoundRobinEtcdClientAllocator in every test below.
	defaultTimeout = 1 * time.Second
)
35
var (
	// embedEtcdServerPort is the client port of the embedded etcd server;
	// TestMain assigns it a free port before the tests run.
	embedEtcdServerPort int
)
39
40func TestMain(m *testing.M) {
41 ctx := context.Background()
42 var err error
43 embedEtcdServerPort, err = freeport.GetFreePort()
44 if err != nil {
45 logger.Fatal(ctx, err)
46 }
47 peerPort, err := freeport.GetFreePort()
48 if err != nil {
49 logger.Fatal(ctx, err)
50 }
51 etcdServer := mocks.StartEtcdServer(ctx, mocks.MKConfig(ctx,
52 "voltha.db.kvstore.test",
53 embedEtcdServerPort,
54 peerPort,
55 "voltha.lib.db.kvstore",
56 "error"))
57 res := m.Run()
58
59 etcdServer.Stop(ctx)
60 os.Exit(res)
61}
62
63func TestNewRoundRobinEtcdClientAllocator(t *testing.T) {
64 address := embedEtcdServerHost + ":" + strconv.Itoa(embedEtcdServerPort)
65 capacity := 20
66 maxUsage := 10
67
68 pool, err := NewRoundRobinEtcdClientAllocator([]string{address}, defaultTimeout, capacity, maxUsage, log.ErrorLevel)
69 assert.NotNil(t, pool)
70 assert.Nil(t, err)
71}
72
73func TestRoundRobin_Get_Put(t *testing.T) {
74 address := embedEtcdServerHost + ":" + strconv.Itoa(embedEtcdServerPort)
75 capacity := 20
76 maxUsage := 10
77
78 pool, err := NewRoundRobinEtcdClientAllocator([]string{address}, defaultTimeout, capacity, maxUsage, log.ErrorLevel)
79 assert.NotNil(t, pool)
80 assert.Nil(t, err)
81
82 // Verify we can obtain the expected number of clients with no errors or waiting time
83 var wg sync.WaitGroup
84 for i := 0; i < capacity*maxUsage; i++ {
85 wg.Add(1)
86 go func() {
87 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
88 defer cancel()
89 c, err := pool.Get(ctx)
90 assert.NotNil(t, c)
91 assert.Nil(t, err)
92 time.Sleep(5 * time.Second)
93 pool.Put(c)
94 wg.Done()
95 }()
96 }
97 wg.Wait()
98}
99
100func TestRoundRobin_Get_Put_various_capacity(t *testing.T) {
101 // Test single client with 1 concurrent usage
102 getPutVaryingCapacity(t, 1, 1)
103
104 // Test single client with multiple concurrent usage
105 getPutVaryingCapacity(t, 1, 100)
106
107 // Test multiple clients with single concurrent usage
108 getPutVaryingCapacity(t, 10, 1)
109
110 // Test multiple clients with multiple concurrent usage
111 getPutVaryingCapacity(t, 10, 10)
112}
113
114func getPutVaryingCapacity(t *testing.T, capacity, maxUsage int) {
115 address := embedEtcdServerHost + ":" + strconv.Itoa(embedEtcdServerPort)
116
117 pool, err := NewRoundRobinEtcdClientAllocator([]string{address}, defaultTimeout, capacity, maxUsage, log.ErrorLevel)
118 assert.NotNil(t, pool)
119 assert.Nil(t, err)
120
121 // Verify we can obtain the expected number of clients with no errors or waiting time
122 var wg sync.WaitGroup
123 totalSize := capacity * maxUsage
124 ch := make(chan struct{})
125 for i := 0; i < totalSize; i++ {
126 wg.Add(1)
127 go func() {
128 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
129 defer cancel()
130 c, err := pool.Get(ctx)
131 assert.NotNil(t, c)
132 assert.Nil(t, err)
133 // Inform the waiting loop that a client has been allocated
134 ch <- struct{}{}
135 // Keep the client for 5s and then return it to the pool
136 time.Sleep(5 * time.Second)
137 pool.Put(c)
138 wg.Done()
139 }()
140 }
141
142 // Wait until all clients are allocated
143 allocated := 0
144 for range ch {
145 allocated++
146 if allocated == totalSize {
147 break
148 }
149 }
150
151 // Try to get above capacity/usage with low timeout
152 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
153 c, err := pool.Get(ctx)
154 assert.NotNil(t, err)
155 assert.Nil(t, c)
156 cancel()
157
158 // Try to get above capacity/usage with longer timeout
159 ctx, cancel = context.WithTimeout(context.Background(), 6*time.Second)
160 c, err = pool.Get(ctx)
161 assert.NotNil(t, c)
162 assert.Nil(t, err)
163 pool.Put(c)
164 cancel()
165
166 wg.Wait()
167
168 // Close the connection
169 pool.Close(context.Background())
170}
171
172func TestRoundRobin_Close_various_capacity(t *testing.T) {
173 // Test single client with 1 concurrent usage
174 closeWithVaryingCapacity(t, 1, 1)
175
176 // Test single client with multiple concurrent usage
177 closeWithVaryingCapacity(t, 1, 100)
178
179 // Test multiple clients with single concurrent usage
180 closeWithVaryingCapacity(t, 10, 1)
181
182 // Test multiple clients with multiple concurrent usage
183 closeWithVaryingCapacity(t, 10, 10)
184}
185
// closeWithVaryingCapacity verifies Close() behavior for a pool built with
// the given capacity (number of clients) and maxUsage (concurrent users per
// client). It first saturates the pool, then — while one extra Get is
// queued — closes the pool and checks that both the queued Get and a brand
// new Get fail with one of the pool's "closing" errors.
func closeWithVaryingCapacity(t *testing.T, capacity, maxUsage int) {
	address := embedEtcdServerHost + ":" + strconv.Itoa(embedEtcdServerPort)

	pool, err := NewRoundRobinEtcdClientAllocator([]string{address}, defaultTimeout, capacity, maxUsage, log.ErrorLevel)
	assert.NotNil(t, pool)
	assert.Nil(t, err)

	// Verify we can obtain the expected number of clients with no errors or waiting time
	var wg sync.WaitGroup
	totalSize := capacity * maxUsage
	ch := make(chan struct{})
	for i := 0; i < totalSize; i++ {
		wg.Add(1)
		go func() {
			ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
			defer cancel()
			c, err := pool.Get(ctx)
			assert.NotNil(t, c)
			assert.Nil(t, err)
			// Inform the waiting loop that a client has been allocated
			ch <- struct{}{}
			// Keep the client for 5s and then return it to the pool
			time.Sleep(5 * time.Second)
			pool.Put(c)
			wg.Done()
		}()
	}

	// Wait until all clients are allocated
	allocated := 0
	for range ch {
		allocated++
		if allocated == totalSize {
			break
		}
	}
	// Try to get above capacity/usage with longer timeout
	wg.Add(1)
	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		c, err := pool.Get(ctx)
		assert.NotNil(t, err)
		// Accept either of the two shutdown error strings the pool returns.
		expected := err.Error() == "pool-is-closing" || err.Error() == "stop-waiting-pool-is-closing"
		assert.True(t, expected)
		assert.Nil(t, c)
		cancel()
		wg.Done()
	}()

	// Invoke close on the pool
	wg.Add(1)
	go func() {
		pool.Close(context.Background())
		wg.Done()
	}()

	// Try to get new client and ensure we can't get one
	// NOTE(review): this Get races with the Close goroutine above; it seems
	// to rely on the pool being fully allocated so the Get blocks until
	// Close takes effect — confirm this cannot flake if the Get were
	// serviced before Close starts.
	c, err := pool.Get(context.Background())
	assert.NotNil(t, err)
	expected := err.Error() == "pool-is-closing" || err.Error() == "stop-waiting-pool-is-closing"
	assert.True(t, expected)
	assert.Nil(t, c)

	wg.Wait()
}
250}