[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/adapter/agent.go b/rw_core/core/adapter/agent.go
index a6d8186..c2d45de 100644
--- a/rw_core/core/adapter/agent.go
+++ b/rw_core/core/adapter/agent.go
@@ -18,44 +18,108 @@
import (
"context"
- "github.com/golang/protobuf/ptypes"
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "errors"
"sync"
"time"
+
+ "github.com/golang/protobuf/ptypes/empty"
+ vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-protos/v5/go/adapter_services"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+ "google.golang.org/grpc"
)
// agent represents adapter agent
type agent struct {
- adapter *voltha.Adapter
- lock sync.RWMutex
+ // adapter is the adapter record supplied to newAdapterAgent; getAdapter returns this same pointer.
+ adapter *voltha.Adapter
+ // lock is held by updateCommunicationTime while it rewrites adapter.LastCommunication.
+ lock sync.RWMutex
+ // adapterAPIEndPoint is copied from adapter.Endpoint at construction and is the endpoint
+ // passed to vgrpc.NewClient in start.
+ adapterAPIEndPoint string
+ // vClient is left unset by newAdapterAgent and assigned by start.
+ vClient *vgrpc.Client
+ // adapterLock is read-locked by getAdapter.
+ // NOTE(review): updateCommunicationTime writes through adapter under lock, not adapterLock -
+ // confirm the two mutexes are meant to be separate.
+ adapterLock sync.RWMutex
+ // onAdapterRestart is handed to vgrpc.NewClient in start; presumably it is invoked when the
+ // adapter restarts - TODO confirm against the vgrpc package.
+ onAdapterRestart vgrpc.RestartedHandler
+ // liveProbeInterval is stored at construction; nothing visible in this change reads it.
+ liveProbeInterval time.Duration
}
-func newAdapterAgent(adapter *voltha.Adapter) *agent {
+// setAndTestAdapterServiceHandler builds an adapter service client on conn and queries the
+// adapter's health status through it. The client is returned only when the query succeeds and
+// the reported state is HEALTHY; otherwise the outcome is logged at debug level and nil is
+// returned.
+func setAndTestAdapterServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+	client := adapter_services.NewAdapterServiceClient(conn)
+	health, err := client.GetHealthStatus(ctx, &empty.Empty{})
+	if err == nil && health.State == voltha.HealthStatus_HEALTHY {
+		return client
+	}
+	logger.Debugw(ctx, "connection-not-ready", log.Fields{"error": err, "health": health})
+	return nil
+}
+
+// newAdapterAgent returns an agent for the given adapter. The agent keeps the adapter pointer,
+// the restart handler and the probe interval exactly as passed in, and takes its API endpoint
+// from adapter.Endpoint. adapter must be non-nil because adapter.Endpoint is read here.
+// No client is created at this point; that happens in start.
+func newAdapterAgent(adapter *voltha.Adapter, onAdapterRestart vgrpc.RestartedHandler, liveProbeInterval time.Duration) *agent {
return &agent{
- adapter: adapter,
+ adapter: adapter,
+ onAdapterRestart: onAdapterRestart,
+ adapterAPIEndPoint: adapter.Endpoint,
+ liveProbeInterval: liveProbeInterval,
+ }
+}
+
+// start creates the gRPC client for the adapter's API endpoint, registers
+// updateCommunicationTime as its liveness callback and launches the client in its own
+// goroutine. The error from vgrpc.NewClient is returned if the client cannot be created;
+// otherwise start returns nil without waiting for the client.
+func (aa *agent) start(ctx context.Context) error {
+	// Create the client that targets the adapter endpoint.
+	var err error
+	aa.vClient, err = vgrpc.NewClient(aa.adapterAPIEndPoint, aa.onAdapterRestart, vgrpc.ActivityCheck(true))
+	if err != nil {
+		return err
+	}
+
+	// Liveness notifications from the client feed the last-communication time.
+	aa.vClient.SubscribeForLiveness(aa.updateCommunicationTime)
+
+	// The client runs in the background and is given setAndTestAdapterServiceHandler.
+	go aa.vClient.Start(ctx, setAndTestAdapterServiceHandler)
+	return nil
+}
+
+// stop calls Stop on the adapter client when one has been assigned; otherwise it does nothing.
+func (aa *agent) stop(ctx context.Context) {
+	if aa.vClient == nil {
+		return
+	}
+	aa.vClient.Stop(ctx)
}
+// getAdapter returns the agent's adapter pointer, read while holding adapterLock for reading.
+// The pointer itself is returned, not a copy of the adapter. ctx is not used.
func (aa *agent) getAdapter(ctx context.Context) *voltha.Adapter {
- aa.lock.RLock()
- defer aa.lock.RUnlock()
- logger.Debugw(ctx, "getAdapter", log.Fields{"adapter": aa.adapter})
+ aa.adapterLock.RLock()
+ defer aa.adapterLock.RUnlock()
return aa.adapter
}
+// getClient returns the adapter service client currently held by the gRPC client wrapper.
+// An error is returned when the wrapper has not been assigned yet, when the wrapper itself
+// reports an error, or when the value it holds is not an adapter_services.AdapterServiceClient.
+func (aa *agent) getClient() (adapter_services.AdapterServiceClient, error) {
+	// Same nil guard as stop and resetConnection: without it GetClient would be invoked on a
+	// nil *vgrpc.Client whenever this is called before a successful start.
+	if aa.vClient == nil {
+		return nil, errors.New("adapter client not initialized")
+	}
+	client, err := aa.vClient.GetClient()
+	if err != nil {
+		return nil, err
+	}
+	svc, ok := client.(adapter_services.AdapterServiceClient)
+	if !ok {
+		return nil, errors.New("invalid client returned")
+	}
+	return svc, nil
+}
+
+// resetConnection calls Reset on the adapter client; it is a no-op when no client has been
+// assigned.
+func (aa *agent) resetConnection(ctx context.Context) {
+	if aa.vClient == nil {
+		return
+	}
+	aa.vClient.Reset(ctx)
+}
+
// updateCommunicationTime updates the message to the specified time.
// No attempt is made to save the time to the db, so only recent times are guaranteed to be accurate.
+// The time is kept in adapter.LastCommunication as whole Unix seconds. start registers this
+// method as the client's liveness callback.
func (aa *agent) updateCommunicationTime(new time.Time) {
// only update if new time is not in the future, and either the old time is invalid or new time > old time
+ // With LastCommunication held as an integer there is no longer an "invalid old time" case:
+ // the update happens only when new is not in the future and is later than the stored time.
+ // NOTE(review): this takes aa.lock, while getAdapter read-locks aa.adapterLock - confirm the
+ // two mutexes are meant to be separate.
aa.lock.Lock()
defer aa.lock.Unlock()
- if last, err := ptypes.Timestamp(aa.adapter.LastCommunication); !new.After(time.Now()) && (err != nil || new.After(last)) {
- timestamp, err := ptypes.TimestampProto(new)
- if err != nil {
- return // if the new time cannot be encoded, just ignore it
- }
-
- aa.adapter.LastCommunication = timestamp
+ // The stored value has one-second resolution, so it is rebuilt with zero nanoseconds.
+ timestamp := time.Unix(aa.adapter.LastCommunication, 0)
+ if !new.After(time.Now()) && new.After(timestamp) {
+ timestamp = new
+ aa.adapter.LastCommunication = timestamp.Unix()
}
}
+
+// IsConnectionUp reports whether getClient currently yields an adapter service client without
+// an error.
+func (aa *agent) IsConnectionUp() bool {
+	if _, err := aa.getClient(); err != nil {
+		return false
+	}
+	return true
+}