VOL-2894 - reference the VOLTHA gRPC endpoint via a holder
Change-Id: If24299556ad6cacf9cd0f793167a2c908534729c
diff --git a/internal/pkg/ofagent/changeEvent.go b/internal/pkg/ofagent/changeEvent.go
index dc3aa39..fdf99e1 100644
--- a/internal/pkg/ofagent/changeEvent.go
+++ b/internal/pkg/ofagent/changeEvent.go
@@ -19,12 +19,13 @@
import (
"context"
"encoding/json"
+ "net"
+
ofp "github.com/donNewtonAlpha/goloxi/of13"
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/ofagent-go/internal/pkg/openflow"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"google.golang.org/grpc"
- "net"
)
func (ofa *OFAgent) receiveChangeEvents(ctx context.Context) {
@@ -41,7 +42,7 @@
opt := grpc.EmptyCallOption{}
streamCtx, streamDone := context.WithCancel(context.Background())
defer streamDone()
- stream, err := ofa.volthaClient.ReceiveChangeEvents(streamCtx, &empty.Empty{}, opt)
+ stream, err := ofa.volthaClient.Get().ReceiveChangeEvents(streamCtx, &empty.Empty{}, opt)
if err != nil {
logger.Errorw("Unable to establish Receive Change Event Stream",
log.Fields{"error": err})
diff --git a/internal/pkg/ofagent/connection.go b/internal/pkg/ofagent/connection.go
index 54a4ed8..b84dc3c 100644
--- a/internal/pkg/ofagent/connection.go
+++ b/internal/pkg/ofagent/connection.go
@@ -18,12 +18,13 @@
import (
"context"
"errors"
+ "time"
+
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-lib-go/v3/pkg/probe"
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc"
- "time"
)
func (ofa *OFAgent) establishConnectionToVoltha(p *probe.Probe) error {
@@ -36,7 +37,7 @@
}
ofa.volthaConnection = nil
- ofa.volthaClient = nil
+ ofa.volthaClient.Clear()
try := 1
for ofa.ConnectionMaxRetries == 0 || try < ofa.ConnectionMaxRetries {
conn, err := grpc.Dial(ofa.VolthaApiEndPoint, grpc.WithInsecure())
@@ -49,7 +50,7 @@
"VolthaApiEndPoint": ofa.VolthaApiEndPoint,
})
ofa.volthaConnection = conn
- ofa.volthaClient = svc
+ ofa.volthaClient.Set(svc)
if p != nil {
p.UpdateStatus("voltha", probe.ServiceStatusRunning)
}
diff --git a/internal/pkg/ofagent/ofagent.go b/internal/pkg/ofagent/ofagent.go
index 419a988..f0ebd0e 100644
--- a/internal/pkg/ofagent/ofagent.go
+++ b/internal/pkg/ofagent/ofagent.go
@@ -18,14 +18,15 @@
import (
"context"
- "fmt"
+ "sync"
+ "time"
+
+ "github.com/opencord/ofagent-go/internal/pkg/holder"
"github.com/opencord/ofagent-go/internal/pkg/openflow"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-lib-go/v3/pkg/probe"
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc"
- "sync"
- "time"
)
type ofaEvent byte
@@ -50,7 +51,7 @@
ConnectionRetryDelay time.Duration
volthaConnection *grpc.ClientConn
- volthaClient voltha.VolthaServiceClient
+ volthaClient *holder.VolthaServiceClientHolder
mapLock sync.Mutex
clientMap map[string]*openflow.OFClient
events chan ofaEvent
@@ -67,6 +68,7 @@
DeviceListRefreshInterval: config.DeviceListRefreshInterval,
ConnectionMaxRetries: config.ConnectionMaxRetries,
ConnectionRetryDelay: config.ConnectionRetryDelay,
+ volthaClient: &holder.VolthaServiceClientHolder{},
packetInChannel: make(chan *voltha.PacketIn),
packetOutChannel: make(chan *voltha.PacketOut),
changeEventChannel: make(chan *voltha.ChangeEvent),
@@ -161,16 +163,6 @@
if state != ofaStateConnected {
state = ofaStateConnected
volthaCtx, volthaDone = context.WithCancel(context.Background())
- // Reconnect clients
- for _, client := range ofa.clientMap {
- if logger.V(log.DebugLevel) {
- logger.Debugw("reset-client-voltha-connection",
- log.Fields{
- "from": fmt.Sprintf("0x%p", &client.VolthaClient),
- "to": fmt.Sprintf("0x%p", &ofa.volthaClient)})
- }
- client.VolthaClient = ofa.volthaClient
- }
go ofa.receiveChangeEvents(volthaCtx)
go ofa.receivePacketsIn(volthaCtx)
go ofa.streamPacketOut(volthaCtx)
@@ -184,16 +176,7 @@
logger.Debug("ofagent-voltha-disconnect-event")
if state == ofaStateConnected {
state = ofaStateDisconnected
- ofa.volthaClient = nil
- for _, client := range ofa.clientMap {
- client.VolthaClient = nil
- if logger.V(log.DebugLevel) {
- logger.Debugw("reset-client-voltha-connection",
- log.Fields{
- "from": fmt.Sprintf("0x%p", &client.VolthaClient),
- "to": "nil"})
- }
- }
+ ofa.volthaClient.Clear()
volthaDone()
volthaDone = nil
}
diff --git a/internal/pkg/ofagent/packetIn.go b/internal/pkg/ofagent/packetIn.go
index 0f5ad71..685ab43 100644
--- a/internal/pkg/ofagent/packetIn.go
+++ b/internal/pkg/ofagent/packetIn.go
@@ -19,6 +19,7 @@
import (
"context"
"encoding/json"
+
"github.com/donNewtonAlpha/goloxi"
ofp "github.com/donNewtonAlpha/goloxi/of13"
"github.com/golang/protobuf/ptypes/empty"
@@ -43,7 +44,7 @@
opt := grpc.EmptyCallOption{}
streamCtx, streamDone := context.WithCancel(context.Background())
defer streamDone()
- stream, err := ofa.volthaClient.ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
+ stream, err := ofa.volthaClient.Get().ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
if err != nil {
logger.Errorw("Unable to establish Receive PacketIn Stream",
log.Fields{"error": err})
diff --git a/internal/pkg/ofagent/packetOut.go b/internal/pkg/ofagent/packetOut.go
index 928d19c..ffeafec 100644
--- a/internal/pkg/ofagent/packetOut.go
+++ b/internal/pkg/ofagent/packetOut.go
@@ -19,6 +19,7 @@
import (
"context"
"encoding/json"
+
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"google.golang.org/grpc"
)
@@ -36,7 +37,7 @@
}
opt := grpc.EmptyCallOption{}
streamCtx, streamDone := context.WithCancel(context.Background())
- outClient, err := ofa.volthaClient.StreamPacketsOut(streamCtx, opt)
+ outClient, err := ofa.volthaClient.Get().StreamPacketsOut(streamCtx, opt)
defer streamDone()
if err != nil {
logger.Errorw("streamPacketOut Error creating packetout stream ", log.Fields{"error": err})
diff --git a/internal/pkg/ofagent/refresh.go b/internal/pkg/ofagent/refresh.go
index 835e551..1744578 100644
--- a/internal/pkg/ofagent/refresh.go
+++ b/internal/pkg/ofagent/refresh.go
@@ -18,10 +18,11 @@
import (
"context"
+ "time"
+
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/ofagent-go/internal/pkg/openflow"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
- "time"
)
func (ofa *OFAgent) synchronizeDeviceList(ctx context.Context) {
@@ -48,7 +49,7 @@
ofa.events <- ofaEventVolthaDisconnected
return
}
- deviceList, err := ofa.volthaClient.ListLogicalDevices(context.Background(), &empty.Empty{})
+ deviceList, err := ofa.volthaClient.Get().ListLogicalDevices(context.Background(), &empty.Empty{})
if err != nil {
logger.Errorw("ofagent failed to query device list from voltha",
log.Fields{"error": err})