[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/adapter/agent.go b/rw_core/core/adapter/agent.go
index a6d8186..c2d45de 100644
--- a/rw_core/core/adapter/agent.go
+++ b/rw_core/core/adapter/agent.go
@@ -18,44 +18,108 @@
 
 import (
 	"context"
-	"github.com/golang/protobuf/ptypes"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"errors"
 	"sync"
 	"time"
+
+	"github.com/golang/protobuf/ptypes/empty"
+	vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-protos/v5/go/adapter_services"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
+	"google.golang.org/grpc"
 )
 
 // agent represents adapter agent
 type agent struct {
-	adapter *voltha.Adapter
-	lock    sync.RWMutex
+	adapter            *voltha.Adapter
+	lock               sync.RWMutex
+	adapterAPIEndPoint string
+	vClient            *vgrpc.Client
+	adapterLock        sync.RWMutex
+	onAdapterRestart   vgrpc.RestartedHandler
+	liveProbeInterval  time.Duration
 }
 
-func newAdapterAgent(adapter *voltha.Adapter) *agent {
+func setAndTestAdapterServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+	svc := adapter_services.NewAdapterServiceClient(conn)
+	if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != voltha.HealthStatus_HEALTHY {
+		logger.Debugw(ctx, "connection-not-ready", log.Fields{"error": err, "health": h})
+		return nil
+	}
+	return svc
+}
+
+func newAdapterAgent(adapter *voltha.Adapter, onAdapterRestart vgrpc.RestartedHandler, liveProbeInterval time.Duration) *agent {
 	return &agent{
-		adapter: adapter,
+		adapter:            adapter,
+		onAdapterRestart:   onAdapterRestart,
+		adapterAPIEndPoint: adapter.Endpoint,
+		liveProbeInterval:  liveProbeInterval,
+	}
+}
+
+func (aa *agent) start(ctx context.Context) error {
+	// Establish grpc connection to Core
+	var err error
+	if aa.vClient, err = vgrpc.NewClient(aa.adapterAPIEndPoint,
+		aa.onAdapterRestart,
+		vgrpc.ActivityCheck(true)); err != nil {
+		return err
+	}
+
+	// Add a liveness communication update
+	aa.vClient.SubscribeForLiveness(aa.updateCommunicationTime)
+
+	go aa.vClient.Start(ctx, setAndTestAdapterServiceHandler)
+	return nil
+}
+
+func (aa *agent) stop(ctx context.Context) {
+	// Close the client
+	if aa.vClient != nil {
+		aa.vClient.Stop(ctx)
 	}
 }
 
 func (aa *agent) getAdapter(ctx context.Context) *voltha.Adapter {
-	aa.lock.RLock()
-	defer aa.lock.RUnlock()
-	logger.Debugw(ctx, "getAdapter", log.Fields{"adapter": aa.adapter})
+	aa.adapterLock.RLock()
+	defer aa.adapterLock.RUnlock()
 	return aa.adapter
 }
 
+func (aa *agent) getClient() (adapter_services.AdapterServiceClient, error) {
+	client, err := aa.vClient.GetClient()
+	if err != nil {
+		return nil, err
+	}
+	c, ok := client.(adapter_services.AdapterServiceClient)
+	if ok {
+		return c, nil
+	}
+	return nil, errors.New("invalid client returned")
+}
+
+func (aa *agent) resetConnection(ctx context.Context) {
+	if aa.vClient != nil {
+		aa.vClient.Reset(ctx)
+	}
+}
+
 // updateCommunicationTime updates the message to the specified time.
 // No attempt is made to save the time to the db, so only recent times are guaranteed to be accurate.
 func (aa *agent) updateCommunicationTime(new time.Time) {
 	// only update if new time is not in the future, and either the old time is invalid or new time > old time
 	aa.lock.Lock()
 	defer aa.lock.Unlock()
-	if last, err := ptypes.Timestamp(aa.adapter.LastCommunication); !new.After(time.Now()) && (err != nil || new.After(last)) {
-		timestamp, err := ptypes.TimestampProto(new)
-		if err != nil {
-			return // if the new time cannot be encoded, just ignore it
-		}
-
-		aa.adapter.LastCommunication = timestamp
+	timestamp := time.Unix(aa.adapter.LastCommunication, 0)
+	if !new.After(time.Now()) && new.After(timestamp) {
+		timestamp = new
+		aa.adapter.LastCommunication = timestamp.Unix()
 	}
 }
+
+func (aa *agent) IsConnectionUp() bool {
+	_, err := aa.getClient()
+	return err == nil
+}