blob: a45b3ee0a11be690318cfa033779714778b6cd12 [file] [log] [blame]
khenaidoob6238b32020-04-07 12:07:36 -04001/*
2 * Copyright 2020-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
20 "fmt"
21 "github.com/golang/protobuf/proto"
22 "github.com/google/uuid"
Girish Gowdra4c60c672021-07-26 13:30:57 -070023 "github.com/opencord/voltha-lib-go/v6/pkg/db"
24 "github.com/opencord/voltha-lib-go/v6/pkg/log"
25 "github.com/opencord/voltha-lib-go/v6/pkg/mocks/etcd"
Girish Gowdra89c985b2020-10-14 15:02:09 -070026 "github.com/opencord/voltha-protos/v4/go/voltha"
khenaidoob6238b32020-04-07 12:07:36 -040027 "github.com/phayes/freeport"
28 "github.com/stretchr/testify/assert"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/status"
31 "math"
Neha Sharmadd9af392020-04-28 09:03:57 +000032 "strconv"
khenaidoob6238b32020-04-07 12:07:36 -040033 "testing"
34 "time"
35)
36
37type EPTest struct {
38 etcdServer *etcd.EtcdServer
39 backend *db.Backend
40 maxReplicas int
41 minReplicas int
42}
43
44func newEPTest(minReplicas, maxReplicas int) *EPTest {
Neha Sharma94f16a92020-06-26 04:17:55 +000045 ctx := context.Background()
khenaidoob6238b32020-04-07 12:07:36 -040046 test := &EPTest{
47 minReplicas: minReplicas,
48 maxReplicas: maxReplicas,
49 }
50
51 // Create backend
52 if err := test.initBackend(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +000053 logger.Fatalw(ctx, "setting-backend-failed", log.Fields{"error": err})
khenaidoob6238b32020-04-07 12:07:36 -040054 }
55
56 // Populate backend with data
57 if err := test.populateBackend(); err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +000058 logger.Fatalw(ctx, "populating-db-failed", log.Fields{"error": err})
khenaidoob6238b32020-04-07 12:07:36 -040059 }
60 return test
61}
62
63func (ep *EPTest) initBackend() error {
Neha Sharma94f16a92020-06-26 04:17:55 +000064 ctx := context.Background()
khenaidoob6238b32020-04-07 12:07:36 -040065 configName := "voltha-lib.kafka.ep.test"
66 storageDir := "voltha-lib.kafka.ep.etcd"
67 logLevel := "error"
Neha Sharma130ac6d2020-04-08 08:46:32 +000068 timeout := 5 * time.Second
khenaidoob6238b32020-04-07 12:07:36 -040069
70 kvClientPort, err := freeport.GetFreePort()
71 if err != nil {
72 return err
73 }
74 peerPort, err := freeport.GetFreePort()
75 if err != nil {
76 return err
77 }
Neha Sharma94f16a92020-06-26 04:17:55 +000078 ep.etcdServer = etcd.StartEtcdServer(ctx, etcd.MKConfig(ctx, configName, kvClientPort, peerPort, storageDir, logLevel))
khenaidoob6238b32020-04-07 12:07:36 -040079 if ep.etcdServer == nil {
80 return status.Error(codes.Internal, "Embedded server failed to start")
81 }
82
Neha Sharma94f16a92020-06-26 04:17:55 +000083 ep.backend = db.NewBackend(ctx, "etcd", "127.0.0.1"+":"+strconv.Itoa(kvClientPort), timeout, "service/voltha")
khenaidoob6238b32020-04-07 12:07:36 -040084 return nil
85}
86
87func (ep *EPTest) stopAll() {
88 if ep.etcdServer != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +000089 ep.etcdServer.Stop(context.Background())
khenaidoob6238b32020-04-07 12:07:36 -040090 }
91}
92
93func (ep *EPTest) populateBackend() error {
94 // Add an adapter with multiple replicas
95 adapterPrefix := "adapter_brcm_openomci_onu"
96 numReplicas := ep.maxReplicas
97 for i := 0; i < numReplicas; i++ {
98 adapter := &voltha.Adapter{
99 Id: fmt.Sprintf("%s_%d", adapterPrefix, i),
100 Vendor: "VOLTHA OpenONU",
101 Version: "2.4.0-dev0",
102 Type: adapterPrefix,
103 CurrentReplica: int32(i),
104 TotalReplicas: int32(numReplicas),
105 Endpoint: fmt.Sprintf("%s_%d", adapterPrefix, i),
106 }
107 adapterKVKey := fmt.Sprintf("%s/%d", adapterPrefix, i)
108 blob, err := proto.Marshal(adapter)
109 if err != nil {
110 return err
111 }
112 if err := ep.backend.Put(context.Background(), "adapters/"+adapterKVKey, blob); err != nil {
113 return err
114 }
115 }
116
117 // Add an adapter with minreplicas
118 adapterPrefix = "adapter_openolt"
119 numReplicas = ep.minReplicas
120 for i := 0; i < numReplicas; i++ {
121 adapter := &voltha.Adapter{
122 Id: fmt.Sprintf("%s_%d", adapterPrefix, i),
123 Vendor: "VOLTHA OpenOLT",
124 Version: "2.3.1-dev",
125 Type: adapterPrefix,
126 CurrentReplica: int32(i),
127 TotalReplicas: int32(numReplicas),
128 Endpoint: fmt.Sprintf("%s_%d", adapterPrefix, i),
129 }
130 adapterKVKey := fmt.Sprintf("%s/%d", adapterPrefix, i)
131 blob, err := proto.Marshal(adapter)
132 if err != nil {
133 return err
134 }
135 if err := ep.backend.Put(context.Background(), "adapters/"+adapterKVKey, blob); err != nil {
136 return err
137 }
138 }
139
140 // Add the brcm_openomci_onu device type
141 dType := "brcm_openomci_onu"
142 adapterName := "adapter_brcm_openomci_onu"
143 deviceType := &voltha.DeviceType{
144 Id: dType,
145 VendorIds: []string{"OPEN", "ALCL", "BRCM", "TWSH", "ALPH", "ISKT", "SFAA", "BBSM", "SCOM", "ARPX", "DACM", "ERSN", "HWTC", "CIGG"},
146 Adapter: adapterName,
147 AcceptsAddRemoveFlowUpdates: true,
148 }
149 blob, err := proto.Marshal(deviceType)
150 if err != nil {
151 return err
152 }
153 if err := ep.backend.Put(context.Background(), "device_types/"+deviceType.Id, blob); err != nil {
154 return err
155 }
156
157 // Add the openolt device type
158 dType = "openolt"
159 adapterName = "adapter_openolt"
160 deviceType = &voltha.DeviceType{
161 Id: dType,
162 Adapter: adapterName,
163 AcceptsAddRemoveFlowUpdates: true,
164 }
165 blob, err = proto.Marshal(deviceType)
166 if err != nil {
167 return err
168 }
169 if err := ep.backend.Put(context.Background(), "device_types/"+deviceType.Id, blob); err != nil {
170 return err
171 }
172 return nil
173}
174
175func getMeanAndStdDeviation(val []int, replicas int) (float64, float64) {
176 var sum, mean, sd float64
177 for i := 0; i < replicas; i++ {
178 sum += float64(val[i])
179 }
180 mean = sum / float64(replicas)
181
182 for j := 0; j < replicas; j++ {
183 sd += math.Pow(float64(val[j])-mean, 2)
184 }
185 sd = math.Sqrt(sd / float64(replicas))
186 return mean, sd
187}
188
189func (ep *EPTest) testEndpointManagerAPIs(t *testing.T, tm EndpointManager, serviceType string, deviceType string, replicas int) {
Neha Sharma94f16a92020-06-26 04:17:55 +0000190 ctx := context.Background()
khenaidoob6238b32020-04-07 12:07:36 -0400191 // Map of device ids to topic
192 deviceIDs := make(map[string]Endpoint)
193 numDevices := 1000
194 total := make([]int, replicas)
195 for i := 0; i < numDevices; i++ {
196 deviceID := uuid.New().String()
Neha Sharma94f16a92020-06-26 04:17:55 +0000197 endpoint, err := tm.GetEndpoint(ctx, deviceID, serviceType)
khenaidoob6238b32020-04-07 12:07:36 -0400198 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000199 logger.Fatalw(ctx, "error-getting-endpoint", log.Fields{"error": err})
khenaidoob6238b32020-04-07 12:07:36 -0400200 }
201 deviceIDs[deviceID] = endpoint
Neha Sharma94f16a92020-06-26 04:17:55 +0000202 replicaID, err := tm.GetReplicaAssignment(ctx, deviceID, serviceType)
khenaidoob6238b32020-04-07 12:07:36 -0400203 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000204 logger.Fatalw(ctx, "error-getting-endpoint", log.Fields{"error": err})
khenaidoob6238b32020-04-07 12:07:36 -0400205 }
206 total[replicaID] += 1
207 }
208
209 mean, sdtDev := getMeanAndStdDeviation(total, replicas)
210 fmt.Println(fmt.Sprintf("Device distributions => devices:%d service_replicas:%d mean:%d standard_deviation:%d, distributions:%v", numDevices, replicas, int(mean), int(sdtDev), total))
211
212 // Verify that we get the same topic for a given device ID, irrespective of the number of iterations
213 numIterations := 10
214 for i := 0; i < numIterations; i++ {
215 for deviceID, expectedEndpoint := range deviceIDs {
Neha Sharma94f16a92020-06-26 04:17:55 +0000216 endpointByServiceType, err := tm.GetEndpoint(ctx, deviceID, serviceType)
khenaidoob6238b32020-04-07 12:07:36 -0400217 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000218 logger.Fatalw(ctx, "error-getting-endpoint", log.Fields{"error": err})
khenaidoob6238b32020-04-07 12:07:36 -0400219 }
220 assert.Equal(t, expectedEndpoint, endpointByServiceType)
221 }
222 }
223
224 // Verify that a device belong to the correct node
225 for deviceID := range deviceIDs {
Neha Sharma94f16a92020-06-26 04:17:55 +0000226 replicaID, err := tm.GetReplicaAssignment(ctx, deviceID, serviceType)
khenaidoob6238b32020-04-07 12:07:36 -0400227 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000228 logger.Fatalw(ctx, "error-getting-topic", log.Fields{"error": err})
khenaidoob6238b32020-04-07 12:07:36 -0400229 }
230 for k := 0; k < replicas; k++ {
Neha Sharma94f16a92020-06-26 04:17:55 +0000231 owned, err := tm.IsDeviceOwnedByService(ctx, deviceID, serviceType, int32(k))
khenaidoob6238b32020-04-07 12:07:36 -0400232 if err != nil {
Neha Sharma94f16a92020-06-26 04:17:55 +0000233 logger.Fatalw(ctx, "error-verifying-device-ownership", log.Fields{"error": err})
khenaidoob6238b32020-04-07 12:07:36 -0400234 }
235 assert.Equal(t, ReplicaID(k) == replicaID, owned)
236 }
237 }
238}
239
240func TestEndpointManagerSuite(t *testing.T) {
241 tmt := newEPTest(1, 10)
242 assert.NotNil(t, tmt)
243
244 tm := NewEndpointManager(
245 tmt.backend,
246 PartitionCount(1117),
247 ReplicationFactor(200),
248 Load(1.1))
249
250 defer tmt.stopAll()
251
252 //1. Test APIs with multiple replicas
253 tmt.testEndpointManagerAPIs(t, tm, "adapter_brcm_openomci_onu", "brcm_openomci_onu", tmt.maxReplicas)
254
255 //2. Test APIs with single replica
256 tmt.testEndpointManagerAPIs(t, tm, "adapter_openolt", "openolt", tmt.minReplicas)
257}