blob: 8114dd0eed1d186c05f5e53c0f8f9ee04d8019d1 [file] [log] [blame]
khenaidoo2bc48282019-07-16 18:13:46 -04001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Kent Hagerman2b216042020-04-03 18:28:56 -040016package device
khenaidoo2bc48282019-07-16 18:13:46 -040017
18import (
khenaidoo6e55d9e2019-12-12 18:26:26 -050019 "context"
David Bainbridged1afd662020-03-26 18:27:41 -070020 "math/rand"
Neha Sharmad1387da2020-05-07 20:07:28 +000021 "strconv"
David Bainbridged1afd662020-03-26 18:27:41 -070022 "sync"
23 "testing"
24 "time"
25
Kent Hagermanf5a67352020-04-30 15:15:26 -040026 "github.com/gogo/protobuf/proto"
Mahir Gunyeladdb66a2020-04-29 18:08:50 -070027 "github.com/opencord/voltha-go/db/model"
Kent Hagermanf5a67352020-04-30 15:15:26 -040028 "github.com/opencord/voltha-go/rw_core/config"
Mahir Gunyeladdb66a2020-04-29 18:08:50 -070029 "github.com/opencord/voltha-go/rw_core/core/adapter"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070030 tst "github.com/opencord/voltha-go/rw_core/test"
yasin sapli5458a1c2021-06-14 22:24:38 +000031 com "github.com/opencord/voltha-lib-go/v5/pkg/adapters/common"
32 "github.com/opencord/voltha-lib-go/v5/pkg/db"
33 "github.com/opencord/voltha-lib-go/v5/pkg/events"
34 fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
35 "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
36 mock_etcd "github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd"
37 mock_kafka "github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka"
Maninderdfadc982020-10-28 14:04:33 +053038 ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
39 "github.com/opencord/voltha-protos/v4/go/voltha"
khenaidoo6e55d9e2019-12-12 18:26:26 -050040 "github.com/phayes/freeport"
khenaidoo2bc48282019-07-16 18:13:46 -040041 "github.com/stretchr/testify/assert"
khenaidoo2bc48282019-07-16 18:13:46 -040042)
43
khenaidoo6e55d9e2019-12-12 18:26:26 -050044type LDATest struct {
Kent Hagerman2b216042020-04-03 18:28:56 -040045 etcdServer *mock_etcd.EtcdServer
46 deviceMgr *Manager
47 kmp kafka.InterContainerProxy
48 logicalDeviceMgr *LogicalManager
49 kClient kafka.Client
Himani Chawlab4c25912020-11-12 17:16:38 +053050 kEventClient kafka.Client
Kent Hagerman2b216042020-04-03 18:28:56 -040051 kvClientPort int
52 oltAdapterName string
53 onuAdapterName string
54 coreInstanceID string
55 defaultTimeout time.Duration
56 maxTimeout time.Duration
57 logicalDevice *voltha.LogicalDevice
Kent Hagerman2a07b862020-06-19 15:23:07 -040058 logicalPorts map[uint32]*voltha.LogicalPort
Kent Hagerman2b216042020-04-03 18:28:56 -040059 deviceIds []string
60 done chan int
khenaidoo6e55d9e2019-12-12 18:26:26 -050061}
62
Rohan Agrawal31f21802020-06-12 05:38:46 +000063func newLDATest(ctx context.Context) *LDATest {
khenaidoo6e55d9e2019-12-12 18:26:26 -050064 test := &LDATest{}
65 // Start the embedded etcd server
66 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +000067 test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer(ctx, "voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error")
khenaidoo6e55d9e2019-12-12 18:26:26 -050068 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +000069 logger.Fatal(ctx, err)
khenaidoo6e55d9e2019-12-12 18:26:26 -050070 }
71 // Create the kafka client
Matteo Scandolod525ae32020-04-02 17:27:29 -070072 test.kClient = mock_kafka.NewKafkaClient()
Himani Chawlab4c25912020-11-12 17:16:38 +053073 test.kEventClient = mock_kafka.NewKafkaClient()
khenaidoo6e55d9e2019-12-12 18:26:26 -050074 test.oltAdapterName = "olt_adapter_mock"
75 test.onuAdapterName = "onu_adapter_mock"
76 test.coreInstanceID = "rw-da-test"
77 test.defaultTimeout = 5 * time.Second
78 test.maxTimeout = 20 * time.Second
79 test.done = make(chan int)
80 test.deviceIds = []string{com.GetRandomString(10), com.GetRandomString(10), com.GetRandomString(10)}
81 test.logicalDevice = &voltha.LogicalDevice{
82 Desc: &ofp.OfpDesc{
83 HwDesc: "olt_adapter_mock",
84 SwDesc: "olt_adapter_mock",
85 SerialNum: com.GetRandomSerialNumber(),
86 },
87 SwitchFeatures: &ofp.OfpSwitchFeatures{
88 NBuffers: 256,
89 NTables: 2,
90 Capabilities: uint32(ofp.OfpCapabilities_OFPC_FLOW_STATS |
91 ofp.OfpCapabilities_OFPC_TABLE_STATS |
92 ofp.OfpCapabilities_OFPC_PORT_STATS |
93 ofp.OfpCapabilities_OFPC_GROUP_STATS),
94 },
95 RootDeviceId: test.deviceIds[0],
Kent Hagerman2a07b862020-06-19 15:23:07 -040096 }
97 test.logicalPorts = map[uint32]*voltha.LogicalPort{
98 1: {
99 Id: "1001",
100 DeviceId: test.deviceIds[0],
101 DevicePortNo: 1,
102 RootPort: true,
103 OfpPort: &ofp.OfpPort{
104 PortNo: 1,
105 Name: "port1",
106 Config: 4,
107 State: 4,
khenaidoo6e55d9e2019-12-12 18:26:26 -0500108 },
Kent Hagerman2a07b862020-06-19 15:23:07 -0400109 },
110 2: {
111 Id: "1002",
112 DeviceId: test.deviceIds[1],
113 DevicePortNo: 2,
114 RootPort: false,
115 OfpPort: &ofp.OfpPort{
116 PortNo: 2,
117 Name: "port2",
118 Config: 4,
119 State: 4,
khenaidoo6e55d9e2019-12-12 18:26:26 -0500120 },
Kent Hagerman2a07b862020-06-19 15:23:07 -0400121 },
122 3: {
123 Id: "1003",
124 DeviceId: test.deviceIds[2],
125 DevicePortNo: 3,
126 RootPort: false,
127 OfpPort: &ofp.OfpPort{
128 PortNo: 3,
129 Name: "port3",
130 Config: 4,
131 State: 4,
khenaidoo6e55d9e2019-12-12 18:26:26 -0500132 },
133 },
134 }
135 return test
136}
137
Rohan Agrawal31f21802020-06-12 05:38:46 +0000138func (lda *LDATest) startCore(ctx context.Context, inCompeteMode bool) {
David K. Bainbridge6080c172021-07-24 00:22:28 +0000139 cfg := &config.RWCoreFlags{}
140 cfg.ParseCommandArguments([]string{})
serkant.uluderya8ff291d2020-05-20 00:58:00 -0700141 cfg.CoreTopic = "rw_core"
Himani Chawlab4c25912020-11-12 17:16:38 +0530142 cfg.EventTopic = "voltha.events"
khenaidoo442e7c72020-03-10 16:13:48 -0400143 cfg.DefaultRequestTimeout = lda.defaultTimeout
Neha Sharmad1387da2020-05-07 20:07:28 +0000144 cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(lda.kvClientPort)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500145 grpcPort, err := freeport.GetFreePort()
146 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000147 logger.Fatal(ctx, "Cannot get a freeport for grpc")
khenaidoo6e55d9e2019-12-12 18:26:26 -0500148 }
Neha Sharmad1387da2020-05-07 20:07:28 +0000149 cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000150 client := tst.SetupKVClient(ctx, cfg, lda.coreInstanceID)
Kent Hagerman2b216042020-04-03 18:28:56 -0400151 backend := &db.Backend{
152 Client: client,
153 StoreType: cfg.KVStoreType,
Neha Sharmad1387da2020-05-07 20:07:28 +0000154 Address: cfg.KVStoreAddress,
Kent Hagerman2b216042020-04-03 18:28:56 -0400155 Timeout: cfg.KVStoreTimeout,
serkant.uluderya8ff291d2020-05-20 00:58:00 -0700156 LivenessChannelInterval: cfg.LiveProbeInterval / 2}
Kent Hagerman2b216042020-04-03 18:28:56 -0400157 lda.kmp = kafka.NewInterContainerProxy(
Neha Sharmad1387da2020-05-07 20:07:28 +0000158 kafka.InterContainerAddress(cfg.KafkaAdapterAddress),
Kent Hagerman2b216042020-04-03 18:28:56 -0400159 kafka.MsgClient(lda.kClient),
David Bainbridge9ae13132020-06-22 17:28:01 -0700160 kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}))
Kent Hagerman2b216042020-04-03 18:28:56 -0400161
162 endpointMgr := kafka.NewEndpointManager(backend)
Kent Hagermanf5a67352020-04-30 15:15:26 -0400163 proxy := model.NewDBPath(backend)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000164 adapterMgr := adapter.NewAdapterManager(ctx, proxy, lda.coreInstanceID, lda.kClient)
Himani Chawlab4c25912020-11-12 17:16:38 +0530165 eventProxy := events.NewEventProxy(events.MsgClient(lda.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
Maninder0aabf0c2021-03-17 14:55:14 +0530166 lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg, lda.coreInstanceID, eventProxy)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000167 if err = lda.kmp.Start(ctx); err != nil {
168 logger.Fatal(ctx, "Cannot start InterContainerProxy")
Thomas Lee Se5a44012019-11-07 20:32:24 +0530169 }
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400170 adapterMgr.Start(context.Background())
khenaidoo6e55d9e2019-12-12 18:26:26 -0500171}
172
Rohan Agrawal31f21802020-06-12 05:38:46 +0000173func (lda *LDATest) stopAll(ctx context.Context) {
khenaidoo6e55d9e2019-12-12 18:26:26 -0500174 if lda.kClient != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000175 lda.kClient.Stop(ctx)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500176 }
Kent Hagerman2b216042020-04-03 18:28:56 -0400177 if lda.kmp != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000178 lda.kmp.Stop(ctx)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500179 }
180 if lda.etcdServer != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000181 tst.StopEmbeddedEtcdServer(ctx, lda.etcdServer)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500182 }
Himani Chawlab4c25912020-11-12 17:16:38 +0530183 if lda.kEventClient != nil {
184 lda.kEventClient.Stop(ctx)
185 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500186}
187
Kent Hagerman2b216042020-04-03 18:28:56 -0400188func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalAgent {
189 lDeviceMgr := lda.logicalDeviceMgr
190 deviceMgr := lda.deviceMgr
khenaidoo6e55d9e2019-12-12 18:26:26 -0500191 clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
192 clonedLD.Id = com.GetRandomString(10)
193 clonedLD.DatapathId = rand.Uint64()
Rohan Agrawal31f21802020-06-12 05:38:46 +0000194 lDeviceAgent := newLogicalAgent(context.Background(), clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.defaultTimeout)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500195 lDeviceAgent.logicalDevice = clonedLD
Kent Hagerman2a07b862020-06-19 15:23:07 -0400196 for _, port := range lda.logicalPorts {
197 clonedPort := proto.Clone(port).(*voltha.LogicalPort)
198 handle, created, err := lDeviceAgent.portLoader.LockOrCreate(context.Background(), clonedPort)
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400199 if err != nil {
200 panic(err)
201 }
202 handle.Unlock()
203 if !created {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400204 t.Errorf("port %d already exists", clonedPort.OfpPort.PortNo)
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400205 }
206 }
Kent Hagermanf5a67352020-04-30 15:15:26 -0400207 err := lDeviceAgent.ldProxy.Set(context.Background(), clonedLD.Id, clonedLD)
Thomas Lee Se5a44012019-11-07 20:32:24 +0530208 assert.Nil(t, err)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500209 lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent)
210 return lDeviceAgent
211}
212
Kent Hagerman2b216042020-04-03 18:28:56 -0400213func (lda *LDATest) updateLogicalDeviceConcurrently(t *testing.T, ldAgent *LogicalAgent, globalWG *sync.WaitGroup) {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400214 originalLogicalPorts := ldAgent.listLogicalDevicePorts(context.Background())
215 assert.NotNil(t, originalLogicalPorts)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500216 var localWG sync.WaitGroup
217
218 // Change the state of the first port to FAILED
219 localWG.Add(1)
220 go func() {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400221 err := ldAgent.updatePortState(context.Background(), 1, voltha.OperStatus_FAILED)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500222 assert.Nil(t, err)
223 localWG.Done()
224 }()
225
226 // Change the state of the second port to TESTING
227 localWG.Add(1)
228 go func() {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400229 err := ldAgent.updatePortState(context.Background(), 2, voltha.OperStatus_TESTING)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500230 assert.Nil(t, err)
231 localWG.Done()
232 }()
233
234 // Change the state of the third port to UNKNOWN and then back to ACTIVE
235 localWG.Add(1)
236 go func() {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400237 err := ldAgent.updatePortState(context.Background(), 3, voltha.OperStatus_UNKNOWN)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500238 assert.Nil(t, err)
Kent Hagerman2a07b862020-06-19 15:23:07 -0400239 err = ldAgent.updatePortState(context.Background(), 3, voltha.OperStatus_ACTIVE)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500240 assert.Nil(t, err)
241 localWG.Done()
242 }()
243
244 // Add a meter to the logical device
245 meterMod := &ofp.OfpMeterMod{
246 Command: ofp.OfpMeterModCommand_OFPMC_ADD,
247 Flags: rand.Uint32(),
248 MeterId: rand.Uint32(),
249 Bands: []*ofp.OfpMeterBandHeader{
250 {Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
251 Rate: rand.Uint32(),
252 BurstSize: rand.Uint32(),
253 Data: nil,
254 },
255 },
256 }
257 localWG.Add(1)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000258 ctx := context.Background()
khenaidoo6e55d9e2019-12-12 18:26:26 -0500259 go func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000260 err := ldAgent.meterAdd(ctx, meterMod)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500261 assert.Nil(t, err)
262 localWG.Done()
263 }()
khenaidoo6e55d9e2019-12-12 18:26:26 -0500264 // wait for go routines to be done
265 localWG.Wait()
Rohan Agrawal31f21802020-06-12 05:38:46 +0000266 meterEntry := fu.MeterEntryFromMeterMod(ctx, meterMod)
Mahir Gunyeladdb66a2020-04-29 18:08:50 -0700267
khenaidoo1a0d6222021-06-30 16:48:44 -0400268 meterHandle, have := ldAgent.meterLoader.Lock(meterMod.MeterId)
Kent Hagerman433a31a2020-05-20 19:04:48 -0400269 assert.Equal(t, have, true)
270 if have {
271 assert.True(t, proto.Equal(meterEntry, meterHandle.GetReadOnly()))
272 meterHandle.Unlock()
273 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500274
Kent Hagerman2a07b862020-06-19 15:23:07 -0400275 expectedLogicalPorts := make(map[uint32]*voltha.LogicalPort)
276 for _, port := range originalLogicalPorts {
277 clonedPort := proto.Clone(port).(*voltha.LogicalPort)
278 switch clonedPort.OfpPort.PortNo {
279 case 1:
280 clonedPort.OfpPort.Config = originalLogicalPorts[1].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
281 clonedPort.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
282 case 2:
283 clonedPort.OfpPort.Config = originalLogicalPorts[1].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
284 clonedPort.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
285 case 3:
286 clonedPort.OfpPort.Config = originalLogicalPorts[1].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
287 clonedPort.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
288 }
289 expectedLogicalPorts[clonedPort.OfpPort.PortNo] = clonedPort
290 }
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400291
Kent Hagerman2a07b862020-06-19 15:23:07 -0400292 updatedLogicalDevicePorts := ldAgent.listLogicalDevicePorts(ctx)
293 assert.Equal(t, len(expectedLogicalPorts), len(updatedLogicalDevicePorts))
294 for _, p := range updatedLogicalDevicePorts {
295 assert.True(t, proto.Equal(p, expectedLogicalPorts[p.OfpPort.PortNo]))
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400296 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500297 globalWG.Done()
298}
299
Himani Chawla40af2702021-01-27 15:06:30 +0530300func (lda *LDATest) stopLogicalAgentAndCheckEventQueueIsEmpty(ctx context.Context, t *testing.T, ldAgent *LogicalAgent) {
301 queueIsEmpty := false
302 err := ldAgent.stop(ctx)
303 assert.Nil(t, err)
304 qp := ldAgent.orderedEvents.assignQueuePosition()
305 if qp.prev != nil { // we will be definitely hitting this case as we pushed events on the queue before
306 // If previous channel is closed which it should be now,
307 // only then we can know that queue is empty.
308 _, ok := <-qp.prev
309 if !ok {
310 queueIsEmpty = true
311 } else {
312 queueIsEmpty = false
313 }
314 } else {
315 queueIsEmpty = true
316 }
317 close(qp.next)
318 assert.True(t, queueIsEmpty)
319}
320
321func (lda *LDATest) updateLogicalDevice(t *testing.T, ldAgent *LogicalAgent) {
322 originalLogicalPorts := ldAgent.listLogicalDevicePorts(context.Background())
323 assert.NotNil(t, originalLogicalPorts)
324
325 // Change the state of the first port to FAILED
326 err := ldAgent.updatePortState(context.Background(), 1, voltha.OperStatus_FAILED)
327 assert.Nil(t, err)
328
329 // Change the state of the second port to TESTING
330 err = ldAgent.updatePortState(context.Background(), 2, voltha.OperStatus_TESTING)
331 assert.Nil(t, err)
332
333 // Change the state of the third port to ACTIVE
334 err = ldAgent.updatePortState(context.Background(), 3, voltha.OperStatus_ACTIVE)
335 assert.Nil(t, err)
336
337}
338
khenaidoo6e55d9e2019-12-12 18:26:26 -0500339func TestConcurrentLogicalDeviceUpdate(t *testing.T) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000340 ctx := context.Background()
341 lda := newLDATest(ctx)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500342 assert.NotNil(t, lda)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000343 defer lda.stopAll(ctx)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500344
345 // Start the Core
Rohan Agrawal31f21802020-06-12 05:38:46 +0000346 lda.startCore(ctx, false)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500347
348 var wg sync.WaitGroup
khenaidoo442e7c72020-03-10 16:13:48 -0400349 numConCurrentLogicalDeviceAgents := 3
khenaidoo6e55d9e2019-12-12 18:26:26 -0500350 for i := 0; i < numConCurrentLogicalDeviceAgents; i++ {
351 wg.Add(1)
352 a := lda.createLogicalDeviceAgent(t)
353 go lda.updateLogicalDeviceConcurrently(t, a, &wg)
354 }
355
356 wg.Wait()
357}
Himani Chawla40af2702021-01-27 15:06:30 +0530358
359func TestLogicalAgentStopWithEventsInQueue(t *testing.T) {
360 ctx := context.Background()
361 lda := newLDATest(ctx)
362 assert.NotNil(t, lda)
363 defer lda.stopAll(ctx)
364
365 // Start the Core
366 lda.startCore(ctx, false)
367
368 a := lda.createLogicalDeviceAgent(t)
369 lda.updateLogicalDevice(t, a)
370 lda.stopLogicalAgentAndCheckEventQueueIsEmpty(ctx, t, a)
371}