blob: 0ed5a3db98a64319670fc0dbf66eacccc9dad516 [file] [log] [blame]
khenaidoob6238b32020-04-07 12:07:36 -04001/*
2 * Copyright 2020-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
20 "fmt"
21 "github.com/golang/protobuf/proto"
22 "github.com/google/uuid"
23 "github.com/opencord/voltha-lib-go/v3/pkg/db"
24 "github.com/opencord/voltha-lib-go/v3/pkg/log"
25 "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
26 "github.com/opencord/voltha-protos/v3/go/voltha"
27 "github.com/phayes/freeport"
28 "github.com/stretchr/testify/assert"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/status"
31 "math"
32 "testing"
33 "time"
34)
35
36type EPTest struct {
37 etcdServer *etcd.EtcdServer
38 backend *db.Backend
39 maxReplicas int
40 minReplicas int
41}
42
43func newEPTest(minReplicas, maxReplicas int) *EPTest {
44 test := &EPTest{
45 minReplicas: minReplicas,
46 maxReplicas: maxReplicas,
47 }
48
49 // Create backend
50 if err := test.initBackend(); err != nil {
51 logger.Fatalw("setting-backend-failed", log.Fields{"error": err})
52 }
53
54 // Populate backend with data
55 if err := test.populateBackend(); err != nil {
56 logger.Fatalw("populating-db-failed", log.Fields{"error": err})
57 }
58 return test
59}
60
61func (ep *EPTest) initBackend() error {
62 configName := "voltha-lib.kafka.ep.test"
63 storageDir := "voltha-lib.kafka.ep.etcd"
64 logLevel := "error"
65 timeout := time.Duration(5 * time.Second)
66
67 kvClientPort, err := freeport.GetFreePort()
68 if err != nil {
69 return err
70 }
71 peerPort, err := freeport.GetFreePort()
72 if err != nil {
73 return err
74 }
75 ep.etcdServer = etcd.StartEtcdServer(etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
76 if ep.etcdServer == nil {
77 return status.Error(codes.Internal, "Embedded server failed to start")
78 }
79
80 ep.backend = db.NewBackend("etcd", "127.0.0.1", kvClientPort, int(timeout.Milliseconds()), "service/voltha")
81 return nil
82}
83
84func (ep *EPTest) stopAll() {
85 if ep.etcdServer != nil {
86 ep.etcdServer.Stop()
87 }
88}
89
90func (ep *EPTest) populateBackend() error {
91 // Add an adapter with multiple replicas
92 adapterPrefix := "adapter_brcm_openomci_onu"
93 numReplicas := ep.maxReplicas
94 for i := 0; i < numReplicas; i++ {
95 adapter := &voltha.Adapter{
96 Id: fmt.Sprintf("%s_%d", adapterPrefix, i),
97 Vendor: "VOLTHA OpenONU",
98 Version: "2.4.0-dev0",
99 Type: adapterPrefix,
100 CurrentReplica: int32(i),
101 TotalReplicas: int32(numReplicas),
102 Endpoint: fmt.Sprintf("%s_%d", adapterPrefix, i),
103 }
104 adapterKVKey := fmt.Sprintf("%s/%d", adapterPrefix, i)
105 blob, err := proto.Marshal(adapter)
106 if err != nil {
107 return err
108 }
109 if err := ep.backend.Put(context.Background(), "adapters/"+adapterKVKey, blob); err != nil {
110 return err
111 }
112 }
113
114 // Add an adapter with minreplicas
115 adapterPrefix = "adapter_openolt"
116 numReplicas = ep.minReplicas
117 for i := 0; i < numReplicas; i++ {
118 adapter := &voltha.Adapter{
119 Id: fmt.Sprintf("%s_%d", adapterPrefix, i),
120 Vendor: "VOLTHA OpenOLT",
121 Version: "2.3.1-dev",
122 Type: adapterPrefix,
123 CurrentReplica: int32(i),
124 TotalReplicas: int32(numReplicas),
125 Endpoint: fmt.Sprintf("%s_%d", adapterPrefix, i),
126 }
127 adapterKVKey := fmt.Sprintf("%s/%d", adapterPrefix, i)
128 blob, err := proto.Marshal(adapter)
129 if err != nil {
130 return err
131 }
132 if err := ep.backend.Put(context.Background(), "adapters/"+adapterKVKey, blob); err != nil {
133 return err
134 }
135 }
136
137 // Add the brcm_openomci_onu device type
138 dType := "brcm_openomci_onu"
139 adapterName := "adapter_brcm_openomci_onu"
140 deviceType := &voltha.DeviceType{
141 Id: dType,
142 VendorIds: []string{"OPEN", "ALCL", "BRCM", "TWSH", "ALPH", "ISKT", "SFAA", "BBSM", "SCOM", "ARPX", "DACM", "ERSN", "HWTC", "CIGG"},
143 Adapter: adapterName,
144 AcceptsAddRemoveFlowUpdates: true,
145 }
146 blob, err := proto.Marshal(deviceType)
147 if err != nil {
148 return err
149 }
150 if err := ep.backend.Put(context.Background(), "device_types/"+deviceType.Id, blob); err != nil {
151 return err
152 }
153
154 // Add the openolt device type
155 dType = "openolt"
156 adapterName = "adapter_openolt"
157 deviceType = &voltha.DeviceType{
158 Id: dType,
159 Adapter: adapterName,
160 AcceptsAddRemoveFlowUpdates: true,
161 }
162 blob, err = proto.Marshal(deviceType)
163 if err != nil {
164 return err
165 }
166 if err := ep.backend.Put(context.Background(), "device_types/"+deviceType.Id, blob); err != nil {
167 return err
168 }
169 return nil
170}
171
172func getMeanAndStdDeviation(val []int, replicas int) (float64, float64) {
173 var sum, mean, sd float64
174 for i := 0; i < replicas; i++ {
175 sum += float64(val[i])
176 }
177 mean = sum / float64(replicas)
178
179 for j := 0; j < replicas; j++ {
180 sd += math.Pow(float64(val[j])-mean, 2)
181 }
182 sd = math.Sqrt(sd / float64(replicas))
183 return mean, sd
184}
185
186func (ep *EPTest) testEndpointManagerAPIs(t *testing.T, tm EndpointManager, serviceType string, deviceType string, replicas int) {
187 // Map of device ids to topic
188 deviceIDs := make(map[string]Endpoint)
189 numDevices := 1000
190 total := make([]int, replicas)
191 for i := 0; i < numDevices; i++ {
192 deviceID := uuid.New().String()
193 endpoint, err := tm.GetEndpoint(deviceID, serviceType)
194 if err != nil {
195 logger.Fatalw("error-getting-endpoint", log.Fields{"error": err})
196 }
197 deviceIDs[deviceID] = endpoint
198 replicaID, err := tm.getReplicaAssignment(deviceID, serviceType)
199 if err != nil {
200 logger.Fatalw("error-getting-endpoint", log.Fields{"error": err})
201 }
202 total[replicaID] += 1
203 }
204
205 mean, sdtDev := getMeanAndStdDeviation(total, replicas)
206 fmt.Println(fmt.Sprintf("Device distributions => devices:%d service_replicas:%d mean:%d standard_deviation:%d, distributions:%v", numDevices, replicas, int(mean), int(sdtDev), total))
207
208 // Verify that we get the same topic for a given device ID, irrespective of the number of iterations
209 numIterations := 10
210 for i := 0; i < numIterations; i++ {
211 for deviceID, expectedEndpoint := range deviceIDs {
212 endpointByServiceType, err := tm.GetEndpoint(deviceID, serviceType)
213 if err != nil {
214 logger.Fatalw("error-getting-endpoint", log.Fields{"error": err})
215 }
216 assert.Equal(t, expectedEndpoint, endpointByServiceType)
217 }
218 }
219
220 // Verify that a device belong to the correct node
221 for deviceID := range deviceIDs {
222 replicaID, err := tm.getReplicaAssignment(deviceID, serviceType)
223 if err != nil {
224 logger.Fatalw("error-getting-topic", log.Fields{"error": err})
225 }
226 for k := 0; k < replicas; k++ {
227 owned, err := tm.IsDeviceOwnedByService(deviceID, serviceType, int32(k))
228 if err != nil {
229 logger.Fatalw("error-verifying-device-ownership", log.Fields{"error": err})
230 }
231 assert.Equal(t, ReplicaID(k) == replicaID, owned)
232 }
233 }
234}
235
236func TestEndpointManagerSuite(t *testing.T) {
237 tmt := newEPTest(1, 10)
238 assert.NotNil(t, tmt)
239
240 tm := NewEndpointManager(
241 tmt.backend,
242 PartitionCount(1117),
243 ReplicationFactor(200),
244 Load(1.1))
245
246 defer tmt.stopAll()
247
248 //1. Test APIs with multiple replicas
249 tmt.testEndpointManagerAPIs(t, tm, "adapter_brcm_openomci_onu", "brcm_openomci_onu", tmt.maxReplicas)
250
251 //2. Test APIs with single replica
252 tmt.testEndpointManagerAPIs(t, tm, "adapter_openolt", "openolt", tmt.minReplicas)
253}