blob: 37902210fe1a704ce64052a385b67675821f7f8d [file] [log] [blame]
khenaidoob6238b32020-04-07 12:07:36 -04001/*
2 * Copyright 2020-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
20 "fmt"
21 "github.com/golang/protobuf/proto"
22 "github.com/google/uuid"
23 "github.com/opencord/voltha-lib-go/v3/pkg/db"
24 "github.com/opencord/voltha-lib-go/v3/pkg/log"
25 "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
26 "github.com/opencord/voltha-protos/v3/go/voltha"
27 "github.com/phayes/freeport"
28 "github.com/stretchr/testify/assert"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/status"
31 "math"
Neha Sharmadd9af392020-04-28 09:03:57 +000032 "strconv"
khenaidoob6238b32020-04-07 12:07:36 -040033 "testing"
34 "time"
35)
36
37type EPTest struct {
38 etcdServer *etcd.EtcdServer
39 backend *db.Backend
40 maxReplicas int
41 minReplicas int
42}
43
44func newEPTest(minReplicas, maxReplicas int) *EPTest {
45 test := &EPTest{
46 minReplicas: minReplicas,
47 maxReplicas: maxReplicas,
48 }
49
50 // Create backend
51 if err := test.initBackend(); err != nil {
52 logger.Fatalw("setting-backend-failed", log.Fields{"error": err})
53 }
54
55 // Populate backend with data
56 if err := test.populateBackend(); err != nil {
57 logger.Fatalw("populating-db-failed", log.Fields{"error": err})
58 }
59 return test
60}
61
62func (ep *EPTest) initBackend() error {
63 configName := "voltha-lib.kafka.ep.test"
64 storageDir := "voltha-lib.kafka.ep.etcd"
65 logLevel := "error"
Neha Sharma130ac6d2020-04-08 08:46:32 +000066 timeout := 5 * time.Second
khenaidoob6238b32020-04-07 12:07:36 -040067
68 kvClientPort, err := freeport.GetFreePort()
69 if err != nil {
70 return err
71 }
72 peerPort, err := freeport.GetFreePort()
73 if err != nil {
74 return err
75 }
76 ep.etcdServer = etcd.StartEtcdServer(etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
77 if ep.etcdServer == nil {
78 return status.Error(codes.Internal, "Embedded server failed to start")
79 }
80
Neha Sharmadd9af392020-04-28 09:03:57 +000081 ep.backend = db.NewBackend("etcd", "127.0.0.1"+":"+strconv.Itoa(kvClientPort), timeout, "service/voltha")
khenaidoob6238b32020-04-07 12:07:36 -040082 return nil
83}
84
85func (ep *EPTest) stopAll() {
86 if ep.etcdServer != nil {
87 ep.etcdServer.Stop()
88 }
89}
90
91func (ep *EPTest) populateBackend() error {
92 // Add an adapter with multiple replicas
93 adapterPrefix := "adapter_brcm_openomci_onu"
94 numReplicas := ep.maxReplicas
95 for i := 0; i < numReplicas; i++ {
96 adapter := &voltha.Adapter{
97 Id: fmt.Sprintf("%s_%d", adapterPrefix, i),
98 Vendor: "VOLTHA OpenONU",
99 Version: "2.4.0-dev0",
100 Type: adapterPrefix,
101 CurrentReplica: int32(i),
102 TotalReplicas: int32(numReplicas),
103 Endpoint: fmt.Sprintf("%s_%d", adapterPrefix, i),
104 }
105 adapterKVKey := fmt.Sprintf("%s/%d", adapterPrefix, i)
106 blob, err := proto.Marshal(adapter)
107 if err != nil {
108 return err
109 }
110 if err := ep.backend.Put(context.Background(), "adapters/"+adapterKVKey, blob); err != nil {
111 return err
112 }
113 }
114
115 // Add an adapter with minreplicas
116 adapterPrefix = "adapter_openolt"
117 numReplicas = ep.minReplicas
118 for i := 0; i < numReplicas; i++ {
119 adapter := &voltha.Adapter{
120 Id: fmt.Sprintf("%s_%d", adapterPrefix, i),
121 Vendor: "VOLTHA OpenOLT",
122 Version: "2.3.1-dev",
123 Type: adapterPrefix,
124 CurrentReplica: int32(i),
125 TotalReplicas: int32(numReplicas),
126 Endpoint: fmt.Sprintf("%s_%d", adapterPrefix, i),
127 }
128 adapterKVKey := fmt.Sprintf("%s/%d", adapterPrefix, i)
129 blob, err := proto.Marshal(adapter)
130 if err != nil {
131 return err
132 }
133 if err := ep.backend.Put(context.Background(), "adapters/"+adapterKVKey, blob); err != nil {
134 return err
135 }
136 }
137
138 // Add the brcm_openomci_onu device type
139 dType := "brcm_openomci_onu"
140 adapterName := "adapter_brcm_openomci_onu"
141 deviceType := &voltha.DeviceType{
142 Id: dType,
143 VendorIds: []string{"OPEN", "ALCL", "BRCM", "TWSH", "ALPH", "ISKT", "SFAA", "BBSM", "SCOM", "ARPX", "DACM", "ERSN", "HWTC", "CIGG"},
144 Adapter: adapterName,
145 AcceptsAddRemoveFlowUpdates: true,
146 }
147 blob, err := proto.Marshal(deviceType)
148 if err != nil {
149 return err
150 }
151 if err := ep.backend.Put(context.Background(), "device_types/"+deviceType.Id, blob); err != nil {
152 return err
153 }
154
155 // Add the openolt device type
156 dType = "openolt"
157 adapterName = "adapter_openolt"
158 deviceType = &voltha.DeviceType{
159 Id: dType,
160 Adapter: adapterName,
161 AcceptsAddRemoveFlowUpdates: true,
162 }
163 blob, err = proto.Marshal(deviceType)
164 if err != nil {
165 return err
166 }
167 if err := ep.backend.Put(context.Background(), "device_types/"+deviceType.Id, blob); err != nil {
168 return err
169 }
170 return nil
171}
172
173func getMeanAndStdDeviation(val []int, replicas int) (float64, float64) {
174 var sum, mean, sd float64
175 for i := 0; i < replicas; i++ {
176 sum += float64(val[i])
177 }
178 mean = sum / float64(replicas)
179
180 for j := 0; j < replicas; j++ {
181 sd += math.Pow(float64(val[j])-mean, 2)
182 }
183 sd = math.Sqrt(sd / float64(replicas))
184 return mean, sd
185}
186
187func (ep *EPTest) testEndpointManagerAPIs(t *testing.T, tm EndpointManager, serviceType string, deviceType string, replicas int) {
188 // Map of device ids to topic
189 deviceIDs := make(map[string]Endpoint)
190 numDevices := 1000
191 total := make([]int, replicas)
192 for i := 0; i < numDevices; i++ {
193 deviceID := uuid.New().String()
194 endpoint, err := tm.GetEndpoint(deviceID, serviceType)
195 if err != nil {
196 logger.Fatalw("error-getting-endpoint", log.Fields{"error": err})
197 }
198 deviceIDs[deviceID] = endpoint
Matteo Scandolo87d71c02020-04-09 13:15:44 -0700199 replicaID, err := tm.GetReplicaAssignment(deviceID, serviceType)
khenaidoob6238b32020-04-07 12:07:36 -0400200 if err != nil {
201 logger.Fatalw("error-getting-endpoint", log.Fields{"error": err})
202 }
203 total[replicaID] += 1
204 }
205
206 mean, sdtDev := getMeanAndStdDeviation(total, replicas)
207 fmt.Println(fmt.Sprintf("Device distributions => devices:%d service_replicas:%d mean:%d standard_deviation:%d, distributions:%v", numDevices, replicas, int(mean), int(sdtDev), total))
208
209 // Verify that we get the same topic for a given device ID, irrespective of the number of iterations
210 numIterations := 10
211 for i := 0; i < numIterations; i++ {
212 for deviceID, expectedEndpoint := range deviceIDs {
213 endpointByServiceType, err := tm.GetEndpoint(deviceID, serviceType)
214 if err != nil {
215 logger.Fatalw("error-getting-endpoint", log.Fields{"error": err})
216 }
217 assert.Equal(t, expectedEndpoint, endpointByServiceType)
218 }
219 }
220
221 // Verify that a device belong to the correct node
222 for deviceID := range deviceIDs {
Matteo Scandolo87d71c02020-04-09 13:15:44 -0700223 replicaID, err := tm.GetReplicaAssignment(deviceID, serviceType)
khenaidoob6238b32020-04-07 12:07:36 -0400224 if err != nil {
225 logger.Fatalw("error-getting-topic", log.Fields{"error": err})
226 }
227 for k := 0; k < replicas; k++ {
228 owned, err := tm.IsDeviceOwnedByService(deviceID, serviceType, int32(k))
229 if err != nil {
230 logger.Fatalw("error-verifying-device-ownership", log.Fields{"error": err})
231 }
232 assert.Equal(t, ReplicaID(k) == replicaID, owned)
233 }
234 }
235}
236
237func TestEndpointManagerSuite(t *testing.T) {
238 tmt := newEPTest(1, 10)
239 assert.NotNil(t, tmt)
240
241 tm := NewEndpointManager(
242 tmt.backend,
243 PartitionCount(1117),
244 ReplicationFactor(200),
245 Load(1.1))
246
247 defer tmt.stopAll()
248
249 //1. Test APIs with multiple replicas
250 tmt.testEndpointManagerAPIs(t, tm, "adapter_brcm_openomci_onu", "brcm_openomci_onu", tmt.maxReplicas)
251
252 //2. Test APIs with single replica
253 tmt.testEndpointManagerAPIs(t, tm, "adapter_openolt", "openolt", tmt.minReplicas)
254}