[VOL-1463 VOL-1464]
This update addresses the two Jira tickets listed above:
- Fix the premature stream closure issue.
- Make the ordering of calls across connection pairs deterministic.
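
The ordering fix matters because Go randomizes map iteration order, so
ranging over be.connections directly contacts the backends in a different
order on every request. A minimal standalone sketch of the sorted-key
approach used by the new sortConns()/sortStreams() helpers (the map
contents below are invented for illustration):

package main

import (
	"fmt"
	"sort"
)

func main() {
	conns := map[string]string{"vcore2": "b", "vcore1": "a", "vcore3": "c"}

	// Ranging over the map directly yields a different order on each
	// run; collect and sort the keys once instead.
	var keys []string
	for k := range conns {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	// Iterating the sorted slice gives the same call order every time.
	for _, k := range keys {
		fmt.Println(k, conns[k])
	}
}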

Change-Id: I2e04c447e8a38428ab39c7852f55289d92ee62a5
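
For reviewers, a standalone sketch of the close handshake this change
introduces (ok2Close mirrors the new field in backend.go; the stream type
here is a stand-in for grpc.ClientStream, not the actual afrouter code).
The receive loop closes ok2Close once its stream is drained, and
closeSend() blocks on that channel first, so a fast responder can no
longer tear down a slower redundant backend's stream prematurely:

package main

import (
	"fmt"
	"io"
)

// stream stands in for a grpc.ClientStream; only the calls used by the
// handshake are modeled.
type stream struct {
	msgs     chan string
	ok2Close chan struct{} // closed by the receiver once the stream is drained
}

func (s *stream) recvLoop() {
	for {
		m, open := <-s.msgs
		if !open {
			fmt.Println("receiver: drained,", io.EOF)
			close(s.ok2Close) // equivalent of close(srcS.ok2Close) on EOF
			return
		}
		fmt.Println("receiver:", m)
	}
}

// closeSend waits for the receiver's go-ahead, mirroring the
// <-strm.ok2Close wait added to beClStrms.closeSend().
func (s *stream) closeSend() {
	<-s.ok2Close
	fmt.Println("sender: CloseSend issued only after the receiver finished")
}

func main() {
	s := &stream{msgs: make(chan string, 2), ok2Close: make(chan struct{})}
	s.msgs <- "rsp-1"
	s.msgs <- "rsp-2"
	close(s.msgs)

	go s.recvLoop()
	s.closeSend()
}
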
diff --git a/afrouter/afrouter/backend.go b/afrouter/afrouter/backend.go
index 8ec286e..3f17af1 100644
--- a/afrouter/afrouter/backend.go
+++ b/afrouter/afrouter/backend.go
@@ -25,6 +25,7 @@
 	"net"
 	"sync"
 	"time"
+	"sort"
 	"errors"
 	"strconv"
 	"strings"
@@ -71,6 +72,7 @@
 	activeAssoc assoc
 	connFailCallback func(string, *backend)bool
 	connections map[string]*beConnection
+	srtdConns []*beConnection // connections in sorted (deterministic) order
 	opnConns int
 }
 
@@ -105,6 +107,7 @@
 	strm grpc.ClientStream
 	ctxt context.Context
 	cncl context.CancelFunc
+	ok2Close chan struct{} // closed by the c2s receiver when CloseSend is safe
 	c2sRtrn chan error
 	s2cRtrn error
 }
@@ -113,6 +116,7 @@
 	lck sync.Mutex
 	actvStrm *beClStrm
 	strms map[string]*beClStrm
+	srtdStrms []*beClStrm // streams in sorted (deterministic) order
 }
 
 //***************************************************************//
@@ -264,7 +268,7 @@
 	var atLeastOne bool = false
 	var errStr strings.Builder
 	log.Debugf("There are %d connections to open", len(be.connections))
-	for cnk,cn := range be.connections {
+	for _,cn := range be.srtdConns {
 		// TODO: THIS IS A HACK to suspend redundancy for binding routers for all calls
 		// and its very specific to a use case. There should really be a per method
 		// mechanism to select non-redundant calls for all router types. This needs
@@ -273,12 +277,12 @@
 		if atLeastOne == true && f.metaKey != NoMeta {
 			// Don't open any more southbound streams
 			log.Debugf("Not opening any more SB streams, metaKey = %s", f.metaKey)
-			rtrn.strms[cnk] = nil
+			rtrn.strms[cn.name] = nil
 			continue
 		}
 		// Copy in the metadata
 		if cn.getState() == connectivity.Ready  && cn.getConn() != nil {
-			log.Debugf("Opening southbound stream for connection '%s'", cnk)
+			log.Debugf("Opening southbound stream for connection '%s'", cn.name)
 			// Create an outgoing context that includes the incoming metadata
 			// and that will cancel if the server's context is canceled
 			clientCtx, clientCancel := context.WithCancel(serverStream.Context())
@@ -289,12 +293,14 @@
 			// Create the client stream
 			if clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying,
 														cn.getConn(), f.mthdSlice[REQ_ALL]); err !=nil {
-				log.Debug("Failed to create a client stream '%s', %v",cn.name,err)
+				log.Debugf("Failed to create a client stream '%s', %v",cn.name,err)
 				fmt.Fprintf(&errStr, "{{Failed to create a client stream '%s', %v}} ", cn.name, err)
-				rtrn.strms[cnk] = nil
+				rtrn.strms[cn.name] = nil
 			} else {
-				rtrn.strms[cnk] = &beClStrm{strm:clientStream, ctxt:clientCtx, cncl:clientCancel, s2cRtrn:nil,
-											c2sRtrn:make(chan error, 1)}
+				rtrn.strms[cn.name] = &beClStrm{strm:clientStream, ctxt:clientCtx,
+										cncl:clientCancel, s2cRtrn:nil,
+										ok2Close:make(chan struct{}),
+										c2sRtrn:make(chan error, 1)}
 				atLeastOne = true
 			}
 		} else if cn.getConn() == nil {
@@ -308,6 +314,7 @@
 		}
 	}
 	if atLeastOne == true {
+		rtrn.sortStreams()
 		return rtrn,nil
 	}
 	fmt.Fprintf(&errStr, "{{No streams available for backend '%s' unable to send}} ",be.name)
@@ -319,6 +326,9 @@
 
 	// Set up and launch each individual southbound stream 
 	var beStrms *beClStrms
+	var rtrn error
+	var s2cOk bool
+	var c2sOk bool
 
 	beStrms, err := be.openSouthboundStreams(srv,serverStream,nf)
 	if err != nil {
@@ -341,13 +351,16 @@
 	for i := 0; i < 2; i++ {
 		select {
 		case s2cErr := <-s2cErrChan:
+			s2cOk = true
 			log.Debug("Processing s2cErr")
 			if s2cErr == io.EOF {
 				log.Debug("s2cErr reporting EOF")
 				// this is the successful case where the sender has encountered io.EOF, and won't be sending anymore./
 				// the clientStream>serverStream may continue sending though.
-				beStrms.closeSend()
-				break
+				defer beStrms.closeSend() // deferred so the c2s side can finish first
+				if c2sOk {
+					return rtrn
+				}
 			} else {
 				log.Debugf("s2cErr reporting %v",s2cErr)
 				// however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
@@ -357,16 +370,24 @@
 				return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
 			}
 		case c2sErr := <-c2sErrChan:
+			c2sOk = true
 			log.Debug("Processing c2sErr")
 			// This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
 			// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
 			// will be nil.
 			serverStream.SetTrailer(beStrms.trailer())
 			// c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
+			// NOTE!!! With redundant backends it's likely that one of the backends
+			// returns a response before all the data has been sent southbound and
+			// the southbound streams are closed. Should this happen, one of the
+			// backends may not get the request.
 			if c2sErr != io.EOF {
-				return c2sErr
+				rtrn = c2sErr
 			}
-			return nil
+			log.Debugf("c2sErr reporting %v", c2sErr)
+			if s2cOk {
+				return rtrn
+			}
 		}
 	}
 	return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
@@ -383,6 +404,7 @@
 func (strms *beClStrms) closeSend() {
 	for _,strm := range strms.strms {
 		if strm != nil {
+			<-strm.ok2Close // wait until the receiver has drained this stream
 			log.Debug("Closing southbound stream")
 			strm.strm.CloseSend()
 		}
@@ -438,11 +460,12 @@
 				} else {
 					srcS.c2sRtrn <- nil // Inactive responder
 				}
+				close(srcS.ok2Close) // receiver done; safe to CloseSend now
 				break
 			}
 			if src.setThenGetActive(srcS) != srcS {
 				srcS.c2sRtrn <- nil
-				break
+				continue // keep draining this inactive stream instead of exiting
 			}
 			if i == 0 {
 				// This is a bit of a hack, but client to server headers are only readable after first client msg is
@@ -513,18 +536,21 @@
 	var rtrn error
 
 	atLeastOne := false
-	for _,strm := range strms.strms {
+	for _,strm := range strms.srtdStrms {
 		if strm != nil {
 			if err := strm.strm.SendMsg(f); err != nil {
+				log.Debugf("Error on SendMsg: %s", err.Error())
 				strm.s2cRtrn = err
 			}
 			atLeastOne = true
+		} else {
+			log.Debugf("Nil stream")
 		}
 	}
 	// If one of the streams succeeded, declare success
 	// if none did pick an error and return it.
 	if atLeastOne == true {
-		for _,strm := range strms.strms {
+		for _,strm := range strms.srtdStrms {
 			if strm != nil {
 				rtrn = strm.s2cRtrn
 				if rtrn == nil {
@@ -551,6 +577,7 @@
 			// Send the message to each of the backend streams
 			if err := dst.sendAll(f); err != nil {
 				ret <- err
+				log.Debugf("SendAll failed %s", err.Error())
 				break
 			}
 			log.Debugf("Southbound frame %v", f.payload)
@@ -563,6 +590,28 @@
 	return ret
 }
 
+func (st *beClStrms) sortStreams() {
+	var tmpKeys []string
+	for k := range st.strms {
+		tmpKeys = append(tmpKeys, k)
+	}
+	sort.Strings(tmpKeys)
+	for _, v := range tmpKeys {
+		st.srtdStrms = append(st.srtdStrms, st.strms[v])
+	}
+}
+
+func (be *backend) sortConns() {
+	var tmpKeys []string
+	for k := range be.connections {
+		tmpKeys = append(tmpKeys, k)
+	}
+	sort.Strings(tmpKeys)
+	for _, v := range tmpKeys {
+		be.srtdConns = append(be.srtdConns, be.connections[v])
+	}
+}
+
 func newBackend(conf *BackendConfig, clusterName string) (*backend, error) {
 	var rtrn_err bool = false
 
@@ -651,9 +700,13 @@
 			}
 		}
 	}
+
 	if rtrn_err {
 		return nil, errors.New("Connection configuration failed")
 	}
+	// Create the sorted connection list for deterministic
+	// active-active call orders.
+	be.sortConns()
 	// All is well start the backend cluster connections
 	be.connectAll()
 
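A standalone sketch of the wait-for-both pattern the forwarding loop in
streamForward above now uses (the channel names and nil results here are
invented for illustration): per-direction completion is recorded in
s2cOk/c2sOk, and the function returns only once both the server-to-client
and client-to-server forwarders have reported, instead of returning on the
first c2s result.

package main

import "fmt"

func main() {
	s2cErrChan := make(chan error, 1)
	c2sErrChan := make(chan error, 1)
	go func() { s2cErrChan <- nil }() // server->client forwarder done
	go func() { c2sErrChan <- nil }() // client->server forwarder done

	var rtrn error
	s2cOk, c2sOk := false, false
	// Two iterations, one per direction: return only after both have
	// reported, so neither side is abandoned early.
	for i := 0; i < 2; i++ {
		select {
		case err := <-s2cErrChan:
			s2cOk = true
			if err != nil {
				rtrn = err
			}
			if c2sOk {
				fmt.Println("both directions done:", rtrn)
				return
			}
		case err := <-c2sErrChan:
			c2sOk = true
			if err != nil {
				rtrn = err
			}
			if s2cOk {
				fmt.Println("both directions done:", rtrn)
				return
			}
		}
	}
}
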
diff --git a/afrouter/arouter.json b/afrouter/arouter.json
index 7b075ce..329a7d4 100644
--- a/afrouter/arouter.json
+++ b/afrouter/arouter.json
@@ -100,14 +100,16 @@
 							   "ReceivePacketsIn",
 							   "ReceiveChangeEvents",
                                "Subscribe",
-                               "ListLogicalDevices",
-                               "GetLogicalDevice",
-                               "ListDeviceFlowGroups",
-                               "ListLogicalDeviceFlowGroups",
-                               "ListDeviceFlows",
                                "UpdateLogicalDeviceFlowTable",
                                "UpdateLogicalDeviceFlowGroupTable",
-                               "ListLogicalDeviceFlows"
+                               "GetLogicalDevice",
+                               "GetLogicalDevicePort",
+                               "EnableLogicalDevicePort",
+                               "DisableLogicalDevicePort",
+                               "ListLogicalDevices",
+                               "ListLogicalDeviceFlows",
+                               "ListLogicalDeviceFlowGroups",
+                               "ListLogicalDevicePorts"
 					],
 					"_TODO":"Overrides not implemented yet, config ignored",
 					"overrides": [