VOL-2518 - reconnect to voltha on disconnect
Change-Id: Ia497bb6a83312f15e54de0d7556753e1d9ef58b0
diff --git a/internal/pkg/ofagent/changeEvent.go b/internal/pkg/ofagent/changeEvent.go
index 69b14ff..20c55de 100644
--- a/internal/pkg/ofagent/changeEvent.go
+++ b/internal/pkg/ofagent/changeEvent.go
@@ -29,38 +29,51 @@
func (ofa *OFAgent) receiveChangeEvents(ctx context.Context) {
logger.Debug("receive-change-events-started")
+ // If we exit, assume disconnected
+ defer func() {
+ ofa.events <- ofaEventVolthaDisconnected
+ logger.Debug("receive-change-events-finished")
+ }()
+ if ofa.volthaClient == nil {
+ logger.Error("no-voltha-connection")
+ return
+ }
opt := grpc.EmptyCallOption{}
streamCtx, streamDone := context.WithCancel(context.Background())
+ defer streamDone()
stream, err := ofa.volthaClient.ReceiveChangeEvents(streamCtx, &empty.Empty{}, opt)
if err != nil {
logger.Errorw("Unable to establish Receive Change Event Stream",
log.Fields{"error": err})
- ofa.events <- ofaEventVolthaDisconnected
+ return
}
- defer streamDone()
+top:
for {
select {
case <-ctx.Done():
- return
+ break top
default:
- if ce, err := stream.Recv(); err != nil {
+ ce, err := stream.Recv()
+ if err != nil {
logger.Errorw("error receiving change event",
log.Fields{"error": err})
- ofa.events <- ofaEventVolthaDisconnected
- } else {
- ofa.changeEventChannel <- ce
+ break top
}
+ ofa.changeEventChannel <- ce
+ logger.Debug("receive-change-event-queued")
}
}
}
func (ofa *OFAgent) handleChangeEvents(ctx context.Context) {
- logger.Debugln("handle-change-event-started")
+ logger.Debug("handle-change-event-started")
+
+top:
for {
select {
case <-ctx.Done():
- return
+ break top
case changeEvent := <-ofa.changeEventChannel:
deviceID := changeEvent.GetId()
portStatus := changeEvent.GetPortStatus()
@@ -107,4 +120,6 @@
ofa.getOFClient(deviceID).SendMessage(ofPortStatus)
}
}
+
+ logger.Debug("handle-change-event-finsihed")
}
diff --git a/internal/pkg/ofagent/ofagent.go b/internal/pkg/ofagent/ofagent.go
index ea3b091..25041ab 100644
--- a/internal/pkg/ofagent/ofagent.go
+++ b/internal/pkg/ofagent/ofagent.go
@@ -18,6 +18,7 @@
import (
"context"
+ "fmt"
"github.com/opencord/ofagent-go/internal/pkg/openflow"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-lib-go/v3/pkg/probe"
@@ -39,6 +40,7 @@
ofaEventError
ofaStateConnected = ofaState(iota)
+ ofaStateConnecting
ofaStateDisconnected
)
@@ -147,6 +149,7 @@
// Kick off process to attempt to establish
// connection to voltha
+ state = ofaStateConnecting
go ofa.establishConnectionToVoltha(p)
case ofaEventVolthaConnected:
@@ -156,6 +159,16 @@
if state != ofaStateConnected {
state = ofaStateConnected
volthaCtx, volthaDone = context.WithCancel(context.Background())
+ // Reconnect clients
+ for _, client := range ofa.clientMap {
+ if logger.V(log.DebugLevel) {
+ logger.Debugw("reset-client-voltha-connection",
+ log.Fields{
+ "from": fmt.Sprintf("0x%p", &client.VolthaClient),
+ "to": fmt.Sprintf("0x%p", &ofa.volthaClient)})
+ }
+ client.VolthaClient = ofa.volthaClient
+ }
go ofa.receiveChangeEvents(volthaCtx)
go ofa.receivePacketsIn(volthaCtx)
go ofa.streamPacketOut(volthaCtx)
@@ -163,13 +176,31 @@
}
case ofaEventVolthaDisconnected:
+ if p != nil {
+ p.UpdateStatus("voltha", probe.ServiceStatusNotReady)
+ }
logger.Debug("ofagent-voltha-disconnect-event")
if state == ofaStateConnected {
state = ofaStateDisconnected
+ ofa.volthaClient = nil
+ for _, client := range ofa.clientMap {
+ client.VolthaClient = nil
+ if logger.V(log.DebugLevel) {
+ logger.Debugw("reset-client-voltha-connection",
+ log.Fields{
+ "from": fmt.Sprintf("0x%p", &client.VolthaClient),
+ "to": "nil"})
+ }
+ }
volthaDone()
volthaDone = nil
volthaCtx = nil
}
+ if state != ofaStateConnecting {
+ state = ofaStateConnecting
+ go ofa.establishConnectionToVoltha(p)
+ }
+
case ofaEventError:
logger.Debug("ofagent-error-event")
default:
diff --git a/internal/pkg/ofagent/packetIn.go b/internal/pkg/ofagent/packetIn.go
index 324d568..175a9c8 100644
--- a/internal/pkg/ofagent/packetIn.go
+++ b/internal/pkg/ofagent/packetIn.go
@@ -31,37 +31,50 @@
func (ofa *OFAgent) receivePacketsIn(ctx context.Context) {
logger.Debug("receive-packets-in-started")
+ // If we exit, assume disconnected
+ defer func() {
+ ofa.events <- ofaEventVolthaDisconnected
+ logger.Debug("receive-packets-in-finished")
+ }()
+ if ofa.volthaClient == nil {
+ logger.Error("no-voltha-connection")
+ return
+ }
opt := grpc.EmptyCallOption{}
streamCtx, streamDone := context.WithCancel(context.Background())
+ defer streamDone()
stream, err := ofa.volthaClient.ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
if err != nil {
logger.Errorw("Unable to establish Receive PacketIn Stream",
log.Fields{"error": err})
+ return
}
- defer streamDone()
+
+top:
for {
select {
case <-ctx.Done():
- return
+ break top
default:
- if pkt, err := stream.Recv(); err != nil {
+ pkt, err := stream.Recv()
+ if err != nil {
logger.Errorw("error receiving packet",
log.Fields{"error": err})
- ofa.events <- ofaEventVolthaDisconnected
- } else {
- ofa.packetInChannel <- pkt
+ break top
}
+ ofa.packetInChannel <- pkt
}
}
}
func (ofa *OFAgent) handlePacketsIn(ctx context.Context) {
logger.Debug("handle-packets-in-started")
+top:
for {
select {
case <-ctx.Done():
- return
+ break top
case packet := <-ofa.packetInChannel:
packetIn := packet.GetPacketIn()
@@ -157,4 +170,5 @@
}
}
+ logger.Debug("handle-packets-in-finished")
}
diff --git a/internal/pkg/ofagent/packetOut.go b/internal/pkg/ofagent/packetOut.go
index 30466a6..928d19c 100644
--- a/internal/pkg/ofagent/packetOut.go
+++ b/internal/pkg/ofagent/packetOut.go
@@ -24,8 +24,15 @@
)
func (ofa *OFAgent) streamPacketOut(ctx context.Context) {
- if logger.V(log.DebugLevel) {
- logger.Debug("GrpcClient streamPacketOut called")
+ logger.Debug("packet-out-started")
+ // If we exit, assume disconnected
+ defer func() {
+ ofa.events <- ofaEventVolthaDisconnected
+ logger.Debug("packet-out-finished")
+ }()
+ if ofa.volthaClient == nil {
+ logger.Error("no-voltha-connection")
+ return
}
opt := grpc.EmptyCallOption{}
streamCtx, streamDone := context.WithCancel(context.Background())
@@ -33,18 +40,24 @@
defer streamDone()
if err != nil {
logger.Errorw("streamPacketOut Error creating packetout stream ", log.Fields{"error": err})
- ofa.events <- ofaEventVolthaDisconnected
+ return
}
+top:
for {
select {
case <-ctx.Done():
- return
+ break top
case ofPacketOut := <-ofa.packetOutChannel:
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(ofPacketOut)
logger.Debugw("streamPacketOut Receive PacketOut from Channel", log.Fields{"PacketOut": js})
}
- outClient.Send(ofPacketOut)
+ if err := outClient.Send(ofPacketOut); err != nil {
+ logger.Errorw("packet-out-send-error",
+ log.Fields{"error": err.Error()})
+ break top
+ }
+ logger.Debug("packet-out-send")
}
}
}
diff --git a/internal/pkg/ofagent/refresh.go b/internal/pkg/ofagent/refresh.go
index 275ba2e..d50b3ec 100644
--- a/internal/pkg/ofagent/refresh.go
+++ b/internal/pkg/ofagent/refresh.go
@@ -42,10 +42,17 @@
}
func (ofa *OFAgent) refreshDeviceList() {
+ // If we exit, assume disconnected
+ if ofa.volthaClient == nil {
+ logger.Error("no-voltha-connection")
+ ofa.events <- ofaEventVolthaDisconnected
+ return
+ }
deviceList, err := ofa.volthaClient.ListLogicalDevices(context.Background(), &empty.Empty{})
if err != nil {
logger.Errorw("ofagent failed to query device list from voltha",
log.Fields{"error": err})
+ ofa.events <- ofaEventVolthaDisconnected
return
}
devices := deviceList.GetItems()