[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index e3716a7..0577c1e 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -18,7 +18,6 @@
import (
"context"
- "fmt"
"math/rand"
"sort"
"strconv"
@@ -27,21 +26,23 @@
"testing"
"time"
+ ver "github.com/opencord/voltha-lib-go/v7/pkg/version"
+ ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/config"
"github.com/opencord/voltha-go/rw_core/core/adapter"
- cm "github.com/opencord/voltha-go/rw_core/mocks"
tst "github.com/opencord/voltha-go/rw_core/test"
- com "github.com/opencord/voltha-lib-go/v5/pkg/adapters/common"
- "github.com/opencord/voltha-lib-go/v5/pkg/db"
- "github.com/opencord/voltha-lib-go/v5/pkg/events"
- "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
- mock_etcd "github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd"
- mock_kafka "github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka"
- ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ com "github.com/opencord/voltha-lib-go/v7/pkg/adapters/common"
+ "github.com/opencord/voltha-lib-go/v7/pkg/db"
+ "github.com/opencord/voltha-lib-go/v7/pkg/events"
+ "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ mock_etcd "github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd"
+ mock_kafka "github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka"
+ ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
"github.com/phayes/freeport"
"github.com/stretchr/testify/assert"
)
@@ -51,17 +52,13 @@
deviceMgr *Manager
logicalDeviceMgr *LogicalManager
adapterMgr *adapter.Manager
- kmp kafka.InterContainerProxy
kClient kafka.Client
kEventClient kafka.Client
kvClientPort int
- oltAdapter *cm.OLTAdapter
- onuAdapter *cm.ONUAdapter
oltAdapterName string
onuAdapterName string
coreInstanceID string
- defaultTimeout time.Duration
- maxTimeout time.Duration
+ internalTimeout time.Duration
device *voltha.Device
devicePorts map[uint32]*voltha.Port
done chan int
@@ -78,24 +75,22 @@
// Create the kafka client
test.kClient = mock_kafka.NewKafkaClient()
test.kEventClient = mock_kafka.NewKafkaClient()
- test.oltAdapterName = "olt_adapter_mock"
- test.onuAdapterName = "onu_adapter_mock"
+ test.oltAdapterName = "olt-mock-adapter"
+ test.onuAdapterName = "onu-mock-adapter"
test.coreInstanceID = "rw-da-test"
- test.defaultTimeout = 5 * time.Second
- test.maxTimeout = 20 * time.Second
+ test.internalTimeout = 5 * time.Second
test.done = make(chan int)
parentID := com.GetRandomString(10)
test.device = &voltha.Device{
- Type: "onu_adapter_mock",
+ Type: "onu-mock-device-type",
ParentId: parentID,
ParentPortNo: 1,
- VendorId: "onu_adapter_mock",
- Adapter: "onu_adapter_mock",
+ VendorId: "onu-mock-vendor",
Vlan: 100,
Address: nil,
ProxyAddress: &voltha.Device_ProxyAddress{
DeviceId: parentID,
- DeviceType: "olt_adapter_mock",
+ DeviceType: "olt-mock-device-type",
ChannelId: 100,
ChannelGroupId: 0,
ChannelTermination: "",
@@ -119,15 +114,14 @@
func (dat *DATest) startCore(ctx context.Context) {
cfg := &config.RWCoreFlags{}
cfg.ParseCommandArguments([]string{})
- cfg.CoreTopic = "rw_core"
cfg.EventTopic = "voltha.events"
- cfg.DefaultRequestTimeout = dat.defaultTimeout
+ cfg.InternalTimeout = dat.internalTimeout
cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(dat.kvClientPort)
grpcPort, err := freeport.GetFreePort()
if err != nil {
logger.Fatal(ctx, "Cannot get a freeport for grpc")
}
- cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
+ cfg.GrpcNBIAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
client := tst.SetupKVClient(ctx, cfg, dat.coreInstanceID)
backend := &db.Backend{
Client: client,
@@ -135,34 +129,20 @@
Address: cfg.KVStoreAddress,
Timeout: cfg.KVStoreTimeout,
LivenessChannelInterval: cfg.LiveProbeInterval / 2}
- dat.kmp = kafka.NewInterContainerProxy(
- kafka.InterContainerAddress(cfg.KafkaAdapterAddress),
- kafka.MsgClient(dat.kClient),
- kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}))
- endpointMgr := kafka.NewEndpointManager(backend)
proxy := model.NewDBPath(backend)
- dat.adapterMgr = adapter.NewAdapterManager(ctx, proxy, dat.coreInstanceID, dat.kClient)
+ dat.adapterMgr = adapter.NewAdapterManager(proxy, dat.coreInstanceID, backend, 5)
eventProxy := events.NewEventProxy(events.MsgClient(dat.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
- dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg, dat.coreInstanceID, eventProxy)
- dat.adapterMgr.Start(context.Background())
- if err = dat.kmp.Start(ctx); err != nil {
- logger.Fatal(ctx, "Cannot start InterContainerProxy")
- }
-
- if err := dat.kmp.SubscribeWithDefaultRequestHandler(ctx, kafka.Topic{Name: cfg.CoreTopic}, kafka.OffsetNewest); err != nil {
- logger.Fatalf(ctx, "Cannot add default request handler: %s", err)
- }
-
+ dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, cfg, dat.coreInstanceID, eventProxy)
+ dat.adapterMgr.Start(context.Background(), "agent-test")
+ dat.registerAdapters(context.Background())
+ log.SetAllLogLevel(log.FatalLevel)
}
func (dat *DATest) stopAll(ctx context.Context) {
if dat.kClient != nil {
dat.kClient.Stop(ctx)
}
- if dat.kmp != nil {
- dat.kmp.Stop(ctx)
- }
if dat.etcdServer != nil {
tst.StopEmbeddedEtcdServer(ctx, dat.etcdServer)
}
@@ -174,7 +154,7 @@
func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {
deviceMgr := dat.deviceMgr
clonedDevice := proto.Clone(dat.device).(*voltha.Device)
- deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.dbPath, deviceMgr.dProxy, deviceMgr.defaultTimeout)
+ deviceAgent := newAgent(clonedDevice, deviceMgr, deviceMgr.dbPath, deviceMgr.dProxy, deviceMgr.internalTimeout, deviceMgr.rpcTimeout)
d, err := deviceAgent.start(context.TODO(), false, clonedDevice)
assert.Nil(t, err)
assert.NotNil(t, d)
@@ -215,19 +195,13 @@
deviceToUpdate.MacAddress = macAddress
deviceToUpdate.Vlan = vlan
deviceToUpdate.Reason = reason
+ deviceToUpdate.OperStatus = voltha.OperStatus_ACTIVE
+ deviceToUpdate.ConnectStatus = voltha.ConnectStatus_REACHABLE
err := da.updateDeviceUsingAdapterData(context.Background(), deviceToUpdate)
assert.Nil(t, err)
localWG.Done()
}()
- // Update the device status routine
- localWG.Add(1)
- go func() {
- err := da.updateDeviceStatus(context.Background(), voltha.OperStatus_ACTIVE, voltha.ConnectStatus_REACHABLE)
- assert.Nil(t, err)
- localWG.Done()
- }()
-
// Add a port routine
localWG.Add(1)
go func() {
@@ -253,8 +227,6 @@
updatedDevice, _ := da.getDeviceReadOnly(context.Background())
updatedDevicePorts := da.listDevicePorts()
assert.NotNil(t, updatedDevice)
- fmt.Printf("1 %+v\n", expectedChange)
- fmt.Printf("2 %+v\n", updatedDevice)
assert.True(t, proto.Equal(expectedChange, updatedDevice))
assert.Equal(t, len(originalDevicePorts)+1, len(updatedDevicePorts))
assert.True(t, proto.Equal(updatedDevicePorts[portToAdd.PortNo], portToAdd))
@@ -268,7 +240,7 @@
da := newDATest(ctx)
assert.NotNil(t, da)
defer da.stopAll(ctx)
- log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
+
// Start the Core
da.startCore(ctx)
@@ -279,7 +251,6 @@
a := da.createDeviceAgent(t)
go da.updateDeviceConcurrently(t, a, &wg)
}
-
wg.Wait()
}
}
@@ -292,8 +263,6 @@
log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
// Start the Core
da.startCore(ctx)
- da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(ctx, t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
-
a := da.createDeviceAgent(t)
err1 := a.requestQueue.WaitForGreenLight(ctx)
assert.Nil(t, err1)
@@ -312,7 +281,6 @@
// Start the Core
da.startCore(ctx)
- da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(ctx, t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
a := da.createDeviceAgent(t)
err1 := a.requestQueue.WaitForGreenLight(ctx)
assert.Nil(t, err1)
@@ -380,7 +348,9 @@
{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
}
err := da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
- assert.Nil(t, err)
+ // Expect a specific error because adapter communication is not set up for unit tests
+ assert.NotNil(t, err)
+ assert.True(t, strings.Contains(err.Error(), "flow-failure-device-"))
daFlows := changeToFlowList(da.listDeviceFlows())
assert.True(t, isFlowSliceEqual(newFlows, daFlows))
@@ -399,7 +369,8 @@
}
err = da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
- assert.Nil(t, err)
+ assert.NotNil(t, err)
+ assert.True(t, strings.Contains(err.Error(), "flow-failure-device-"))
daFlows = changeToFlowList(da.listDeviceFlows())
assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
@@ -420,7 +391,8 @@
}
err = da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
- assert.Nil(t, err)
+ assert.NotNil(t, err)
+ assert.True(t, strings.Contains(err.Error(), "Aborted"))
daFlows = changeToFlowList(da.listDeviceFlows())
assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
@@ -459,7 +431,8 @@
}
err = da.deleteFlowsAndGroups(context.Background(), flowsToDelete, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
- assert.Nil(t, err)
+ assert.NotNil(t, err)
+ assert.True(t, strings.Contains(err.Error(), "Aborted"))
daFlows = changeToFlowList(da.listDeviceFlows())
assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
//Delete flows with an unexisting one
@@ -474,7 +447,8 @@
}
err = da.deleteFlowsAndGroups(context.Background(), flowsToDelete, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
- assert.Nil(t, err)
+ assert.NotNil(t, err)
+ assert.True(t, strings.Contains(err.Error(), "Aborted"))
daFlows = changeToFlowList(da.listDeviceFlows())
assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
}
@@ -486,7 +460,8 @@
{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
}
err := da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
- assert.Nil(t, err)
+ assert.NotNil(t, err)
+ assert.True(t, strings.Contains(err.Error(), "flow-failure-device-"))
daGroups := changeToGroupList(da.listDeviceGroups())
assert.True(t, isGroupSliceEqual(newGroups, daGroups))
@@ -502,7 +477,8 @@
{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
}
err = da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
- assert.Nil(t, err)
+ assert.NotNil(t, err)
+ assert.True(t, strings.Contains(err.Error(), "Aborted"))
daGroups = changeToGroupList(da.listDeviceGroups())
assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
@@ -520,7 +496,8 @@
{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
}
err = da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
- assert.Nil(t, err)
+ assert.NotNil(t, err)
+ assert.True(t, strings.Contains(err.Error(), "Aborted"))
daGroups = changeToGroupList(da.listDeviceGroups())
assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
@@ -536,7 +513,8 @@
{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
}
err = da.updateFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, updtGroups, &voltha.FlowMetadata{})
- assert.Nil(t, err)
+ assert.NotNil(t, err)
+ assert.True(t, strings.Contains(err.Error(), "Aborted"))
daGroups = changeToGroupList(da.listDeviceGroups())
assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
@@ -551,7 +529,8 @@
{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
}
err = da.deleteFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, delGroups, &voltha.FlowMetadata{})
- assert.Nil(t, err)
+ assert.NotNil(t, err)
+ assert.True(t, strings.Contains(err.Error(), "Aborted"))
daGroups = changeToGroupList(da.listDeviceGroups())
assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
@@ -565,7 +544,49 @@
{Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
}
err = da.deleteFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, delGroups, &voltha.FlowMetadata{})
- assert.Nil(t, err)
+ assert.NotNil(t, err)
+ assert.True(t, strings.Contains(err.Error(), "Aborted"))
daGroups = changeToGroupList(da.listDeviceGroups())
assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
}
+
+// registerAdapters registers one mock OLT adapter and one mock ONU adapter, each with a single device type, through dat.adapterMgr; a registration error is reported via logger.Fatalw.
+func (dat *DATest) registerAdapters(ctx context.Context) {
+ oltAdapter := &voltha.Adapter{ // descriptor of the mock OLT adapter
+ Id: "olt-mock-adapter-1",
+ Vendor: "olt-mock-vendor",
+ Version: ver.VersionInfo.Version,
+ Type: "olt-mock-adapter-type", // same value as AdapterType in the OLT device type below
+ CurrentReplica: 1,
+ TotalReplicas: 1,
+ Endpoint: "mock-olt-endpoint",
+ }
+ types := []*voltha.DeviceType{{Id: "olt-mock-device-type", AdapterType: "olt-mock-adapter-type", AcceptsAddRemoveFlowUpdates: true}} // Id equals the ProxyAddress.DeviceType set on the test device
+ deviceTypes := &voltha.DeviceTypes{Items: types}
+ _, err := dat.adapterMgr.RegisterAdapter(ctx, &ic.AdapterRegistration{ // first return value is discarded; only the error is checked
+ Adapter: oltAdapter,
+ DTypes: deviceTypes,
+ })
+ if err != nil {
+ logger.Fatalw(ctx, "olt registration failed", log.Fields{"error": err})
+ }
+
+ onuAdapter := &voltha.Adapter{ // descriptor of the mock ONU adapter
+ Id: "onu-mock-adapter-1",
+ Vendor: "onu-mock-vendor",
+ Version: ver.VersionInfo.Version,
+ Type: "onu-mock-adapter-type", // same value as AdapterType in the ONU device type below
+ CurrentReplica: 1,
+ TotalReplicas: 1,
+ Endpoint: "mock-onu-endpoint",
+ }
+ types = []*voltha.DeviceType{{Id: "onu-mock-device-type", AdapterType: "onu-mock-adapter-type", AcceptsAddRemoveFlowUpdates: true}} // Id equals the Type set on the test device
+ deviceTypes = &voltha.DeviceTypes{Items: types} // types, deviceTypes and err are reused from the OLT registration
+ _, err = dat.adapterMgr.RegisterAdapter(ctx, &ic.AdapterRegistration{
+ Adapter: onuAdapter,
+ DTypes: deviceTypes,
+ })
+ if err != nil {
+ logger.Fatalw(ctx, "onu registration failed", log.Fields{"error": err})
+ }
+}