[VOL-1463 VOL-1464]
This update addresses the two Jira tickets listed above.
Fixes the premature stream closure issue, where southbound streams could be closed before the receive side had finished with them, so a redundant backend could miss part of a request.
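The fix has the send side wait on a per-stream ok2Close channel that the
receive side closes once it is safe to call CloseSend. A minimal sketch of
that pattern follows; the names are illustrative, not the actual afrouter
types, and the Sleep merely stands in for draining the stream:

    package main

    import (
        "fmt"
        "time"
    )

    type stream struct {
        ok2Close chan struct{} // closed by the receiver when CloseSend is safe
    }

    func (s *stream) receive() {
        time.Sleep(100 * time.Millisecond) // stand-in for draining the stream
        close(s.ok2Close)                  // close() broadcasts to all waiters
    }

    func (s *stream) closeSend() {
        <-s.ok2Close // block until the receiver signals completion
        fmt.Println("CloseSend issued only after the receiver is done")
    }

    func main() {
        s := &stream{ok2Close: make(chan struct{})}
        go s.receive()
        s.closeSend()
    }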
Makes the connection and stream call ordering deterministic by iterating over sorted slices instead of Go maps, whose iteration order is randomized.
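The change builds sorted slices (srtdConns, srtdStrms) from the map keys
and ranges over those instead of the maps. A minimal, self-contained sketch
of the technique, with placeholder vcoreN keys:

    package main

    import (
        "fmt"
        "sort"
    )

    func main() {
        conns := map[string]string{"vcore2": "b", "vcore1": "a", "vcore3": "c"}

        // Ranging over conns directly visits entries in a random order.
        keys := make([]string, 0, len(conns))
        for k := range conns {
            keys = append(keys, k)
        }
        sort.Strings(keys)

        // The sorted slice yields the same order on every run.
        for _, k := range keys {
            fmt.Println(k, conns[k])
        }
    }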
Change-Id: I2e04c447e8a38428ab39c7852f55289d92ee62a5
diff --git a/afrouter/afrouter/backend.go b/afrouter/afrouter/backend.go
index 8ec286e..3f17af1 100644
--- a/afrouter/afrouter/backend.go
+++ b/afrouter/afrouter/backend.go
@@ -25,6 +25,7 @@
"net"
"sync"
"time"
+ "sort"
"errors"
"strconv"
"strings"
@@ -71,6 +72,7 @@
activeAssoc assoc
connFailCallback func(string, *backend)bool
connections map[string]*beConnection
+ srtdConns []*beConnection
opnConns int
}
@@ -105,6 +107,7 @@
strm grpc.ClientStream
ctxt context.Context
cncl context.CancelFunc
+ ok2Close chan struct{}
c2sRtrn chan error
s2cRtrn error
}
@@ -113,6 +116,7 @@
lck sync.Mutex
actvStrm *beClStrm
strms map[string]*beClStrm
+ srtdStrms []*beClStrm
}
//***************************************************************//
@@ -264,7 +268,7 @@
var atLeastOne bool = false
var errStr strings.Builder
log.Debugf("There are %d connections to open", len(be.connections))
- for cnk,cn := range be.connections {
+ for _,cn := range be.srtdConns {
// TODO: THIS IS A HACK to suspend redundancy for binding routers for all calls
// and its very specific to a use case. There should really be a per method
// mechanism to select non-redundant calls for all router types. This needs
@@ -273,12 +277,12 @@
if atLeastOne == true && f.metaKey != NoMeta {
// Don't open any more southbound streams
log.Debugf("Not opening any more SB streams, metaKey = %s", f.metaKey)
- rtrn.strms[cnk] = nil
+ rtrn.strms[cn.name] = nil
continue
}
// Copy in the metadata
if cn.getState() == connectivity.Ready && cn.getConn() != nil {
- log.Debugf("Opening southbound stream for connection '%s'", cnk)
+ log.Debugf("Opening southbound stream for connection '%s'", cn.name)
// Create an outgoing context that includes the incoming metadata
// and that will cancel if the server's context is canceled
clientCtx, clientCancel := context.WithCancel(serverStream.Context())
@@ -289,12 +293,14 @@
// Create the client stream
if clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying,
cn.getConn(), f.mthdSlice[REQ_ALL]); err !=nil {
- log.Debug("Failed to create a client stream '%s', %v",cn.name,err)
+ log.Debugf("Failed to create a client stream '%s', %v",cn.name,err)
fmt.Fprintf(&errStr, "{{Failed to create a client stream '%s', %v}} ", cn.name, err)
- rtrn.strms[cnk] = nil
+ rtrn.strms[cn.name] = nil
} else {
- rtrn.strms[cnk] = &beClStrm{strm:clientStream, ctxt:clientCtx, cncl:clientCancel, s2cRtrn:nil,
- c2sRtrn:make(chan error, 1)}
+ rtrn.strms[cn.name] = &beClStrm{strm:clientStream, ctxt:clientCtx,
+ cncl:clientCancel, s2cRtrn:nil,
+ ok2Close:make(chan struct{}),
+ c2sRtrn:make(chan error, 1)}
atLeastOne = true
}
} else if cn.getConn() == nil {
@@ -308,6 +314,7 @@
}
}
if atLeastOne == true {
+ rtrn.sortStreams()
return rtrn,nil
}
fmt.Fprintf(&errStr, "{{No streams available for backend '%s' unable to send}} ",be.name)
@@ -319,6 +326,9 @@
// Set up and launch each individual southbound stream
var beStrms *beClStrms
+ var rtrn error = nil
+ var s2cOk bool = false
+ var c2sOk bool = false
beStrms, err := be.openSouthboundStreams(srv,serverStream,nf)
if err != nil {
@@ -341,13 +351,16 @@
for i := 0; i < 2; i++ {
select {
case s2cErr := <-s2cErrChan:
+ s2cOk = true
log.Debug("Processing s2cErr")
if s2cErr == io.EOF {
log.Debug("s2cErr reporting EOF")
// this is the successful case where the sender has encountered io.EOF, and won't be sending anymore.
// the clientStream->serverStream flow may continue sending though.
- beStrms.closeSend()
- break
+ defer beStrms.closeSend()
+ if c2sOk == true {
+ return rtrn
+ }
} else {
log.Debugf("s2cErr reporting %v",s2cErr)
// however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
@@ -357,16 +370,24 @@
return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
}
case c2sErr := <-c2sErrChan:
+ c2sOk = true
log.Debug("Processing c2sErr")
// This happens when the clientStream has nothing else to offer (io.EOF) or has returned a gRPC error. In those two
// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
// will be nil.
serverStream.SetTrailer(beStrms.trailer())
// c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
+ // NOTE!!! with redundant backends, it's likely that one of the backends
+ // returns a response before all the data has been sent southbound and
+ // the southbound streams are closed. Should this happen one of the
+ // backends may not get the request.
if c2sErr != io.EOF {
- return c2sErr
+ rtrn = c2sErr
}
- return nil
+ log.Debug("c2sErr reporting EOF")
+ if s2cOk == true {
+ return rtrn
+ }
}
}
return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
@@ -383,6 +404,7 @@
func (strms *beClStrms) closeSend() {
for _,strm := range strms.strms {
if strm != nil {
+ <-strm.ok2Close
log.Debug("Closing southbound stream")
strm.strm.CloseSend()
}
@@ -438,11 +460,12 @@
} else {
srcS.c2sRtrn <- nil // Inactive responder
}
+ close(srcS.ok2Close)
break
}
if src.setThenGetActive(srcS) != srcS {
srcS.c2sRtrn <- nil
- break
+ continue
}
if i == 0 {
// This is a bit of a hack, but client to server headers are only readable after first client msg is
@@ -513,18 +536,21 @@
var rtrn error
atLeastOne := false
- for _,strm := range strms.strms {
+ for _,strm := range strms.srtdStrms {
if strm != nil {
if err := strm.strm.SendMsg(f); err != nil {
+ log.Debugf("Error on SendMsg: %s", err.Error())
strm.s2cRtrn = err
}
atLeastOne = true
+ } else {
+ log.Debugf("Nil stream")
}
}
// If one of the streams succeeded, declare success
// if none did pick an error and return it.
if atLeastOne == true {
- for _,strm := range strms.strms {
+ for _,strm := range strms.srtdStrms {
if strm != nil {
rtrn = strm.s2cRtrn
if rtrn == nil {
@@ -551,6 +577,7 @@
// Send the message to each of the backend streams
if err := dst.sendAll(f); err != nil {
ret <- err
+ log.Debugf("SendAll failed %s", err.Error())
break
}
log.Debugf("Southbound frame %v", f.payload)
@@ -563,6 +590,28 @@
return ret
}
+func (st *beClStrms) sortStreams() {
+ var tmpKeys []string
+ for k := range st.strms {
+ tmpKeys = append(tmpKeys, k)
+ }
+ sort.Strings(tmpKeys)
+ for _,v := range tmpKeys {
+ st.srtdStrms = append(st.srtdStrms, st.strms[v])
+ }
+}
+
+func (be *backend) sortConns() {
+ var tmpKeys []string
+ for k := range be.connections {
+ tmpKeys = append(tmpKeys, k)
+ }
+ sort.Strings(tmpKeys)
+ for _,v := range tmpKeys {
+ be.srtdConns = append(be.srtdConns, be.connections[v])
+ }
+}
+
func newBackend(conf *BackendConfig, clusterName string) (*backend, error) {
var rtrn_err bool = false
@@ -651,9 +700,13 @@
}
}
}
+
if rtrn_err {
return nil, errors.New("Connection configuration failed")
}
+ // Create the sorted connection list for deterministic
+ // active-active call orders.
+ be.sortConns()
// All is well start the backend cluster connections
be.connectAll()
diff --git a/afrouter/arouter.json b/afrouter/arouter.json
index 7b075ce..329a7d4 100644
--- a/afrouter/arouter.json
+++ b/afrouter/arouter.json
@@ -100,14 +100,16 @@
"ReceivePacketsIn",
"ReceiveChangeEvents",
"Subscribe",
- "ListLogicalDevices",
- "GetLogicalDevice",
- "ListDeviceFlowGroups",
- "ListLogicalDeviceFlowGroups",
- "ListDeviceFlows",
"UpdateLogicalDeviceFlowTable",
"UpdateLogicalDeviceFlowGroupTable",
- "ListLogicalDeviceFlows"
+ "GetLogicalDevice",
+ "GetLogicalDevicePort",
+ "EnableLogicalDevicePort",
+ "DisableLogicalDevicePort",
+ "ListLogicalDevices",
+ "ListLogicalDeviceFlows",
+ "ListLogicalDeviceFlowGroups",
+ "ListLogicalDevicePorts"
],
"_TODO":"Overrides not implemented yet, config ignored",
"overrides": [