VOL-2894 - Reference the VOLTHA gRPC endpoint via a holder

Change-Id: If24299556ad6cacf9cd0f793167a2c908534729c
diff --git a/internal/pkg/ofagent/changeEvent.go b/internal/pkg/ofagent/changeEvent.go
index dc3aa39..fdf99e1 100644
--- a/internal/pkg/ofagent/changeEvent.go
+++ b/internal/pkg/ofagent/changeEvent.go
@@ -19,12 +19,13 @@
 import (
 	"context"
 	"encoding/json"
+	"net"
+
 	ofp "github.com/donNewtonAlpha/goloxi/of13"
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/ofagent-go/internal/pkg/openflow"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"google.golang.org/grpc"
-	"net"
 )
 
 func (ofa *OFAgent) receiveChangeEvents(ctx context.Context) {
@@ -41,7 +42,7 @@
 	opt := grpc.EmptyCallOption{}
 	streamCtx, streamDone := context.WithCancel(context.Background())
 	defer streamDone()
-	stream, err := ofa.volthaClient.ReceiveChangeEvents(streamCtx, &empty.Empty{}, opt)
+	stream, err := ofa.volthaClient.Get().ReceiveChangeEvents(streamCtx, &empty.Empty{}, opt)
 	if err != nil {
 		logger.Errorw("Unable to establish Receive Change Event Stream",
 			log.Fields{"error": err})
diff --git a/internal/pkg/ofagent/connection.go b/internal/pkg/ofagent/connection.go
index 54a4ed8..b84dc3c 100644
--- a/internal/pkg/ofagent/connection.go
+++ b/internal/pkg/ofagent/connection.go
@@ -18,12 +18,13 @@
 import (
 	"context"
 	"errors"
+	"time"
+
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"github.com/opencord/voltha-lib-go/v3/pkg/probe"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"google.golang.org/grpc"
-	"time"
 )
 
 func (ofa *OFAgent) establishConnectionToVoltha(p *probe.Probe) error {
@@ -36,7 +37,7 @@
 	}
 
 	ofa.volthaConnection = nil
-	ofa.volthaClient = nil
+	ofa.volthaClient.Clear()
 	try := 1
 	for ofa.ConnectionMaxRetries == 0 || try < ofa.ConnectionMaxRetries {
 		conn, err := grpc.Dial(ofa.VolthaApiEndPoint, grpc.WithInsecure())
@@ -49,7 +50,7 @@
 							"VolthaApiEndPoint": ofa.VolthaApiEndPoint,
 						})
 					ofa.volthaConnection = conn
-					ofa.volthaClient = svc
+					ofa.volthaClient.Set(svc)
 					if p != nil {
 						p.UpdateStatus("voltha", probe.ServiceStatusRunning)
 					}
diff --git a/internal/pkg/ofagent/ofagent.go b/internal/pkg/ofagent/ofagent.go
index 419a988..f0ebd0e 100644
--- a/internal/pkg/ofagent/ofagent.go
+++ b/internal/pkg/ofagent/ofagent.go
@@ -18,14 +18,15 @@
 
 import (
 	"context"
-	"fmt"
+	"sync"
+	"time"
+
+	"github.com/opencord/ofagent-go/internal/pkg/holder"
 	"github.com/opencord/ofagent-go/internal/pkg/openflow"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"github.com/opencord/voltha-lib-go/v3/pkg/probe"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"google.golang.org/grpc"
-	"sync"
-	"time"
 )
 
 type ofaEvent byte
@@ -50,7 +51,7 @@
 	ConnectionRetryDelay      time.Duration
 
 	volthaConnection *grpc.ClientConn
-	volthaClient     voltha.VolthaServiceClient
+	volthaClient     *holder.VolthaServiceClientHolder
 	mapLock          sync.Mutex
 	clientMap        map[string]*openflow.OFClient
 	events           chan ofaEvent
@@ -67,6 +68,7 @@
 		DeviceListRefreshInterval: config.DeviceListRefreshInterval,
 		ConnectionMaxRetries:      config.ConnectionMaxRetries,
 		ConnectionRetryDelay:      config.ConnectionRetryDelay,
+		volthaClient:              &holder.VolthaServiceClientHolder{},
 		packetInChannel:           make(chan *voltha.PacketIn),
 		packetOutChannel:          make(chan *voltha.PacketOut),
 		changeEventChannel:        make(chan *voltha.ChangeEvent),
@@ -161,16 +163,6 @@
 				if state != ofaStateConnected {
 					state = ofaStateConnected
 					volthaCtx, volthaDone = context.WithCancel(context.Background())
-					// Reconnect clients
-					for _, client := range ofa.clientMap {
-						if logger.V(log.DebugLevel) {
-							logger.Debugw("reset-client-voltha-connection",
-								log.Fields{
-									"from": fmt.Sprintf("0x%p", &client.VolthaClient),
-									"to":   fmt.Sprintf("0x%p", &ofa.volthaClient)})
-						}
-						client.VolthaClient = ofa.volthaClient
-					}
 					go ofa.receiveChangeEvents(volthaCtx)
 					go ofa.receivePacketsIn(volthaCtx)
 					go ofa.streamPacketOut(volthaCtx)
@@ -184,16 +176,7 @@
 				logger.Debug("ofagent-voltha-disconnect-event")
 				if state == ofaStateConnected {
 					state = ofaStateDisconnected
-					ofa.volthaClient = nil
-					for _, client := range ofa.clientMap {
-						client.VolthaClient = nil
-						if logger.V(log.DebugLevel) {
-							logger.Debugw("reset-client-voltha-connection",
-								log.Fields{
-									"from": fmt.Sprintf("0x%p", &client.VolthaClient),
-									"to":   "nil"})
-						}
-					}
+					ofa.volthaClient.Clear()
 					volthaDone()
 					volthaDone = nil
 				}
diff --git a/internal/pkg/ofagent/packetIn.go b/internal/pkg/ofagent/packetIn.go
index 0f5ad71..685ab43 100644
--- a/internal/pkg/ofagent/packetIn.go
+++ b/internal/pkg/ofagent/packetIn.go
@@ -19,6 +19,7 @@
 import (
 	"context"
 	"encoding/json"
+
 	"github.com/donNewtonAlpha/goloxi"
 	ofp "github.com/donNewtonAlpha/goloxi/of13"
 	"github.com/golang/protobuf/ptypes/empty"
@@ -43,7 +44,7 @@
 	opt := grpc.EmptyCallOption{}
 	streamCtx, streamDone := context.WithCancel(context.Background())
 	defer streamDone()
-	stream, err := ofa.volthaClient.ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
+	stream, err := ofa.volthaClient.Get().ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
 	if err != nil {
 		logger.Errorw("Unable to establish Receive PacketIn Stream",
 			log.Fields{"error": err})
diff --git a/internal/pkg/ofagent/packetOut.go b/internal/pkg/ofagent/packetOut.go
index 928d19c..ffeafec 100644
--- a/internal/pkg/ofagent/packetOut.go
+++ b/internal/pkg/ofagent/packetOut.go
@@ -19,6 +19,7 @@
 import (
 	"context"
 	"encoding/json"
+
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"google.golang.org/grpc"
 )
@@ -36,7 +37,7 @@
 	}
 	opt := grpc.EmptyCallOption{}
 	streamCtx, streamDone := context.WithCancel(context.Background())
-	outClient, err := ofa.volthaClient.StreamPacketsOut(streamCtx, opt)
+	outClient, err := ofa.volthaClient.Get().StreamPacketsOut(streamCtx, opt)
 	defer streamDone()
 	if err != nil {
 		logger.Errorw("streamPacketOut Error creating packetout stream ", log.Fields{"error": err})
diff --git a/internal/pkg/ofagent/refresh.go b/internal/pkg/ofagent/refresh.go
index 835e551..1744578 100644
--- a/internal/pkg/ofagent/refresh.go
+++ b/internal/pkg/ofagent/refresh.go
@@ -18,10 +18,11 @@
 
 import (
 	"context"
+	"time"
+
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/ofagent-go/internal/pkg/openflow"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	"time"
 )
 
 func (ofa *OFAgent) synchronizeDeviceList(ctx context.Context) {
@@ -48,7 +49,7 @@
 		ofa.events <- ofaEventVolthaDisconnected
 		return
 	}
-	deviceList, err := ofa.volthaClient.ListLogicalDevices(context.Background(), &empty.Empty{})
+	deviceList, err := ofa.volthaClient.Get().ListLogicalDevices(context.Background(), &empty.Empty{})
 	if err != nil {
 		logger.Errorw("ofagent failed to query device list from voltha",
 			log.Fields{"error": err})