VOL-2518 - reconnect to voltha on disconnect

Change-Id: Ia497bb6a83312f15e54de0d7556753e1d9ef58b0
diff --git a/internal/pkg/openflow/client.go b/internal/pkg/openflow/client.go
index eded9e8..97df5d2 100644
--- a/internal/pkg/openflow/client.go
+++ b/internal/pkg/openflow/client.go
@@ -32,6 +32,7 @@
 )
 
 var logger, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
+var NoVolthaConnectionError = errors.New("no-voltha-connection")
 
 type ofcEvent byte
 type ofcState byte
@@ -492,15 +493,27 @@
 			if err := ofc.doSend(msg); err != nil {
 				ofc.lastUnsentMessage = msg
 				ofc.events <- ofcEventDisconnect
-				return
+				logger.Debugw("message-sender-error",
+					log.Fields{
+						"device-id": ofc.DeviceID,
+						"error":     err.Error()})
+				break top
 			}
+			logger.Debugw("message-sender-send",
+				log.Fields{
+					"device-id": ofc.DeviceID})
 			ofc.lastUnsentMessage = nil
 		}
 	}
+
+	logger.Debugw("message-sender-finished",
+		log.Fields{
+			"device-id": ofc.DeviceID})
 }
 
 // SendMessage queues a message to be sent to the openflow controller
 func (ofc *OFClient) SendMessage(message Message) error {
+	logger.Debugw("queuing-message", log.Fields{"device-id": ofc.DeviceID})
 	ofc.sendChannel <- message
 	return nil
 }
diff --git a/internal/pkg/openflow/feature.go b/internal/pkg/openflow/feature.go
index 54de0d0..f456175 100644
--- a/internal/pkg/openflow/feature.go
+++ b/internal/pkg/openflow/feature.go
@@ -32,6 +32,9 @@
 				"device-id": ofc.DeviceID,
 				"request":   js})
 	}
+	if ofc.VolthaClient == nil {
+		return NoVolthaConnectionError
+	}
 	var id = common.ID{Id: ofc.DeviceID}
 	logicalDevice, err := ofc.VolthaClient.GetLogicalDevice(context.Background(), &id)
 	reply := ofp.NewFeaturesReply()
diff --git a/internal/pkg/openflow/flowMod.go b/internal/pkg/openflow/flowMod.go
index 18390d1..4ab1afc 100644
--- a/internal/pkg/openflow/flowMod.go
+++ b/internal/pkg/openflow/flowMod.go
@@ -77,6 +77,12 @@
 				"params":    js})
 	}
 
+	if ofc.VolthaClient == nil {
+		logger.Errorw("no-voltha-connection",
+			log.Fields{"device-id": ofc.DeviceID})
+		return
+	}
+
 	// Construct the match
 	var oxmList []*voltha.OfpOxmField
 	for _, oxmField := range flowAdd.Match.GetOxmList() {
@@ -257,6 +263,12 @@
 				"flow-delete-strict": js})
 	}
 
+	if ofc.VolthaClient == nil {
+		logger.Errorw("no-voltha-connection",
+			log.Fields{"device-id": ofc.DeviceID})
+		return
+	}
+
 	// Construct match
 	var oxmList []*voltha.OfpOxmField
 	for _, oxmField := range flowDeleteStrict.Match.GetOxmList() {
diff --git a/internal/pkg/openflow/meter.go b/internal/pkg/openflow/meter.go
index d4299ba..61ded75 100644
--- a/internal/pkg/openflow/meter.go
+++ b/internal/pkg/openflow/meter.go
@@ -32,6 +32,12 @@
 				"request":   js})
 	}
 
+	if ofc.VolthaClient == nil {
+		logger.Errorw("no-voltha-connection",
+			log.Fields{"device-id": ofc.DeviceID})
+		return
+	}
+
 	meterModUpdate := openflow_13.MeterModUpdate{Id: ofc.DeviceID}
 	meterMod := openflow_13.OfpMeterMod{
 		MeterId: request.MeterId,
diff --git a/internal/pkg/openflow/stats.go b/internal/pkg/openflow/stats.go
index 4617598..97d389b 100644
--- a/internal/pkg/openflow/stats.go
+++ b/internal/pkg/openflow/stats.go
@@ -288,6 +288,9 @@
 }
 
 func (ofc *OFClient) handleDescStatsRequest(request *ofp.DescStatsRequest) (*ofp.DescStatsReply, error) {
+	if ofc.VolthaClient == nil {
+		return nil, NoVolthaConnectionError
+	}
 	response := ofp.NewDescStatsReply()
 	response.SetXid(request.GetXid())
 	response.SetVersion(request.GetVersion())
@@ -309,6 +312,9 @@
 }
 
 func (ofc *OFClient) handleFlowStatsRequest(request *ofp.FlowStatsRequest) (*ofp.FlowStatsReply, error) {
+	if ofc.VolthaClient == nil {
+		return nil, NoVolthaConnectionError
+	}
 	response := ofp.NewFlowStatsReply()
 	response.SetXid(request.GetXid())
 	response.SetVersion(4)
@@ -382,6 +388,9 @@
 }
 
 func (ofc *OFClient) handleGroupStatsRequest(request *ofp.GroupStatsRequest) (*ofp.GroupStatsReply, error) {
+	if ofc.VolthaClient == nil {
+		return nil, NoVolthaConnectionError
+	}
 	response := ofp.NewGroupStatsReply()
 	response.SetVersion(request.GetVersion())
 	response.SetXid(request.GetXid())
@@ -417,6 +426,9 @@
 }
 
 func (ofc *OFClient) handleGroupStatsDescRequest(request *ofp.GroupDescStatsRequest) (*ofp.GroupDescStatsReply, error) {
+	if ofc.VolthaClient == nil {
+		return nil, NoVolthaConnectionError
+	}
 	response := ofp.NewGroupDescStatsReply()
 	response.SetVersion(request.GetVersion())
 	response.SetXid(request.GetXid())
@@ -456,6 +468,9 @@
 }
 
 func (ofc *OFClient) handleMeterStatsRequest(request *ofp.MeterStatsRequest) (*ofp.MeterStatsReply, error) {
+	if ofc.VolthaClient == nil {
+		return nil, NoVolthaConnectionError
+	}
 	response := ofp.NewMeterStatsReply()
 	response.SetVersion(request.GetVersion())
 	response.SetXid(request.GetXid())
@@ -531,6 +546,9 @@
 }
 
 func (ofc *OFClient) handlePortStatsRequest(request *ofp.PortStatsRequest) (*ofp.PortStatsReply, error) {
+	if ofc.VolthaClient == nil {
+		return nil, NoVolthaConnectionError
+	}
 	response := ofp.NewPortStatsReply()
 	response.SetXid(request.GetXid())
 	response.SetVersion(request.GetVersion())
@@ -557,6 +575,9 @@
 }
 
 func (ofc *OFClient) handlePortDescStatsRequest(request *ofp.PortDescStatsRequest) (*ofp.PortDescStatsReply, error) {
+	if ofc.VolthaClient == nil {
+		return nil, NoVolthaConnectionError
+	}
 	response := ofp.NewPortDescStatsReply()
 	response.SetVersion(request.GetVersion())
 	response.SetXid(request.GetXid())