[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/mocks/adapter_olt.go b/rw_core/mocks/adapter_olt.go
index 6c8b186..b0ff711 100644
--- a/rw_core/mocks/adapter_olt.go
+++ b/rw_core/mocks/adapter_olt.go
@@ -20,58 +20,131 @@
"context"
"errors"
"fmt"
- "github.com/opencord/voltha-protos/v4/go/extension"
+ "strconv"
"strings"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-lib-go/v7/pkg/probe"
+ "github.com/opencord/voltha-protos/v5/go/common"
+ "github.com/opencord/voltha-protos/v5/go/extension"
+ "github.com/phayes/freeport"
+
"github.com/gogo/protobuf/proto"
- "github.com/opencord/voltha-lib-go/v5/pkg/adapters"
- "github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
- com "github.com/opencord/voltha-lib-go/v5/pkg/adapters/common"
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
- ic "github.com/opencord/voltha-protos/v4/go/inter_container"
- of "github.com/opencord/voltha-protos/v4/go/openflow_13"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ com "github.com/opencord/voltha-lib-go/v7/pkg/adapters/common"
+ vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+ of "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
)
-const (
- numONUPerOLT = 4
- startingUNIPortNo = 100
-)
-
-// static implementation check
-var _ adapters.IAdapter = &OLTAdapter{}
-
// OLTAdapter represent OLT adapter
type OLTAdapter struct {
*Adapter
+ ChildDeviceType string
+ childVendor string
+ grpcServer *vgrpc.GrpcServer
}
// NewOLTAdapter - creates OLT adapter instance
-func NewOLTAdapter(ctx context.Context, cp adapterif.CoreProxy) *OLTAdapter {
- return &OLTAdapter{
- Adapter: NewAdapter(cp),
+func NewOLTAdapter(ctx context.Context, coreEndpoint string, deviceType string, vendor string, childDeviceType, childVendor string) *OLTAdapter {
+ // Get an available port
+ grpcPort, err := freeport.GetFreePort()
+ if err != nil {
+ logger.Fatalw(ctx, "no-free-port", log.Fields{"error": err})
}
+
+ // start gRPC handler
+ listeningAddress := fmt.Sprintf("127.0.0.1:%s", strconv.Itoa(grpcPort))
+ oltAdapter := &OLTAdapter{Adapter: NewAdapter(listeningAddress, coreEndpoint, deviceType, vendor),
+ ChildDeviceType: childDeviceType, childVendor: childVendor}
+
+ oltAdapter.start(ctx)
+ return oltAdapter
+}
+
+func (oltA *OLTAdapter) oltRestarted(ctx context.Context, endPoint string) error {
+ logger.Errorw(ctx, "remote-restarted", log.Fields{"endpoint": endPoint})
+ return nil
+}
+
+func (oltA *OLTAdapter) start(ctx context.Context) {
+ // start brings up the probe endpoint, the adapter's gRPC server and the gRPC client towards the Core.
+ // Set up the probe service
+ oltA.Probe = &probe.Probe{}
+ probePort, err := freeport.GetFreePort()
+ if err != nil {
+ logger.Fatalw(ctx, "Cannot get a freeport for probePort", log.Fields{"error": err})
+ }
+ probeAddress := "127.0.0.1:" + strconv.Itoa(probePort)
+ go oltA.Probe.ListenAndServe(ctx, probeAddress)
+
+ probeCtx := context.WithValue(ctx, probe.ProbeContextKey, oltA.Probe)
+
+ oltA.Probe.RegisterService(ctx, "olt-grpc-service", oltA.coreEnpoint)
+
+ oltA.grpcServer = vgrpc.NewGrpcServer(oltA.serviceEndpoint, nil, false, nil)
+
+ logger.Debugw(ctx, "OLTAdapter-address", log.Fields{"address": oltA.serviceEndpoint})
+
+ go oltA.startGRPCService(ctx, oltA.grpcServer, oltA, "olt-grpc-service")
+
+ // Establish grpc connection to Core
+ if oltA.coreClient, err = vgrpc.NewClient(oltA.coreEnpoint,
+ oltA.oltRestarted,
+ vgrpc.ActivityCheck(true)); err != nil {
+ logger.Fatalw(ctx, "grpc-client-not-created", log.Fields{"error": err, "endpoint": oltA.coreEnpoint})
+ }
+ // The client is started with the probe-carrying context created above
+ go oltA.coreClient.Start(probeCtx, setAndTestCoreServiceHandler)
+
+ logger.Debugw(ctx, "OLTAdapter-started", log.Fields{"grpc-address": oltA.serviceEndpoint})
+
+}
+
+// StopGrpcClient stops the gRPC client connection to the Core
+func (oltA *OLTAdapter) StopGrpcClient() {
+ // Stop the grpc clients
+ oltA.coreClient.Stop(context.Background())
+}
+
+// Stop shuts down the adapter's gRPC server, if one was created
+func (oltA *OLTAdapter) Stop() {
+
+ // Stop the grpc
+ if oltA.grpcServer != nil {
+ oltA.grpcServer.Stop()
+ }
+ logger.Debugw(context.Background(), "OLTAdapter-stopped", log.Fields{"grpc-address": oltA.serviceEndpoint})
+
}
// Adopt_device creates new handler for added device
-func (oltA *OLTAdapter) Adopt_device(ctx context.Context, device *voltha.Device) error { // nolint
+func (oltA *OLTAdapter) AdoptDevice(ctx context.Context, device *voltha.Device) (*empty.Empty, error) {
+ logger.Debugw(ctx, "AdoptDevice", log.Fields{"device": device.AdapterEndpoint, "device-type": oltA.DeviceType})
go func() {
d := proto.Clone(device).(*voltha.Device)
d.Root = true
- d.Vendor = "olt_adapter_mock"
+ d.Vendor = oltA.vendor
d.Model = "go-mock"
d.SerialNumber = com.GetRandomSerialNumber()
d.MacAddress = strings.ToUpper(com.GetRandomMacAddress())
oltA.storeDevice(d)
- if res := oltA.coreProxy.DeviceUpdate(context.TODO(), d); res != nil {
- logger.Fatalf(ctx, "deviceUpdate-failed-%s", res)
+ c, err := oltA.GetCoreClient()
+ if err != nil {
+ return
}
+ if _, err := c.DeviceUpdate(context.TODO(), d); err != nil {
+ logger.Fatalf(ctx, "deviceUpdate-failed-%s", err)
+ }
+
capability := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
nniPort := &voltha.Port{
+ DeviceId: device.Id,
PortNo: 2,
Label: fmt.Sprintf("nni-%d", 2),
Type: voltha.Port_ETHERNET_NNI,
- OperStatus: voltha.OperStatus_ACTIVE,
+ OperStatus: common.OperStatus_ACTIVE,
OfpPort: &of.OfpPort{
HwAddr: macAddressToUint32Array("11:22:33:44:55:66"),
Config: 0,
@@ -83,30 +156,30 @@
MaxSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
},
}
- var err error
- if err = oltA.coreProxy.PortCreated(context.TODO(), d.Id, nniPort); err != nil {
+ if _, err = c.PortCreated(context.TODO(), nniPort); err != nil {
logger.Fatalf(ctx, "PortCreated-failed-%s", err)
}
ponPort := &voltha.Port{
+ DeviceId: device.Id,
PortNo: 1,
Label: fmt.Sprintf("pon-%d", 1),
Type: voltha.Port_PON_OLT,
- OperStatus: voltha.OperStatus_ACTIVE,
+ OperStatus: common.OperStatus_ACTIVE,
}
- if err = oltA.coreProxy.PortCreated(context.TODO(), d.Id, ponPort); err != nil {
+ if _, err = c.PortCreated(context.TODO(), ponPort); err != nil {
logger.Fatalf(ctx, "PortCreated-failed-%s", err)
}
- d.ConnectStatus = voltha.ConnectStatus_REACHABLE
- d.OperStatus = voltha.OperStatus_ACTIVE
+ d.ConnectStatus = common.ConnectStatus_REACHABLE
+ d.OperStatus = common.OperStatus_ACTIVE
- if err = oltA.coreProxy.DeviceStateUpdate(context.TODO(), d.Id, d.ConnectStatus, d.OperStatus); err != nil {
- logger.Fatalf(ctx, "Device-state-update-failed-%s", err)
+ if _, err = c.DeviceStateUpdate(context.TODO(), &ic.DeviceStateFilter{DeviceId: d.Id, OperStatus: d.OperStatus, ConnStatus: d.ConnectStatus}); err != nil {
+ logger.Fatalf(ctx, "Device-state-update-failed-%s", err)
}
//Get the latest device data from the Core
- if d, err = oltA.coreProxy.GetDevice(context.TODO(), d.Id, d.Id); err != nil {
+ if d, err = c.GetDevice(context.TODO(), &common.ID{Id: d.Id}); err != nil {
logger.Fatalf(ctx, "getting-device-failed-%s", err)
}
@@ -116,21 +189,22 @@
initialUniPortNo := startingUNIPortNo
for i := 0; i < numONUPerOLT; i++ {
go func(seqNo int) {
- if _, err := oltA.coreProxy.ChildDeviceDetected(
- context.TODO(),
- d.Id,
- 1,
- "onu_adapter_mock",
- initialUniPortNo+seqNo,
- "onu_adapter_mock",
- com.GetRandomSerialNumber(),
- int64(seqNo)); err != nil {
- logger.Fatalf(ctx, "failure-sending-child-device-%s", err)
+ if _, err := c.ChildDeviceDetected(context.TODO(),
+ &ic.DeviceDiscovery{
+ ParentId: d.Id,
+ ParentPortNo: 1,
+ ChildDeviceType: oltA.ChildDeviceType,
+ ChannelId: uint32(initialUniPortNo + seqNo),
+ VendorId: oltA.childVendor,
+ SerialNumber: com.GetRandomSerialNumber(),
+ OnuId: uint32(seqNo),
+ }); err != nil {
+ logger.Fatalw(ctx, "failure-sending-child-device", log.Fields{"error": err, "parent-id": d.Id, "child-device-type": oltA.ChildDeviceType})
}
}(i)
}
}()
- return nil
+ return &empty.Empty{}, nil
}
// Single_get_value_request retrieves a single value.
@@ -141,7 +215,7 @@
}
// Get_ofp_device_info returns ofp device info
-func (oltA *OLTAdapter) Get_ofp_device_info(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) { // nolint
+func (oltA *OLTAdapter) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) { // nolint
if d := oltA.getDevice(device.Id); d == nil {
logger.Fatalf(ctx, "device-not-found-%s", device.Id)
}
@@ -162,18 +236,8 @@
}, nil
}
-// GetNumONUPerOLT returns number of ONUs per OLT
-func (oltA *OLTAdapter) GetNumONUPerOLT() int {
- return numONUPerOLT
-}
-
-// Returns the starting UNI port number
-func (oltA *OLTAdapter) GetStartingUNIPortNo() int {
- return startingUNIPortNo
-}
-
// Disable_device disables device
-func (oltA *OLTAdapter) Disable_device(ctx context.Context, device *voltha.Device) error { // nolint
+func (oltA *OLTAdapter) DisableDevice(ctx context.Context, device *voltha.Device) (*empty.Empty, error) { // nolint
go func() {
if d := oltA.getDevice(device.Id); d == nil {
logger.Fatalf(ctx, "device-not-found-%s", device.Id)
@@ -181,15 +245,29 @@
cloned := proto.Clone(device).(*voltha.Device)
// Update the all ports state on that device to disable
- if err := oltA.coreProxy.PortsStateUpdate(context.TODO(), cloned.Id, 0, voltha.OperStatus_UNKNOWN); err != nil {
+ c, err := oltA.GetCoreClient()
+ if err != nil {
+ return
+ }
+
+ if _, err := c.PortsStateUpdate(context.TODO(),
+ &ic.PortStateFilter{
+ DeviceId: cloned.Id,
+ PortTypeFilter: 0,
+ OperStatus: common.OperStatus_UNKNOWN,
+ }); err != nil {
logger.Warnw(ctx, "updating-ports-failed", log.Fields{"device-id": device.Id, "error": err})
}
//Update the device operational state
- cloned.OperStatus = voltha.OperStatus_UNKNOWN
+ cloned.OperStatus = common.OperStatus_UNKNOWN
// The device is still reachable after it has been disabled, so the connection status should not be changed.
- if err := oltA.coreProxy.DeviceStateUpdate(context.TODO(), cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+ if _, err := c.DeviceStateUpdate(context.TODO(), &ic.DeviceStateFilter{
+ DeviceId: cloned.Id,
+ OperStatus: cloned.OperStatus,
+ ConnStatus: cloned.ConnectStatus,
+ }); err != nil {
// Device may already have been deleted in the core
logger.Warnw(ctx, "device-state-update-failed", log.Fields{"device-id": device.Id, "error": err})
return
@@ -198,135 +276,156 @@
oltA.updateDevice(cloned)
// Tell the Core that all child devices have been disabled (by default it's an action already taken by the Core
- if err := oltA.coreProxy.ChildDevicesLost(context.TODO(), cloned.Id); err != nil {
+ if _, err := c.ChildDevicesLost(context.TODO(), &common.ID{Id: cloned.Id}); err != nil {
// Device may already have been deleted in the core
logger.Warnw(ctx, "lost-notif-of-child-devices-failed", log.Fields{"device-id": device.Id, "error": err})
}
}()
- return nil
+ return &empty.Empty{}, nil
}
// Reenable_device reenables device
-func (oltA *OLTAdapter) Reenable_device(ctx context.Context, device *voltha.Device) error { // nolint
+func (oltA *OLTAdapter) ReEnableDevice(ctx context.Context, device *voltha.Device) (*empty.Empty, error) { // nolint
go func() {
if d := oltA.getDevice(device.Id); d == nil {
logger.Fatalf(ctx, "device-not-found-%s", device.Id)
}
cloned := proto.Clone(device).(*voltha.Device)
+
+ c, err := oltA.GetCoreClient()
+ if err != nil {
+ return
+ }
+
// Update the all ports state on that device to enable
- if err := oltA.coreProxy.PortsStateUpdate(context.TODO(), cloned.Id, 0, voltha.OperStatus_ACTIVE); err != nil {
- logger.Fatalf(ctx, "updating-ports-failed", log.Fields{"device-id": device.Id, "error": err})
+ if _, err := c.PortsStateUpdate(context.TODO(),
+ &ic.PortStateFilter{
+ DeviceId: cloned.Id,
+ PortTypeFilter: 0,
+ OperStatus: common.OperStatus_ACTIVE,
+ }); err != nil {
+ logger.Warnw(ctx, "updating-ports-failed", log.Fields{"device-id": device.Id, "error": err})
}
//Update the device state
- cloned.OperStatus = voltha.OperStatus_ACTIVE
+ cloned.OperStatus = common.OperStatus_ACTIVE
- if err := oltA.coreProxy.DeviceStateUpdate(context.TODO(), cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+ if _, err := c.DeviceStateUpdate(context.TODO(), &ic.DeviceStateFilter{
+ DeviceId: cloned.Id,
+ OperStatus: cloned.OperStatus,
+ ConnStatus: cloned.ConnectStatus,
+ }); err != nil {
+ // Device may already have been deleted in the core
logger.Fatalf(ctx, "device-state-update-failed", log.Fields{"device-id": device.Id, "error": err})
+ return
}
// Tell the Core that all child devices have been enabled
- if err := oltA.coreProxy.ChildDevicesDetected(context.TODO(), cloned.Id); err != nil {
+ if _, err := c.ChildDevicesDetected(context.TODO(), &common.ID{Id: cloned.Id}); err != nil {
logger.Fatalf(ctx, "detection-notif-of-child-devices-failed", log.Fields{"device-id": device.Id, "error": err})
}
}()
- return nil
+ return &empty.Empty{}, nil
}
// Enable_port -
-func (oltA *OLTAdapter) Enable_port(ctx context.Context, deviceId string, Port *voltha.Port) error { //nolint
+func (oltA *OLTAdapter) EnablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) { //nolint
go func() {
+ c, err := oltA.GetCoreClient()
+ if err != nil {
+ return
+ }
- if Port.Type == voltha.Port_PON_OLT {
- if err := oltA.coreProxy.PortStateUpdate(context.TODO(), deviceId, voltha.Port_PON_OLT, Port.PortNo, voltha.OperStatus_ACTIVE); err != nil {
- logger.Fatalf(ctx, "updating-ports-failed", log.Fields{"device-id": deviceId, "error": err})
+ if port.Type == voltha.Port_PON_OLT {
+ if _, err := c.PortStateUpdate(context.TODO(),
+ &ic.PortState{
+ DeviceId: port.DeviceId,
+ PortType: voltha.Port_PON_OLT, // same type as the guard above; only PON ports are handled here
+ PortNo: port.PortNo,
+ OperStatus: common.OperStatus_ACTIVE,
+ }); err != nil {
+ logger.Fatalw(ctx, "updating-ports-failed", log.Fields{"device-id": port.DeviceId, "error": err})
}
}
}()
- return nil
+ return &empty.Empty{}, nil
}
// Disable_port -
-func (oltA *OLTAdapter) Disable_port(ctx context.Context, deviceId string, Port *voltha.Port) error { //nolint
+func (oltA *OLTAdapter) DisablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) { //nolint
go func() {
-
- if Port.Type == voltha.Port_PON_OLT {
- if err := oltA.coreProxy.PortStateUpdate(context.TODO(), deviceId, voltha.Port_PON_OLT, Port.PortNo, voltha.OperStatus_DISCOVERED); err != nil {
+ c, err := oltA.GetCoreClient()
+ if err != nil {
+ return
+ }
+ if port.Type == voltha.Port_PON_OLT {
+ if _, err := c.PortStateUpdate(context.TODO(),
+ &ic.PortState{
+ DeviceId: port.DeviceId,
+ PortType: voltha.Port_PON_OLT,
+ PortNo: port.PortNo,
+ OperStatus: common.OperStatus_DISCOVERED,
+ }); err != nil {
// Corresponding device may have been deleted
- logger.Warnw(ctx, "updating-ports-failed", log.Fields{"device-id": deviceId, "error": err})
+ logger.Warnw(ctx, "updating-ports-failed", log.Fields{"device-id": port.DeviceId, "error": err})
}
}
}()
- return nil
-}
-
-// Child_device_lost deletes ONU and its references
-func (oltA *OLTAdapter) Child_device_lost(ctx context.Context, childDevice *voltha.Device) error { // nolint
- return nil
+ return &empty.Empty{}, nil
}
// Reboot_device -
-func (oltA *OLTAdapter) Reboot_device(ctx context.Context, device *voltha.Device) error { // nolint
+func (oltA *OLTAdapter) RebootDevice(ctx context.Context, device *voltha.Device) (*empty.Empty, error) { // nolint
logger.Infow(ctx, "reboot-device", log.Fields{"device-id": device.Id})
go func() {
- if err := oltA.coreProxy.DeviceStateUpdate(context.TODO(), device.Id, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN); err != nil {
- logger.Fatalf(ctx, "device-state-update-failed", log.Fields{"device-id": device.Id, "error": err})
+ c, err := oltA.GetCoreClient()
+ if err != nil {
+ return
}
- if err := oltA.coreProxy.PortsStateUpdate(context.TODO(), device.Id, 0, voltha.OperStatus_UNKNOWN); err != nil {
- // Not an error as the previous command will start the process of clearing the OLT
- logger.Infow(ctx, "port-update-failed", log.Fields{"device-id": device.Id, "error": err})
+
+ if _, err := c.DeviceStateUpdate(context.TODO(), &ic.DeviceStateFilter{
+ DeviceId: device.Id,
+ OperStatus: common.OperStatus_UNKNOWN,
+ ConnStatus: common.ConnectStatus_UNREACHABLE,
+ }); err != nil {
+ logger.Fatalw(ctx, "device-state-update-failed", log.Fields{"device-id": device.Id, "error": err})
+ return
+ }
+
+ if _, err := c.PortsStateUpdate(context.TODO(),
+ &ic.PortStateFilter{
+ DeviceId: device.Id,
+ PortTypeFilter: 0,
+ OperStatus: common.OperStatus_UNKNOWN,
+ }); err != nil {
+ logger.Warnw(ctx, "updating-ports-failed", log.Fields{"device-id": device.Id, "error": err})
}
}()
- return nil
+ return &empty.Empty{}, nil
}
// TODO: REMOVE Start_omci_test begins an omci self-test
-func (oltA *OLTAdapter) Start_omci_test(ctx context.Context, device *voltha.Device, request *voltha.OmciTestRequest) (*ic.TestResponse, error) { // nolint
- _ = device
+func (oltA *OLTAdapter) StartOmciTest(ctx context.Context, test *ic.OMCITest) (*voltha.TestResponse, error) { // nolint
return nil, errors.New("start-omci-test-not-implemented")
}
-func (oltA *OLTAdapter) Get_ext_value(ctx context.Context, deviceId string, device *voltha.Device, valueflag voltha.ValueType_Type) (*voltha.ReturnValues, error) { // nolint
- _ = deviceId
- _ = device
- _ = valueflag
- return nil, errors.New("get-ext-value-not-implemented")
-}
+// SetDeviceActive is a test-only helper that reports the device to the Core as ACTIVE and REACHABLE
+func (oltA *OLTAdapter) SetDeviceActive(deviceID string) {
+ c, err := oltA.GetCoreClient()
+ if err != nil {
+ return
+ }
-func (oltA *OLTAdapter) Download_onu_image(ctx context.Context, request *voltha.DeviceImageDownloadRequest) (*voltha.DeviceImageResponse, error) { //nolint
- _ = request
- return nil, errors.New("download-onu-image-not-implemented")
-}
+ if _, err := c.DeviceStateUpdate(context.TODO(), &ic.DeviceStateFilter{
+ DeviceId: deviceID,
+ OperStatus: common.OperStatus_ACTIVE,
+ ConnStatus: common.ConnectStatus_REACHABLE,
+ }); err != nil {
+ logger.Warnw(context.Background(), "device-state-update-failed", log.Fields{"device-id": deviceID, "error": err})
+ return
+ }
-func (oltA *OLTAdapter) Get_onu_image_status(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
- _ = in
- return nil, errors.New("get-onu-image-not-implemented")
-}
-
-func (oltA *OLTAdapter) Abort_onu_image_upgrade(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
- _ = in
- return nil, errors.New("abort-onu-image-upgrade-not-implemented")
-}
-
-func (oltA *OLTAdapter) Get_onu_images(ctx context.Context, deviceID string) (*voltha.OnuImages, error) { //nolint
- _ = deviceID
- return nil, errors.New("get-onu-images-not-implemented")
-}
-
-func (oltA *OLTAdapter) Activate_onu_image(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
- _ = in
- return nil, errors.New("activate-onu-image-not-implemented")
-}
-
-func (oltA *OLTAdapter) Commit_onu_image(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
- _ = in
- return nil, errors.New("commit-onu-image-not-implemented")
-}
-
-func (oltA *OLTAdapter) Process_tech_profile_instance_request(ctx context.Context, in *ic.InterAdapterTechProfileInstanceRequestMessage) *ic.InterAdapterTechProfileDownloadMessage { //nolint
- _ = in
- return nil
}