[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/mocks/adapter_onu.go b/rw_core/mocks/adapter_onu.go
index 7ce6324..8fb4744 100644
--- a/rw_core/mocks/adapter_onu.go
+++ b/rw_core/mocks/adapter_onu.go
@@ -18,51 +18,121 @@
 
 import (
 	"context"
-	"errors"
 	"fmt"
-	"github.com/opencord/voltha-protos/v4/go/extension"
+	"strconv"
 	"strings"
 
+	"github.com/golang/protobuf/ptypes/empty"
+	vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+	"github.com/opencord/voltha-lib-go/v7/pkg/probe"
+	"github.com/opencord/voltha-protos/v5/go/common"
+	"github.com/opencord/voltha-protos/v5/go/extension"
+	"github.com/phayes/freeport"
+
 	"github.com/gogo/protobuf/proto"
-	"github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
-	com "github.com/opencord/voltha-lib-go/v5/pkg/adapters/common"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
-	of "github.com/opencord/voltha-protos/v4/go/openflow_13"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	com "github.com/opencord/voltha-lib-go/v7/pkg/adapters/common"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+	of "github.com/opencord/voltha-protos/v5/go/openflow_13"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 )
 
 // ONUAdapter represent ONU adapter attributes
 type ONUAdapter struct {
 	*Adapter
+	grpcServer *vgrpc.GrpcServer
 }
 
 // NewONUAdapter creates ONU adapter
-func NewONUAdapter(ctx context.Context, cp adapterif.CoreProxy) *ONUAdapter {
-	return &ONUAdapter{
-		Adapter: NewAdapter(cp),
+func NewONUAdapter(ctx context.Context, coreEndpoint string, deviceType string, vendor string) *ONUAdapter {
+	// Get an available  port
+	grpcPort, err := freeport.GetFreePort()
+	if err != nil {
+		logger.Fatalw(ctx, "no-free-port", log.Fields{"error": err})
 	}
+	listeningAddress := fmt.Sprintf("127.0.0.1:%s", strconv.Itoa(grpcPort))
+	onuAdapter := &ONUAdapter{Adapter: NewAdapter(listeningAddress, coreEndpoint, deviceType, vendor)}
+
+	onuAdapter.start(ctx)
+	return onuAdapter
+}
+
+func (onuA *ONUAdapter) onuRestarted(ctx context.Context, endPoint string) error {
+	logger.Errorw(ctx, "remote-restarted", log.Fields{"endpoint": endPoint})
+	return nil
+}
+
+func (onuA *ONUAdapter) start(ctx context.Context) {
+
+	// Set up the probe service
+	onuA.Probe = &probe.Probe{}
+	probePort, err := freeport.GetFreePort()
+	if err != nil {
+		logger.Fatal(ctx, "Cannot get a freeport for probePort")
+	}
+	probeAddress := "127.0.0.1:" + strconv.Itoa(probePort)
+	go onuA.Probe.ListenAndServe(ctx, probeAddress)
+
+	probeCtx := context.WithValue(ctx, probe.ProbeContextKey, onuA.Probe)
+
+	onuA.Probe.RegisterService(ctx, "onu-grpc-service", onuA.coreEnpoint)
+
+	// start gRPC handler
+	onuA.grpcServer = vgrpc.NewGrpcServer(onuA.serviceEndpoint, nil, false, nil)
+
+	logger.Debugw(ctx, "ONUAdapter-address", log.Fields{"address": onuA.serviceEndpoint})
+
+	go onuA.startGRPCService(ctx, onuA.grpcServer, onuA, "onu-grpc-service")
+
+	// Establish grpc connection to Core
+	if onuA.coreClient, err = vgrpc.NewClient(onuA.coreEnpoint,
+		onuA.onuRestarted,
+		vgrpc.ActivityCheck(true)); err != nil {
+		logger.Fatal(ctx, "grpc-client-not-created")
+	}
+	go onuA.coreClient.Start(probeCtx, setAndTestCoreServiceHandler)
+
+	logger.Debugw(ctx, "ONUAdapter-started", log.Fields{"grpc-address": onuA.serviceEndpoint})
+}
+
+// Stop brings down core services
+func (onuA *ONUAdapter) StopGrpcClient() {
+	// Stop the grpc clients
+	onuA.coreClient.Stop(context.Background())
+}
+
+func (onuA *ONUAdapter) Stop() {
+	if onuA.grpcServer != nil {
+		onuA.grpcServer.Stop()
+	}
+	logger.Debugw(context.Background(), "ONUAdapter-stopped", log.Fields{"grpc-address": onuA.serviceEndpoint})
 }
 
 // Adopt_device creates new handler for added device
-func (onuA *ONUAdapter) Adopt_device(ctx context.Context, device *voltha.Device) error { // nolint
+func (onuA *ONUAdapter) AdoptDevice(ctx context.Context, device *voltha.Device) (*empty.Empty, error) {
+	logger.Debugw(ctx, "AdoptDevice", log.Fields{"device": device.AdapterEndpoint, "device-type": onuA.DeviceType})
 	go func() {
 		d := proto.Clone(device).(*voltha.Device)
 		d.Root = false
-		d.Vendor = "onu_adapter_mock"
+		d.Vendor = onuA.vendor
 		d.Model = "go-mock"
 		d.SerialNumber = com.GetRandomSerialNumber()
 		d.MacAddress = strings.ToUpper(com.GetRandomMacAddress())
 		onuA.storeDevice(d)
-		if res := onuA.coreProxy.DeviceUpdate(context.TODO(), d); res != nil {
-			logger.Fatalf(ctx, "deviceUpdate-failed-%s", res)
+
+		c, err := onuA.GetCoreClient()
+		if err != nil {
+			return
+		}
+		if _, err := c.DeviceUpdate(context.TODO(), d); err != nil {
+			logger.Fatalf(ctx, "deviceUpdate-failed-%s", err)
 		}
 
-		d.ConnectStatus = voltha.ConnectStatus_REACHABLE
-		d.OperStatus = voltha.OperStatus_DISCOVERED
+		d.ConnectStatus = common.ConnectStatus_REACHABLE
+		d.OperStatus = common.OperStatus_DISCOVERED
 
-		if err := onuA.coreProxy.DeviceStateUpdate(context.TODO(), d.Id, d.ConnectStatus, d.OperStatus); err != nil {
-			logger.Fatalf(ctx, "device-state-update-failed-%s", err)
+		if _, err = c.DeviceStateUpdate(context.TODO(), &ic.DeviceStateFilter{DeviceId: d.Id, OperStatus: d.OperStatus, ConnStatus: d.ConnectStatus}); err != nil {
+			logger.Fatalf(ctx, "PortCreated-failed-%s", err)
 		}
 
 		uniPortNo := uint32(2)
@@ -74,10 +144,11 @@
 
 		capability := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
 		uniPort := &voltha.Port{
+			DeviceId:   d.Id,
 			PortNo:     uniPortNo,
 			Label:      fmt.Sprintf("uni-%d", uniPortNo),
 			Type:       voltha.Port_ETHERNET_UNI,
-			OperStatus: voltha.OperStatus_ACTIVE,
+			OperStatus: common.OperStatus_ACTIVE,
 			OfpPort: &of.OfpPort{
 				HwAddr:     macAddressToUint32Array("12:12:12:12:12:12"),
 				Config:     0,
@@ -90,8 +161,7 @@
 			},
 		}
 
-		var err error
-		if err = onuA.coreProxy.PortCreated(context.TODO(), d.Id, uniPort); err != nil {
+		if _, err = c.PortCreated(context.TODO(), uniPort); err != nil {
 			logger.Fatalf(ctx, "PortCreated-failed-%s", err)
 		}
 
@@ -101,31 +171,34 @@
 		}
 
 		ponPort := &voltha.Port{
+			DeviceId:   d.Id,
 			PortNo:     ponPortNo,
 			Label:      fmt.Sprintf("pon-%d", ponPortNo),
 			Type:       voltha.Port_PON_ONU,
-			OperStatus: voltha.OperStatus_ACTIVE,
+			OperStatus: common.OperStatus_ACTIVE,
 			Peers: []*voltha.Port_PeerPort{{DeviceId: d.ParentId, // Peer device  is OLT
 				PortNo: device.ParentPortNo}}, // Peer port is parent's port number
 		}
-		if err = onuA.coreProxy.PortCreated(context.TODO(), d.Id, ponPort); err != nil {
+
+		if _, err = c.PortCreated(context.TODO(), ponPort); err != nil {
 			logger.Fatalf(ctx, "PortCreated-failed-%s", err)
 		}
 
-		d.ConnectStatus = voltha.ConnectStatus_REACHABLE
-		d.OperStatus = voltha.OperStatus_ACTIVE
+		d.ConnectStatus = common.ConnectStatus_REACHABLE
+		d.OperStatus = common.OperStatus_ACTIVE
 
-		if err = onuA.coreProxy.DeviceStateUpdate(context.TODO(), d.Id, d.ConnectStatus, d.OperStatus); err != nil {
-			logger.Fatalf(ctx, "device-state-update-failed-%s", err)
+		if _, err = c.DeviceStateUpdate(context.TODO(), &ic.DeviceStateFilter{DeviceId: d.Id, OperStatus: d.OperStatus, ConnStatus: d.ConnectStatus}); err != nil {
+			logger.Fatalf(ctx, "PortCreated-failed-%s", err)
 		}
+
 		//Get the latest device data from the Core
-		if d, err = onuA.coreProxy.GetDevice(context.TODO(), d.Id, d.Id); err != nil {
+		if d, err = c.GetDevice(context.TODO(), &common.ID{Id: d.Id}); err != nil {
 			logger.Fatalf(ctx, "getting-device-failed-%s", err)
 		}
 
 		onuA.updateDevice(d)
 	}()
-	return nil
+	return &empty.Empty{}, nil
 }
 
 // Single_get_value_request retrieves a single value.
@@ -136,102 +209,89 @@
 }
 
 // Disable_device disables device
-func (onuA *ONUAdapter) Disable_device(ctx context.Context, device *voltha.Device) error { // nolint
+func (onuA *ONUAdapter) DisableDevice(ctx context.Context, device *voltha.Device) (*empty.Empty, error) { // nolint
 	go func() {
 		if d := onuA.getDevice(device.Id); d == nil {
 			logger.Fatalf(ctx, "device-not-found-%s", device.Id)
 		}
+
 		cloned := proto.Clone(device).(*voltha.Device)
 		// Update the all ports state on that device to disable
-		if err := onuA.coreProxy.PortsStateUpdate(context.TODO(), cloned.Id, 0, voltha.OperStatus_UNKNOWN); err != nil {
-			// Device may also have been deleted in the Core
-			logger.Warnw(ctx, "updating-ports-failed", log.Fields{"device-id": device.Id, "error": err})
+		c, err := onuA.GetCoreClient()
+		if err != nil {
 			return
 		}
-		//Update the device state
-		cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
-		cloned.OperStatus = voltha.OperStatus_UNKNOWN
 
-		if err := onuA.coreProxy.DeviceStateUpdate(context.TODO(), cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+		if _, err := c.PortsStateUpdate(context.TODO(),
+			&ic.PortStateFilter{
+				DeviceId:       cloned.Id,
+				PortTypeFilter: 0,
+				OperStatus:     common.OperStatus_UNKNOWN,
+			}); err != nil {
+			logger.Warnw(ctx, "updating-ports-failed", log.Fields{"device-id": device.Id, "error": err})
+		}
+
+		//Update the device operational state
+		cloned.ConnectStatus = common.ConnectStatus_UNREACHABLE
+		cloned.OperStatus = common.OperStatus_UNKNOWN
+
+		if _, err := c.DeviceStateUpdate(context.TODO(), &ic.DeviceStateFilter{
+			DeviceId:   cloned.Id,
+			OperStatus: cloned.OperStatus,
+			ConnStatus: cloned.ConnectStatus,
+		}); err != nil {
+			// Device may already have been deleted in the core
 			logger.Warnw(ctx, "device-state-update-failed", log.Fields{"device-id": device.Id, "error": err})
 			return
 		}
+
 		onuA.updateDevice(cloned)
+
 	}()
-	return nil
+	return &empty.Empty{}, nil
 }
 
-// Reenable_device reenables device
-func (onuA *ONUAdapter) Reenable_device(ctx context.Context, device *voltha.Device) error { // nolint
+func (onuA *ONUAdapter) ReEnableDevice(ctx context.Context, device *voltha.Device) (*empty.Empty, error) { // nolint
 	go func() {
 		if d := onuA.getDevice(device.Id); d == nil {
 			logger.Fatalf(ctx, "device-not-found-%s", device.Id)
 		}
 
 		cloned := proto.Clone(device).(*voltha.Device)
+
+		c, err := onuA.GetCoreClient()
+		if err != nil {
+			return
+		}
+
 		// Update the all ports state on that device to enable
-		if err := onuA.coreProxy.PortsStateUpdate(context.TODO(), cloned.Id, 0, voltha.OperStatus_ACTIVE); err != nil {
-			logger.Fatalf(ctx, "updating-ports-failed", log.Fields{"device-id": device.Id, "error": err})
+		if _, err := c.PortsStateUpdate(context.TODO(),
+			&ic.PortStateFilter{
+				DeviceId:       cloned.Id,
+				PortTypeFilter: 0,
+				OperStatus:     common.OperStatus_ACTIVE,
+			}); err != nil {
+			logger.Warnw(ctx, "updating-ports-failed", log.Fields{"device-id": device.Id, "error": err})
 		}
 
 		//Update the device state
-		cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
-		cloned.OperStatus = voltha.OperStatus_ACTIVE
+		cloned.ConnectStatus = common.ConnectStatus_REACHABLE
+		cloned.OperStatus = common.OperStatus_ACTIVE
 
-		if err := onuA.coreProxy.DeviceStateUpdate(context.TODO(), cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+		if _, err := c.DeviceStateUpdate(context.TODO(), &ic.DeviceStateFilter{
+			DeviceId:   cloned.Id,
+			OperStatus: cloned.OperStatus,
+			ConnStatus: cloned.ConnectStatus,
+		}); err != nil {
+			// Device may already have been deleted in the core
 			logger.Fatalf(ctx, "device-state-update-failed", log.Fields{"device-id": device.Id, "error": err})
+			return
 		}
-
 		onuA.updateDevice(cloned)
 	}()
-	return nil
+	return &empty.Empty{}, nil
 }
 
-// Start_omci_test begins an omci self-test
-func (onuA *ONUAdapter) Start_omci_test(ctx context.Context, device *voltha.Device, request *voltha.OmciTestRequest) (*ic.TestResponse, error) { // nolint
-	_ = device
-	return &ic.TestResponse{Result: ic.TestResponse_SUCCESS}, nil
-}
-
-func (onuA *ONUAdapter) Get_ext_value(ctx context.Context, deviceId string, device *voltha.Device, valueflag voltha.ValueType_Type) (*voltha.ReturnValues, error) { // nolint
-	_ = deviceId
-	_ = device
-	_ = valueflag
-	return nil, errors.New("get-ext-value-not-implemented")
-}
-
-func (onuA *ONUAdapter) Download_onu_image(ctx context.Context, request *voltha.DeviceImageDownloadRequest) (*voltha.DeviceImageResponse, error) { //nolint
-	logger.Infof(ctx, "Download_onu_image")
-	_ = request
-	return nil, errors.New("download-onu-image-not-implemented")
-}
-
-func (onuA *ONUAdapter) Get_onu_image_status(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
-	_ = in
-	return nil, errors.New("get-onu-image-not-implemented")
-}
-
-func (onuA *ONUAdapter) Abort_onu_image_upgrade(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
-	_ = in
-	return nil, errors.New("abort-onu-image-upgrade-not-implemented")
-}
-
-func (onuA *ONUAdapter) Get_onu_images(ctx context.Context, deviceID string) (*voltha.OnuImages, error) { //nolint
-	_ = deviceID
-	return nil, errors.New("get-onu-images-not-implemented")
-}
-
-func (onuA *ONUAdapter) Activate_onu_image(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
-	_ = in
-	return nil, errors.New("activate-onu-image-not-implemented")
-}
-
-func (onuA *ONUAdapter) Commit_onu_image(ctx context.Context, in *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) { //nolint
-	_ = in
-	return nil, errors.New("commit-onu-image-not-implemented")
-}
-
-func (onuA *ONUAdapter) Process_tech_profile_instance_request(ctx context.Context, in *ic.InterAdapterTechProfileInstanceRequestMessage) *ic.InterAdapterTechProfileDownloadMessage { //nolint
-	_ = in
-	return nil
+func (onuA *ONUAdapter) StartOmciTest(ctx context.Context, _ *ic.OMCITest) (*voltha.TestResponse, error) { // nolint
+	return &voltha.TestResponse{Result: voltha.TestResponse_SUCCESS}, nil
 }