/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package kvstore
17
18import (
19 "context"
khenaidoo6f415b22021-06-22 18:08:53 -040020 "os"
21 "strconv"
22 "sync"
23 "testing"
24 "time"
khenaidoo26721882021-08-11 17:42:52 -040025
26 "github.com/opencord/voltha-lib-go/v7/pkg/log"
27 mocks "github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd"
28 "github.com/phayes/freeport"
29 "github.com/stretchr/testify/assert"
khenaidoo6f415b22021-06-22 18:08:53 -040030)
31
const (
	// embedEtcdServerHost is the host on which the embedded etcd test server listens.
	embedEtcdServerHost = "localhost"
	// defaultTimeout bounds each etcd client operation performed by these tests.
	defaultTimeout = 1 * time.Second
)

var (
	// embedEtcdServerPort is the dynamically allocated client port of the
	// embedded etcd server; assigned in TestMain before the suite runs.
	embedEtcdServerPort int
)
40
41func TestMain(m *testing.M) {
42 ctx := context.Background()
43 var err error
44 embedEtcdServerPort, err = freeport.GetFreePort()
45 if err != nil {
46 logger.Fatal(ctx, err)
47 }
48 peerPort, err := freeport.GetFreePort()
49 if err != nil {
50 logger.Fatal(ctx, err)
51 }
52 etcdServer := mocks.StartEtcdServer(ctx, mocks.MKConfig(ctx,
53 "voltha.db.kvstore.test",
54 embedEtcdServerPort,
55 peerPort,
56 "voltha.lib.db.kvstore",
57 "error"))
58 res := m.Run()
59
60 etcdServer.Stop(ctx)
61 os.Exit(res)
62}
63
64func TestNewRoundRobinEtcdClientAllocator(t *testing.T) {
65 address := embedEtcdServerHost + ":" + strconv.Itoa(embedEtcdServerPort)
66 capacity := 20
67 maxUsage := 10
68
69 pool, err := NewRoundRobinEtcdClientAllocator([]string{address}, defaultTimeout, capacity, maxUsage, log.ErrorLevel)
70 assert.NotNil(t, pool)
71 assert.Nil(t, err)
72}
73
74func TestRoundRobin_Get_Put(t *testing.T) {
75 address := embedEtcdServerHost + ":" + strconv.Itoa(embedEtcdServerPort)
76 capacity := 20
77 maxUsage := 10
78
79 pool, err := NewRoundRobinEtcdClientAllocator([]string{address}, defaultTimeout, capacity, maxUsage, log.ErrorLevel)
80 assert.NotNil(t, pool)
81 assert.Nil(t, err)
82
83 // Verify we can obtain the expected number of clients with no errors or waiting time
84 var wg sync.WaitGroup
85 for i := 0; i < capacity*maxUsage; i++ {
86 wg.Add(1)
87 go func() {
88 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
89 defer cancel()
90 c, err := pool.Get(ctx)
91 assert.NotNil(t, c)
92 assert.Nil(t, err)
93 time.Sleep(5 * time.Second)
94 pool.Put(c)
95 wg.Done()
96 }()
97 }
98 wg.Wait()
99}
100
101func TestRoundRobin_Get_Put_various_capacity(t *testing.T) {
102 // Test single client with 1 concurrent usage
103 getPutVaryingCapacity(t, 1, 1)
104
105 // Test single client with multiple concurrent usage
106 getPutVaryingCapacity(t, 1, 100)
107
108 // Test multiple clients with single concurrent usage
109 getPutVaryingCapacity(t, 10, 1)
110
111 // Test multiple clients with multiple concurrent usage
112 getPutVaryingCapacity(t, 10, 10)
113}
114
115func getPutVaryingCapacity(t *testing.T, capacity, maxUsage int) {
116 address := embedEtcdServerHost + ":" + strconv.Itoa(embedEtcdServerPort)
117
118 pool, err := NewRoundRobinEtcdClientAllocator([]string{address}, defaultTimeout, capacity, maxUsage, log.ErrorLevel)
119 assert.NotNil(t, pool)
120 assert.Nil(t, err)
121
122 // Verify we can obtain the expected number of clients with no errors or waiting time
123 var wg sync.WaitGroup
124 totalSize := capacity * maxUsage
125 ch := make(chan struct{})
126 for i := 0; i < totalSize; i++ {
127 wg.Add(1)
128 go func() {
129 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
130 defer cancel()
131 c, err := pool.Get(ctx)
132 assert.NotNil(t, c)
133 assert.Nil(t, err)
134 // Inform the waiting loop that a client has been allocated
135 ch <- struct{}{}
136 // Keep the client for 5s and then return it to the pool
137 time.Sleep(5 * time.Second)
138 pool.Put(c)
139 wg.Done()
140 }()
141 }
142
143 // Wait until all clients are allocated
144 allocated := 0
145 for range ch {
146 allocated++
147 if allocated == totalSize {
148 break
149 }
150 }
151
152 // Try to get above capacity/usage with low timeout
153 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
154 c, err := pool.Get(ctx)
155 assert.NotNil(t, err)
156 assert.Nil(t, c)
157 cancel()
158
159 // Try to get above capacity/usage with longer timeout
160 ctx, cancel = context.WithTimeout(context.Background(), 6*time.Second)
161 c, err = pool.Get(ctx)
162 assert.NotNil(t, c)
163 assert.Nil(t, err)
164 pool.Put(c)
165 cancel()
166
167 wg.Wait()
168
169 // Close the connection
170 pool.Close(context.Background())
171}
172
173func TestRoundRobin_Close_various_capacity(t *testing.T) {
174 // Test single client with 1 concurrent usage
175 closeWithVaryingCapacity(t, 1, 1)
176
177 // Test single client with multiple concurrent usage
178 closeWithVaryingCapacity(t, 1, 100)
179
180 // Test multiple clients with single concurrent usage
181 closeWithVaryingCapacity(t, 10, 1)
182
183 // Test multiple clients with multiple concurrent usage
184 closeWithVaryingCapacity(t, 10, 10)
185}
186
187func closeWithVaryingCapacity(t *testing.T, capacity, maxUsage int) {
188 address := embedEtcdServerHost + ":" + strconv.Itoa(embedEtcdServerPort)
189
190 pool, err := NewRoundRobinEtcdClientAllocator([]string{address}, defaultTimeout, capacity, maxUsage, log.ErrorLevel)
191 assert.NotNil(t, pool)
192 assert.Nil(t, err)
193
194 // Verify we can obtain the expected number of clients with no errors or waiting time
195 var wg sync.WaitGroup
196 totalSize := capacity * maxUsage
197 ch := make(chan struct{})
198 for i := 0; i < totalSize; i++ {
199 wg.Add(1)
200 go func() {
201 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
202 defer cancel()
203 c, err := pool.Get(ctx)
204 assert.NotNil(t, c)
205 assert.Nil(t, err)
206 // Inform the waiting loop that a client has been allocated
207 ch <- struct{}{}
208 // Keep the client for 5s and then return it to the pool
209 time.Sleep(5 * time.Second)
210 pool.Put(c)
211 wg.Done()
212 }()
213 }
214
215 // Wait until all clients are allocated
216 allocated := 0
217 for range ch {
218 allocated++
219 if allocated == totalSize {
220 break
221 }
222 }
223 // Try to get above capacity/usage with longer timeout
224 wg.Add(1)
225 go func() {
226 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
227 c, err := pool.Get(ctx)
228 assert.NotNil(t, err)
229 expected := err.Error() == "pool-is-closing" || err.Error() == "stop-waiting-pool-is-closing"
230 assert.True(t, expected)
231 assert.Nil(t, c)
232 cancel()
233 wg.Done()
234 }()
235
236 // Invoke close on the pool
237 wg.Add(1)
238 go func() {
239 pool.Close(context.Background())
240 wg.Done()
241 }()
242
243 // Try to get new client and ensure we can't get one
244 c, err := pool.Get(context.Background())
245 assert.NotNil(t, err)
246 expected := err.Error() == "pool-is-closing" || err.Error() == "stop-waiting-pool-is-closing"
247 assert.True(t, expected)
248 assert.Nil(t, c)
249
250 wg.Wait()
251}