blob: 1b1dc5949a797c159f0e6af2ebf2d33aca7af3b0 [file] [log] [blame]
khenaidoo2bc48282019-07-16 18:13:46 -04001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Kent Hagerman2b216042020-04-03 18:28:56 -040016package device
khenaidoo2bc48282019-07-16 18:13:46 -040017
18import (
khenaidoo6e55d9e2019-12-12 18:26:26 -050019 "context"
David Bainbridged1afd662020-03-26 18:27:41 -070020 "math/rand"
Neha Sharmad1387da2020-05-07 20:07:28 +000021 "strconv"
David Bainbridged1afd662020-03-26 18:27:41 -070022 "sync"
23 "testing"
24 "time"
25
Kent Hagermanf5a67352020-04-30 15:15:26 -040026 "github.com/gogo/protobuf/proto"
Mahir Gunyeladdb66a2020-04-29 18:08:50 -070027 "github.com/opencord/voltha-go/db/model"
Kent Hagermanf5a67352020-04-30 15:15:26 -040028 "github.com/opencord/voltha-go/rw_core/config"
Mahir Gunyeladdb66a2020-04-29 18:08:50 -070029 "github.com/opencord/voltha-go/rw_core/core/adapter"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070030 tst "github.com/opencord/voltha-go/rw_core/test"
Kent Hagermanf5a67352020-04-30 15:15:26 -040031 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
Mahir Gunyeladdb66a2020-04-29 18:08:50 -070032 "github.com/opencord/voltha-lib-go/v3/pkg/db"
33 fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080034 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
Matteo Scandolod525ae32020-04-02 17:27:29 -070035 mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
36 mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080037 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
38 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidoo6e55d9e2019-12-12 18:26:26 -050039 "github.com/phayes/freeport"
khenaidoo2bc48282019-07-16 18:13:46 -040040 "github.com/stretchr/testify/assert"
khenaidoo2bc48282019-07-16 18:13:46 -040041)
42
khenaidoo6e55d9e2019-12-12 18:26:26 -050043type LDATest struct {
Kent Hagerman2b216042020-04-03 18:28:56 -040044 etcdServer *mock_etcd.EtcdServer
45 deviceMgr *Manager
46 kmp kafka.InterContainerProxy
47 logicalDeviceMgr *LogicalManager
48 kClient kafka.Client
49 kvClientPort int
50 oltAdapterName string
51 onuAdapterName string
52 coreInstanceID string
53 defaultTimeout time.Duration
54 maxTimeout time.Duration
55 logicalDevice *voltha.LogicalDevice
56 deviceIds []string
57 done chan int
khenaidoo6e55d9e2019-12-12 18:26:26 -050058}
59
60func newLDATest() *LDATest {
61 test := &LDATest{}
62 // Start the embedded etcd server
63 var err error
Mahir Gunyel03de0d32020-06-03 01:36:59 -070064 test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer("voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error")
khenaidoo6e55d9e2019-12-12 18:26:26 -050065 if err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +000066 logger.Fatal(err)
khenaidoo6e55d9e2019-12-12 18:26:26 -050067 }
68 // Create the kafka client
Matteo Scandolod525ae32020-04-02 17:27:29 -070069 test.kClient = mock_kafka.NewKafkaClient()
khenaidoo6e55d9e2019-12-12 18:26:26 -050070 test.oltAdapterName = "olt_adapter_mock"
71 test.onuAdapterName = "onu_adapter_mock"
72 test.coreInstanceID = "rw-da-test"
73 test.defaultTimeout = 5 * time.Second
74 test.maxTimeout = 20 * time.Second
75 test.done = make(chan int)
76 test.deviceIds = []string{com.GetRandomString(10), com.GetRandomString(10), com.GetRandomString(10)}
77 test.logicalDevice = &voltha.LogicalDevice{
78 Desc: &ofp.OfpDesc{
79 HwDesc: "olt_adapter_mock",
80 SwDesc: "olt_adapter_mock",
81 SerialNum: com.GetRandomSerialNumber(),
82 },
83 SwitchFeatures: &ofp.OfpSwitchFeatures{
84 NBuffers: 256,
85 NTables: 2,
86 Capabilities: uint32(ofp.OfpCapabilities_OFPC_FLOW_STATS |
87 ofp.OfpCapabilities_OFPC_TABLE_STATS |
88 ofp.OfpCapabilities_OFPC_PORT_STATS |
89 ofp.OfpCapabilities_OFPC_GROUP_STATS),
90 },
91 RootDeviceId: test.deviceIds[0],
92 Ports: []*voltha.LogicalPort{
93 {
94 Id: "1001",
95 DeviceId: test.deviceIds[0],
96 DevicePortNo: 1,
97 RootPort: true,
98 OfpPort: &ofp.OfpPort{
99 PortNo: 1,
100 Name: "port1",
101 Config: 4,
102 State: 4,
103 },
104 },
105 {
106 Id: "1002",
107 DeviceId: test.deviceIds[1],
108 DevicePortNo: 2,
109 RootPort: false,
110 OfpPort: &ofp.OfpPort{
111 PortNo: 2,
112 Name: "port2",
113 Config: 4,
114 State: 4,
115 },
116 },
117 {
118 Id: "1003",
119 DeviceId: test.deviceIds[2],
120 DevicePortNo: 3,
121 RootPort: false,
122 OfpPort: &ofp.OfpPort{
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400123 PortNo: 3,
khenaidoo6e55d9e2019-12-12 18:26:26 -0500124 Name: "port3",
125 Config: 4,
126 State: 4,
127 },
128 },
129 },
130 }
131 return test
132}
133
134func (lda *LDATest) startCore(inCompeteMode bool) {
135 cfg := config.NewRWCoreFlags()
serkant.uluderya8ff291d2020-05-20 00:58:00 -0700136 cfg.CoreTopic = "rw_core"
khenaidoo442e7c72020-03-10 16:13:48 -0400137 cfg.DefaultRequestTimeout = lda.defaultTimeout
Neha Sharmad1387da2020-05-07 20:07:28 +0000138 cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(lda.kvClientPort)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500139 grpcPort, err := freeport.GetFreePort()
140 if err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000141 logger.Fatal("Cannot get a freeport for grpc")
khenaidoo6e55d9e2019-12-12 18:26:26 -0500142 }
Neha Sharmad1387da2020-05-07 20:07:28 +0000143 cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
Mahir Gunyel03de0d32020-06-03 01:36:59 -0700144 client := tst.SetupKVClient(cfg, lda.coreInstanceID)
Kent Hagerman2b216042020-04-03 18:28:56 -0400145 backend := &db.Backend{
146 Client: client,
147 StoreType: cfg.KVStoreType,
Neha Sharmad1387da2020-05-07 20:07:28 +0000148 Address: cfg.KVStoreAddress,
Kent Hagerman2b216042020-04-03 18:28:56 -0400149 Timeout: cfg.KVStoreTimeout,
serkant.uluderya8ff291d2020-05-20 00:58:00 -0700150 LivenessChannelInterval: cfg.LiveProbeInterval / 2}
Kent Hagerman2b216042020-04-03 18:28:56 -0400151 lda.kmp = kafka.NewInterContainerProxy(
Neha Sharmad1387da2020-05-07 20:07:28 +0000152 kafka.InterContainerAddress(cfg.KafkaAdapterAddress),
Kent Hagerman2b216042020-04-03 18:28:56 -0400153 kafka.MsgClient(lda.kClient),
David Bainbridge9ae13132020-06-22 17:28:01 -0700154 kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}))
Kent Hagerman2b216042020-04-03 18:28:56 -0400155
156 endpointMgr := kafka.NewEndpointManager(backend)
Kent Hagermanf5a67352020-04-30 15:15:26 -0400157 proxy := model.NewDBPath(backend)
Kent Hagerman2b216042020-04-03 18:28:56 -0400158 adapterMgr := adapter.NewAdapterManager(proxy, lda.coreInstanceID, lda.kClient)
159
serkant.uluderya8ff291d2020-05-20 00:58:00 -0700160 lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CoreTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
Kent Hagerman2b216042020-04-03 18:28:56 -0400161 if err = lda.kmp.Start(); err != nil {
162 logger.Fatal("Cannot start InterContainerProxy")
Thomas Lee Se5a44012019-11-07 20:32:24 +0530163 }
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400164 adapterMgr.Start(context.Background())
khenaidoo6e55d9e2019-12-12 18:26:26 -0500165}
166
167func (lda *LDATest) stopAll() {
168 if lda.kClient != nil {
169 lda.kClient.Stop()
170 }
Kent Hagerman2b216042020-04-03 18:28:56 -0400171 if lda.kmp != nil {
172 lda.kmp.Stop()
khenaidoo6e55d9e2019-12-12 18:26:26 -0500173 }
174 if lda.etcdServer != nil {
Mahir Gunyel03de0d32020-06-03 01:36:59 -0700175 tst.StopEmbeddedEtcdServer(lda.etcdServer)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500176 }
177}
178
Kent Hagerman2b216042020-04-03 18:28:56 -0400179func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalAgent {
180 lDeviceMgr := lda.logicalDeviceMgr
181 deviceMgr := lda.deviceMgr
khenaidoo6e55d9e2019-12-12 18:26:26 -0500182 clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
183 clonedLD.Id = com.GetRandomString(10)
184 clonedLD.DatapathId = rand.Uint64()
Mahir Gunyel03de0d32020-06-03 01:36:59 -0700185 lDeviceAgent := newLogicalAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.defaultTimeout)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500186 lDeviceAgent.logicalDevice = clonedLD
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400187 for _, port := range clonedLD.Ports {
188 handle, created, err := lDeviceAgent.portLoader.LockOrCreate(context.Background(), port)
189 if err != nil {
190 panic(err)
191 }
192 handle.Unlock()
193 if !created {
194 t.Errorf("port %d already exists", port.OfpPort.PortNo)
195 }
196 }
Kent Hagermanf5a67352020-04-30 15:15:26 -0400197 err := lDeviceAgent.ldProxy.Set(context.Background(), clonedLD.Id, clonedLD)
Thomas Lee Se5a44012019-11-07 20:32:24 +0530198 assert.Nil(t, err)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500199 lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent)
200 return lDeviceAgent
201}
202
Kent Hagerman2b216042020-04-03 18:28:56 -0400203func (lda *LDATest) updateLogicalDeviceConcurrently(t *testing.T, ldAgent *LogicalAgent, globalWG *sync.WaitGroup) {
khenaidoo442e7c72020-03-10 16:13:48 -0400204 originalLogicalDevice, _ := ldAgent.GetLogicalDevice(context.Background())
khenaidoo6e55d9e2019-12-12 18:26:26 -0500205 assert.NotNil(t, originalLogicalDevice)
206 var localWG sync.WaitGroup
207
208 // Change the state of the first port to FAILED
209 localWG.Add(1)
210 go func() {
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400211 err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[0].DevicePortNo, voltha.OperStatus_FAILED)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500212 assert.Nil(t, err)
213 localWG.Done()
214 }()
215
216 // Change the state of the second port to TESTING
217 localWG.Add(1)
218 go func() {
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400219 err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[1].DevicePortNo, voltha.OperStatus_TESTING)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500220 assert.Nil(t, err)
221 localWG.Done()
222 }()
223
224 // Change the state of the third port to UNKNOWN and then back to ACTIVE
225 localWG.Add(1)
226 go func() {
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400227 err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_UNKNOWN)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500228 assert.Nil(t, err)
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400229 err = ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_ACTIVE)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500230 assert.Nil(t, err)
231 localWG.Done()
232 }()
233
234 // Add a meter to the logical device
235 meterMod := &ofp.OfpMeterMod{
236 Command: ofp.OfpMeterModCommand_OFPMC_ADD,
237 Flags: rand.Uint32(),
238 MeterId: rand.Uint32(),
239 Bands: []*ofp.OfpMeterBandHeader{
240 {Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
241 Rate: rand.Uint32(),
242 BurstSize: rand.Uint32(),
243 Data: nil,
244 },
245 },
246 }
247 localWG.Add(1)
248 go func() {
npujar467fe752020-01-16 20:17:45 +0530249 err := ldAgent.meterAdd(context.Background(), meterMod)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500250 assert.Nil(t, err)
251 localWG.Done()
252 }()
khenaidoo6e55d9e2019-12-12 18:26:26 -0500253 // wait for go routines to be done
254 localWG.Wait()
Mahir Gunyeladdb66a2020-04-29 18:08:50 -0700255 meterEntry := fu.MeterEntryFromMeterMod(meterMod)
256
Kent Hagerman433a31a2020-05-20 19:04:48 -0400257 meterHandle, have := ldAgent.meterLoader.Lock(meterMod.MeterId)
258 assert.Equal(t, have, true)
259 if have {
260 assert.True(t, proto.Equal(meterEntry, meterHandle.GetReadOnly()))
261 meterHandle.Unlock()
262 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500263
264 expectedChange := proto.Clone(originalLogicalDevice).(*voltha.LogicalDevice)
265 expectedChange.Ports[0].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
266 expectedChange.Ports[0].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
267 expectedChange.Ports[1].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
268 expectedChange.Ports[1].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
269 expectedChange.Ports[2].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
270 expectedChange.Ports[2].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400271
272 updatedLogicalDevicePorts := ldAgent.listLogicalDevicePorts()
273 for _, p := range expectedChange.Ports {
274 assert.True(t, proto.Equal(p, updatedLogicalDevicePorts[p.DevicePortNo]))
275 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500276 globalWG.Done()
277}
278
279func TestConcurrentLogicalDeviceUpdate(t *testing.T) {
280 lda := newLDATest()
281 assert.NotNil(t, lda)
282 defer lda.stopAll()
283
284 // Start the Core
285 lda.startCore(false)
286
287 var wg sync.WaitGroup
khenaidoo442e7c72020-03-10 16:13:48 -0400288 numConCurrentLogicalDeviceAgents := 3
khenaidoo6e55d9e2019-12-12 18:26:26 -0500289 for i := 0; i < numConCurrentLogicalDeviceAgents; i++ {
290 wg.Add(1)
291 a := lda.createLogicalDeviceAgent(t)
292 go lda.updateLogicalDeviceConcurrently(t, a, &wg)
293 }
294
295 wg.Wait()
296}