VOL-2894 - reference the voltha GRPC end point via a holder

Change-Id: If24299556ad6cacf9cd0f793167a2c908534729c
diff --git a/internal/pkg/holder/doc.go b/internal/pkg/holder/doc.go
new file mode 100644
index 0000000..38ee85a
--- /dev/null
+++ b/internal/pkg/holder/doc.go
@@ -0,0 +1,31 @@
+/*
+ *   Copyright 2020-present Open Networking Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+// The implementation of the open flow agent (ofagent) uses a GRPC connection
+// to VOLTHA accross several implementaton packages including the ofagent and
+// openflow packages. This GRPC connection is shared through packages and type
+// instances via injection.
+//
+// As the ofagent executes within a micro-service environment it is
+// possible that the GRPC connection is reset (re-established). When the connection
+// is re-established, because injection is used, the new value needs to be
+// re-injected across the implementation.
+//
+// To help simply the re-injection or value change scenario a holder for the
+// GRPC connection is established so that the reference to the holder can
+// stay [sic] consistant over the lifetime of the ofagent while the underlying
+// GRPC connection can change without walking the entire runtime structure.
+package holder
diff --git a/internal/pkg/holder/holder.go b/internal/pkg/holder/holder.go
new file mode 100644
index 0000000..0074582
--- /dev/null
+++ b/internal/pkg/holder/holder.go
@@ -0,0 +1,55 @@
+/*
+ *   Copyright 2020-present Open Networking Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package holder
+
+import (
+	"sync"
+
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+)
+
+// VolthaServiceClientHolder provides a consistent (voluntarily unmutable) reference
+// point for a mutable value that represents a GRPC service interface to
+// VOLTHA
+type VolthaServiceClientHolder struct {
+	mutex           sync.RWMutex
+	volthaSvcClient voltha.VolthaServiceClient
+}
+
+type VolthaServiceClientReference struct {
+}
+
+// Clear sets the held value to nil (not set)
+func (h *VolthaServiceClientHolder) Clear() {
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+	h.volthaSvcClient = nil
+}
+
+// Set assigns the value being held to the specified value
+func (h *VolthaServiceClientHolder) Set(client voltha.VolthaServiceClient) {
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+	h.volthaSvcClient = client
+}
+
+// Get returns the currently held value
+func (h *VolthaServiceClientHolder) Get() voltha.VolthaServiceClient {
+	h.mutex.RLock()
+	defer h.mutex.RUnlock()
+	return h.volthaSvcClient
+}
diff --git a/internal/pkg/ofagent/changeEvent.go b/internal/pkg/ofagent/changeEvent.go
index dc3aa39..fdf99e1 100644
--- a/internal/pkg/ofagent/changeEvent.go
+++ b/internal/pkg/ofagent/changeEvent.go
@@ -19,12 +19,13 @@
 import (
 	"context"
 	"encoding/json"
+	"net"
+
 	ofp "github.com/donNewtonAlpha/goloxi/of13"
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/ofagent-go/internal/pkg/openflow"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"google.golang.org/grpc"
-	"net"
 )
 
 func (ofa *OFAgent) receiveChangeEvents(ctx context.Context) {
@@ -41,7 +42,7 @@
 	opt := grpc.EmptyCallOption{}
 	streamCtx, streamDone := context.WithCancel(context.Background())
 	defer streamDone()
-	stream, err := ofa.volthaClient.ReceiveChangeEvents(streamCtx, &empty.Empty{}, opt)
+	stream, err := ofa.volthaClient.Get().ReceiveChangeEvents(streamCtx, &empty.Empty{}, opt)
 	if err != nil {
 		logger.Errorw("Unable to establish Receive Change Event Stream",
 			log.Fields{"error": err})
diff --git a/internal/pkg/ofagent/connection.go b/internal/pkg/ofagent/connection.go
index 54a4ed8..b84dc3c 100644
--- a/internal/pkg/ofagent/connection.go
+++ b/internal/pkg/ofagent/connection.go
@@ -18,12 +18,13 @@
 import (
 	"context"
 	"errors"
+	"time"
+
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"github.com/opencord/voltha-lib-go/v3/pkg/probe"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"google.golang.org/grpc"
-	"time"
 )
 
 func (ofa *OFAgent) establishConnectionToVoltha(p *probe.Probe) error {
@@ -36,7 +37,7 @@
 	}
 
 	ofa.volthaConnection = nil
-	ofa.volthaClient = nil
+	ofa.volthaClient.Clear()
 	try := 1
 	for ofa.ConnectionMaxRetries == 0 || try < ofa.ConnectionMaxRetries {
 		conn, err := grpc.Dial(ofa.VolthaApiEndPoint, grpc.WithInsecure())
@@ -49,7 +50,7 @@
 							"VolthaApiEndPoint": ofa.VolthaApiEndPoint,
 						})
 					ofa.volthaConnection = conn
-					ofa.volthaClient = svc
+					ofa.volthaClient.Set(svc)
 					if p != nil {
 						p.UpdateStatus("voltha", probe.ServiceStatusRunning)
 					}
diff --git a/internal/pkg/ofagent/ofagent.go b/internal/pkg/ofagent/ofagent.go
index 419a988..f0ebd0e 100644
--- a/internal/pkg/ofagent/ofagent.go
+++ b/internal/pkg/ofagent/ofagent.go
@@ -18,14 +18,15 @@
 
 import (
 	"context"
-	"fmt"
+	"sync"
+	"time"
+
+	"github.com/opencord/ofagent-go/internal/pkg/holder"
 	"github.com/opencord/ofagent-go/internal/pkg/openflow"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"github.com/opencord/voltha-lib-go/v3/pkg/probe"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"google.golang.org/grpc"
-	"sync"
-	"time"
 )
 
 type ofaEvent byte
@@ -50,7 +51,7 @@
 	ConnectionRetryDelay      time.Duration
 
 	volthaConnection *grpc.ClientConn
-	volthaClient     voltha.VolthaServiceClient
+	volthaClient     *holder.VolthaServiceClientHolder
 	mapLock          sync.Mutex
 	clientMap        map[string]*openflow.OFClient
 	events           chan ofaEvent
@@ -67,6 +68,7 @@
 		DeviceListRefreshInterval: config.DeviceListRefreshInterval,
 		ConnectionMaxRetries:      config.ConnectionMaxRetries,
 		ConnectionRetryDelay:      config.ConnectionRetryDelay,
+		volthaClient:              &holder.VolthaServiceClientHolder{},
 		packetInChannel:           make(chan *voltha.PacketIn),
 		packetOutChannel:          make(chan *voltha.PacketOut),
 		changeEventChannel:        make(chan *voltha.ChangeEvent),
@@ -161,16 +163,6 @@
 				if state != ofaStateConnected {
 					state = ofaStateConnected
 					volthaCtx, volthaDone = context.WithCancel(context.Background())
-					// Reconnect clients
-					for _, client := range ofa.clientMap {
-						if logger.V(log.DebugLevel) {
-							logger.Debugw("reset-client-voltha-connection",
-								log.Fields{
-									"from": fmt.Sprintf("0x%p", &client.VolthaClient),
-									"to":   fmt.Sprintf("0x%p", &ofa.volthaClient)})
-						}
-						client.VolthaClient = ofa.volthaClient
-					}
 					go ofa.receiveChangeEvents(volthaCtx)
 					go ofa.receivePacketsIn(volthaCtx)
 					go ofa.streamPacketOut(volthaCtx)
@@ -184,16 +176,7 @@
 				logger.Debug("ofagent-voltha-disconnect-event")
 				if state == ofaStateConnected {
 					state = ofaStateDisconnected
-					ofa.volthaClient = nil
-					for _, client := range ofa.clientMap {
-						client.VolthaClient = nil
-						if logger.V(log.DebugLevel) {
-							logger.Debugw("reset-client-voltha-connection",
-								log.Fields{
-									"from": fmt.Sprintf("0x%p", &client.VolthaClient),
-									"to":   "nil"})
-						}
-					}
+					ofa.volthaClient.Clear()
 					volthaDone()
 					volthaDone = nil
 				}
diff --git a/internal/pkg/ofagent/packetIn.go b/internal/pkg/ofagent/packetIn.go
index 0f5ad71..685ab43 100644
--- a/internal/pkg/ofagent/packetIn.go
+++ b/internal/pkg/ofagent/packetIn.go
@@ -19,6 +19,7 @@
 import (
 	"context"
 	"encoding/json"
+
 	"github.com/donNewtonAlpha/goloxi"
 	ofp "github.com/donNewtonAlpha/goloxi/of13"
 	"github.com/golang/protobuf/ptypes/empty"
@@ -43,7 +44,7 @@
 	opt := grpc.EmptyCallOption{}
 	streamCtx, streamDone := context.WithCancel(context.Background())
 	defer streamDone()
-	stream, err := ofa.volthaClient.ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
+	stream, err := ofa.volthaClient.Get().ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
 	if err != nil {
 		logger.Errorw("Unable to establish Receive PacketIn Stream",
 			log.Fields{"error": err})
diff --git a/internal/pkg/ofagent/packetOut.go b/internal/pkg/ofagent/packetOut.go
index 928d19c..ffeafec 100644
--- a/internal/pkg/ofagent/packetOut.go
+++ b/internal/pkg/ofagent/packetOut.go
@@ -19,6 +19,7 @@
 import (
 	"context"
 	"encoding/json"
+
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"google.golang.org/grpc"
 )
@@ -36,7 +37,7 @@
 	}
 	opt := grpc.EmptyCallOption{}
 	streamCtx, streamDone := context.WithCancel(context.Background())
-	outClient, err := ofa.volthaClient.StreamPacketsOut(streamCtx, opt)
+	outClient, err := ofa.volthaClient.Get().StreamPacketsOut(streamCtx, opt)
 	defer streamDone()
 	if err != nil {
 		logger.Errorw("streamPacketOut Error creating packetout stream ", log.Fields{"error": err})
diff --git a/internal/pkg/ofagent/refresh.go b/internal/pkg/ofagent/refresh.go
index 835e551..1744578 100644
--- a/internal/pkg/ofagent/refresh.go
+++ b/internal/pkg/ofagent/refresh.go
@@ -18,10 +18,11 @@
 
 import (
 	"context"
+	"time"
+
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/ofagent-go/internal/pkg/openflow"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	"time"
 )
 
 func (ofa *OFAgent) synchronizeDeviceList(ctx context.Context) {
@@ -48,7 +49,7 @@
 		ofa.events <- ofaEventVolthaDisconnected
 		return
 	}
-	deviceList, err := ofa.volthaClient.ListLogicalDevices(context.Background(), &empty.Empty{})
+	deviceList, err := ofa.volthaClient.Get().ListLogicalDevices(context.Background(), &empty.Empty{})
 	if err != nil {
 		logger.Errorw("ofagent failed to query device list from voltha",
 			log.Fields{"error": err})
diff --git a/internal/pkg/openflow/client.go b/internal/pkg/openflow/client.go
index aeac406..3f48e0c 100644
--- a/internal/pkg/openflow/client.go
+++ b/internal/pkg/openflow/client.go
@@ -19,11 +19,13 @@
 import (
 	"context"
 	"errors"
-	ofp "github.com/donNewtonAlpha/goloxi/of13"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"sync"
 	"time"
+
+	ofp "github.com/donNewtonAlpha/goloxi/of13"
+	"github.com/opencord/ofagent-go/internal/pkg/holder"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
 )
 
 var NoVolthaConnectionError = errors.New("no-voltha-connection")
@@ -87,7 +89,7 @@
 type OFClient struct {
 	OFControllerEndPoints []string
 	DeviceID              string
-	VolthaClient          voltha.VolthaServiceClient
+	VolthaClient          *holder.VolthaServiceClientHolder
 	PacketOutChannel      chan *voltha.PacketOut
 	ConnectionMaxRetries  int
 	ConnectionRetryDelay  time.Duration
diff --git a/internal/pkg/openflow/connection.go b/internal/pkg/openflow/connection.go
index 7d57d34..691644c 100644
--- a/internal/pkg/openflow/connection.go
+++ b/internal/pkg/openflow/connection.go
@@ -21,19 +21,21 @@
 	"encoding/binary"
 	"encoding/json"
 	"errors"
-	"github.com/donNewtonAlpha/goloxi"
-	ofp "github.com/donNewtonAlpha/goloxi/of13"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"io"
 	"net"
 	"time"
+
+	"github.com/donNewtonAlpha/goloxi"
+	ofp "github.com/donNewtonAlpha/goloxi/of13"
+	"github.com/opencord/ofagent-go/internal/pkg/holder"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
 )
 
 type OFConnection struct {
 	OFControllerEndPoint string
 	DeviceID             string
-	VolthaClient         voltha.VolthaServiceClient
+	VolthaClient         *holder.VolthaServiceClientHolder
 	PacketOutChannel     chan *voltha.PacketOut
 	ConnectionMaxRetries int
 	ConnectionRetryDelay time.Duration
diff --git a/internal/pkg/openflow/feature.go b/internal/pkg/openflow/feature.go
index 82353db..99da50c 100644
--- a/internal/pkg/openflow/feature.go
+++ b/internal/pkg/openflow/feature.go
@@ -19,6 +19,7 @@
 import (
 	"context"
 	"encoding/json"
+
 	ofp "github.com/donNewtonAlpha/goloxi/of13"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"github.com/opencord/voltha-protos/v3/go/common"
@@ -32,11 +33,12 @@
 				"device-id": ofc.DeviceID,
 				"request":   js})
 	}
-	if ofc.VolthaClient == nil {
+	volthaClient := ofc.VolthaClient.Get()
+	if volthaClient == nil {
 		return NoVolthaConnectionError
 	}
 	var id = common.ID{Id: ofc.DeviceID}
-	logicalDevice, err := ofc.VolthaClient.GetLogicalDevice(context.Background(), &id)
+	logicalDevice, err := volthaClient.GetLogicalDevice(context.Background(), &id)
 	if err != nil {
 		return err
 	}
diff --git a/internal/pkg/openflow/flowMod.go b/internal/pkg/openflow/flowMod.go
index 114a012..5424c12 100644
--- a/internal/pkg/openflow/flowMod.go
+++ b/internal/pkg/openflow/flowMod.go
@@ -20,11 +20,12 @@
 	"context"
 	"encoding/binary"
 	"encoding/json"
+	"unsafe"
+
 	ofp "github.com/donNewtonAlpha/goloxi/of13"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
-	"unsafe"
 )
 
 var oxmMap = map[string]int32{
@@ -80,7 +81,8 @@
 				"params":    js})
 	}
 
-	if ofc.VolthaClient == nil {
+	volthaClient := ofc.VolthaClient.Get()
+	if volthaClient == nil {
 		logger.Errorw("no-voltha-connection",
 			log.Fields{"device-id": ofc.DeviceID})
 		return
@@ -227,7 +229,7 @@
 				"flow-mod-object":  flowUpdate,
 				"flow-mod-request": flowUpdateJs})
 	}
-	if _, err := ofc.VolthaClient.UpdateLogicalDeviceFlowTable(context.Background(), &flowUpdate); err != nil {
+	if _, err := volthaClient.UpdateLogicalDeviceFlowTable(context.Background(), &flowUpdate); err != nil {
 		logger.Errorw("Error calling FlowAdd ",
 			log.Fields{
 				"device-id": ofc.DeviceID,
@@ -311,7 +313,8 @@
 				"flow-delete-strict": js})
 	}
 
-	if ofc.VolthaClient == nil {
+	volthaClient := ofc.VolthaClient.Get()
+	if volthaClient == nil {
 		logger.Errorw("no-voltha-connection",
 			log.Fields{"device-id": ofc.DeviceID})
 		return
@@ -412,7 +415,7 @@
 				"device-id":   ofc.DeviceID,
 				"flow-update": flowUpdateJs})
 	}
-	if _, err := ofc.VolthaClient.UpdateLogicalDeviceFlowTable(context.Background(), &flowUpdate); err != nil {
+	if _, err := volthaClient.UpdateLogicalDeviceFlowTable(context.Background(), &flowUpdate); err != nil {
 		logger.Errorw("Error calling FlowDelete ",
 			log.Fields{
 				"device-id": ofc.DeviceID,
diff --git a/internal/pkg/openflow/meter.go b/internal/pkg/openflow/meter.go
index c8ee22c..574e505 100644
--- a/internal/pkg/openflow/meter.go
+++ b/internal/pkg/openflow/meter.go
@@ -17,6 +17,7 @@
 
 import (
 	"encoding/json"
+
 	ofp "github.com/donNewtonAlpha/goloxi/of13"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"github.com/opencord/voltha-protos/v3/go/openflow_13"
@@ -32,7 +33,8 @@
 				"request":   js})
 	}
 
-	if ofc.VolthaClient == nil {
+	volthaClient := ofc.VolthaClient.Get()
+	if volthaClient == nil {
 		logger.Errorw("no-voltha-connection",
 			log.Fields{"device-id": ofc.DeviceID})
 		return
@@ -91,7 +93,7 @@
 				"device-id":         ofc.DeviceID,
 				"meter-mod-request": meterModJS})
 	}
-	if _, err := ofc.VolthaClient.UpdateLogicalDeviceMeterTable(context.Background(), &meterModUpdate); err != nil {
+	if _, err := volthaClient.UpdateLogicalDeviceMeterTable(context.Background(), &meterModUpdate); err != nil {
 		logger.Errorw("Error calling UpdateLogicalDeviceMeterTable",
 			log.Fields{
 				"device-id": ofc.DeviceID,
diff --git a/internal/pkg/openflow/stats.go b/internal/pkg/openflow/stats.go
index 86c06d6..246bc80 100644
--- a/internal/pkg/openflow/stats.go
+++ b/internal/pkg/openflow/stats.go
@@ -19,13 +19,14 @@
 import (
 	"context"
 	"encoding/json"
+	"net"
+	"unsafe"
+
 	"github.com/donNewtonAlpha/goloxi"
 	ofp "github.com/donNewtonAlpha/goloxi/of13"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"github.com/opencord/voltha-protos/v3/go/common"
 	"github.com/opencord/voltha-protos/v3/go/openflow_13"
-	"net"
-	"unsafe"
 )
 
 func (ofc *OFConnection) handleStatsRequest(request ofp.IHeader, statType uint16) error {
@@ -289,7 +290,8 @@
 }
 
 func (ofc *OFConnection) handleDescStatsRequest(request *ofp.DescStatsRequest) (*ofp.DescStatsReply, error) {
-	if ofc.VolthaClient == nil {
+	volthaClient := ofc.VolthaClient.Get()
+	if volthaClient == nil {
 		return nil, NoVolthaConnectionError
 	}
 	response := ofp.NewDescStatsReply()
@@ -297,7 +299,7 @@
 	response.SetVersion(request.GetVersion())
 	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
 
-	resp, err := ofc.VolthaClient.GetLogicalDevice(context.Background(),
+	resp, err := volthaClient.GetLogicalDevice(context.Background(),
 		&common.ID{Id: ofc.DeviceID})
 	if err != nil {
 		return nil, err
@@ -313,14 +315,15 @@
 }
 
 func (ofc *OFConnection) handleFlowStatsRequest(request *ofp.FlowStatsRequest) (*ofp.FlowStatsReply, error) {
-	if ofc.VolthaClient == nil {
+	volthaClient := ofc.VolthaClient.Get()
+	if volthaClient == nil {
 		return nil, NoVolthaConnectionError
 	}
 	response := ofp.NewFlowStatsReply()
 	response.SetXid(request.GetXid())
 	response.SetVersion(4)
 	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
-	resp, err := ofc.VolthaClient.ListLogicalDeviceFlows(context.Background(),
+	resp, err := volthaClient.ListLogicalDeviceFlows(context.Background(),
 		&common.ID{Id: ofc.DeviceID})
 	if err != nil {
 		return nil, err
@@ -388,14 +391,15 @@
 }
 
 func (ofc *OFConnection) handleGroupStatsRequest(request *ofp.GroupStatsRequest) (*ofp.GroupStatsReply, error) {
-	if ofc.VolthaClient == nil {
+	volthaClient := ofc.VolthaClient.Get()
+	if volthaClient == nil {
 		return nil, NoVolthaConnectionError
 	}
 	response := ofp.NewGroupStatsReply()
 	response.SetVersion(request.GetVersion())
 	response.SetXid(request.GetXid())
 	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
-	reply, err := ofc.VolthaClient.ListLogicalDeviceFlowGroups(context.Background(),
+	reply, err := volthaClient.ListLogicalDeviceFlowGroups(context.Background(),
 		&common.ID{Id: ofc.DeviceID})
 	if err != nil {
 		return nil, err
@@ -426,14 +430,15 @@
 }
 
 func (ofc *OFConnection) handleGroupStatsDescRequest(request *ofp.GroupDescStatsRequest) (*ofp.GroupDescStatsReply, error) {
-	if ofc.VolthaClient == nil {
+	volthaClient := ofc.VolthaClient.Get()
+	if volthaClient == nil {
 		return nil, NoVolthaConnectionError
 	}
 	response := ofp.NewGroupDescStatsReply()
 	response.SetVersion(request.GetVersion())
 	response.SetXid(request.GetXid())
 	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
-	reply, err := ofc.VolthaClient.ListLogicalDeviceFlowGroups(context.Background(),
+	reply, err := volthaClient.ListLogicalDeviceFlowGroups(context.Background(),
 		&common.ID{Id: ofc.DeviceID})
 	if err != nil {
 		return nil, err
@@ -468,14 +473,15 @@
 }
 
 func (ofc *OFConnection) handleMeterStatsRequest(request *ofp.MeterStatsRequest) (*ofp.MeterStatsReply, error) {
-	if ofc.VolthaClient == nil {
+	volthaClient := ofc.VolthaClient.Get()
+	if volthaClient == nil {
 		return nil, NoVolthaConnectionError
 	}
 	response := ofp.NewMeterStatsReply()
 	response.SetVersion(request.GetVersion())
 	response.SetXid(request.GetXid())
 	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
-	resp, err := ofc.VolthaClient.ListLogicalDeviceMeters(context.Background(),
+	resp, err := volthaClient.ListLogicalDeviceMeters(context.Background(),
 		&common.ID{Id: ofc.DeviceID})
 	if err != nil {
 		return nil, err
@@ -546,14 +552,15 @@
 }
 
 func (ofc *OFConnection) handlePortStatsRequest(request *ofp.PortStatsRequest) (*ofp.PortStatsReply, error) {
-	if ofc.VolthaClient == nil {
+	volthaClient := ofc.VolthaClient.Get()
+	if volthaClient == nil {
 		return nil, NoVolthaConnectionError
 	}
 	response := ofp.NewPortStatsReply()
 	response.SetXid(request.GetXid())
 	response.SetVersion(request.GetVersion())
 	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
-	reply, err := ofc.VolthaClient.ListLogicalDevicePorts(context.Background(),
+	reply, err := volthaClient.ListLogicalDevicePorts(context.Background(),
 		&common.ID{Id: ofc.DeviceID})
 	if err != nil {
 		return nil, err
@@ -575,14 +582,15 @@
 }
 
 func (ofc *OFConnection) handlePortDescStatsRequest(request *ofp.PortDescStatsRequest) (*ofp.PortDescStatsReply, error) {
-	if ofc.VolthaClient == nil {
+	volthaClient := ofc.VolthaClient.Get()
+	if volthaClient == nil {
 		return nil, NoVolthaConnectionError
 	}
 	response := ofp.NewPortDescStatsReply()
 	response.SetVersion(request.GetVersion())
 	response.SetXid(request.GetXid())
 	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
-	logicalDevice, err := ofc.VolthaClient.GetLogicalDevice(context.Background(),
+	logicalDevice, err := volthaClient.GetLogicalDevice(context.Background(),
 		&common.ID{Id: ofc.DeviceID})
 	if err != nil {
 		return nil, err