Reworked connection to use a single thread for state management.

Also disabled the SetConnection API call.

Stream cleanup.

Removed unnecessary threads: there is now one thread per connection (handling response stream forwarding), and the existing thread is used to forward the request stream.
Renamed 'streams' to 'request'.
Renamed 'nbFrame' to 'requestFrame'.
Renamed 'sbFrame' to 'responseFrame'.

Changed handling of streaming requests.

Incoming & outgoing streams are split when a connection becomes ready.
Added playback of non-streaming requests/responses for newly opened streams.
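
Roughly, the mechanism is the one sketched below. This is a self-contained toy rather than the afrouter code: channels stand in for gRPC streams, only the request-side playback is modelled, and all of the names are illustrative. A connection that becomes ready mid-request gets its own stream, the frames already forwarded are replayed onto it, and a dedicated goroutine then drains its response stream.

    package main

    import (
        "fmt"
        "sync"
    )

    // request is a toy model of one in-flight northbound request.
    type request struct {
        mu      sync.Mutex
        sent    []string               // request frames forwarded so far
        streams map[string]chan string // one southbound "stream" per connection
        wg      sync.WaitGroup
    }

    // send forwards one request frame to every open southbound stream.
    func (r *request) send(frame string) {
        r.mu.Lock()
        defer r.mu.Unlock()
        r.sent = append(r.sent, frame)
        for _, s := range r.streams {
            s <- frame
        }
    }

    // addConnection is the "split" step: a connection that becomes ready while
    // the request is in flight gets its own stream, primed with the frames it
    // missed, plus a goroutine that forwards its response stream.
    func (r *request) addConnection(name string) {
        r.mu.Lock()
        defer r.mu.Unlock()
        s := make(chan string, 16)
        for _, f := range r.sent { // playback of the frames this stream missed
            s <- f
        }
        r.streams[name] = s
        r.wg.Add(1)
        go func() { // one goroutine per connection for the response direction
            defer r.wg.Done()
            for f := range s {
                fmt.Printf("%s handled %q\n", name, f)
            }
        }()
    }

    // done closes every southbound stream once the request stream is exhausted.
    func (r *request) done() {
        r.mu.Lock()
        defer r.mu.Unlock()
        for _, s := range r.streams {
            close(s)
        }
    }

    func main() {
        r := &request{streams: map[string]chan string{}}
        r.addConnection("core1")
        r.send("frame-1")
        r.addConnection("core2") // late connection catches up on frame-1
        r.send("frame-2")
        r.done()
        r.wg.Wait()
    }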

Late stream catchup fix & streaming call detection.

Fixed an issue where old streams were not being caught up with what they missed.
Streaming requests & responses are now detected based on the proto definitions.
Changed where the proto file is specified in the afrouter config (see afrouter/arouter.json for an example).
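
For reference, a minimal sketch of reading streaming-ness out of a proto definition, assuming the router works from a compiled descriptor set (the output of protoc --descriptor_set_out). The descriptor file, package, service, and method names below are placeholders, not necessarily the ones arouter.json points at.

    package main

    import (
        "fmt"
        "io/ioutil"

        "github.com/golang/protobuf/proto"
        descpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
    )

    // isStreaming reports whether a gRPC method has a streaming request and/or
    // streaming response, based on a compiled FileDescriptorSet.
    func isStreaming(fds *descpb.FileDescriptorSet, pkg, service, method string) (reqStream, respStream bool, err error) {
        for _, file := range fds.File {
            if file.GetPackage() != pkg {
                continue
            }
            for _, svc := range file.Service {
                if svc.GetName() != service {
                    continue
                }
                for _, m := range svc.Method {
                    if m.GetName() == method {
                        return m.GetClientStreaming(), m.GetServerStreaming(), nil
                    }
                }
            }
        }
        return false, false, fmt.Errorf("%s.%s/%s not found in descriptor set", pkg, service, method)
    }

    func main() {
        raw, err := ioutil.ReadFile("voltha.pb") // placeholder descriptor file
        if err != nil {
            panic(err)
        }
        fds := &descpb.FileDescriptorSet{}
        if err := proto.Unmarshal(raw, fds); err != nil {
            panic(err)
        }
        fmt.Println(isStreaming(fds, "voltha", "VolthaService", "ReceiveChangeEvents"))
    }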

Fixed mutex copy.
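
For context, the generic shape of that bug and its fix, shown as illustrative Go rather than the afrouter code: copying a struct that contains a sync.Mutex gives the copy its own independent lock, which go vet reports as a copylocks error; the usual fix is to pass the struct by pointer.

    package main

    import "sync"

    type counters struct {
        mu sync.Mutex
        n  int
    }

    // incByValue copies the whole struct, including its Mutex, so the lock
    // taken here protects only the copy; go vet flags this as copylocks.
    func incByValue(c counters) {
        c.mu.Lock()
        c.n++
        c.mu.Unlock()
    }

    // incByPointer shares the single Mutex with every caller, which is the fix.
    func incByPointer(c *counters) {
        c.mu.Lock()
        c.n++
        c.mu.Unlock()
    }

    func main() {
        c := &counters{}
        incByPointer(c)
        incByValue(*c)
    }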

Also tweaked some log statements.

Fixed field tag lint error.

Change-Id: I6e14039c27519d8d2103065258ff4302bc881235
diff --git a/afrouter/afrouter/backend.go b/afrouter/afrouter/backend.go
index 31f431a..3c7815d 100644
--- a/afrouter/afrouter/backend.go
+++ b/afrouter/afrouter/backend.go
@@ -25,9 +25,7 @@
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/metadata"
-	"io"
 	"net/url"
 	"strconv"
 	"strings"
@@ -42,7 +40,8 @@
 	activeAssociation association
 	connFailCallback  func(string, *backend) bool
 	connections       map[string]*connection
-	openConns         int
+	openConns         map[*connection]*grpc.ClientConn
+	activeRequests    map[*request]struct{}
 }
 
 type association struct {
@@ -52,138 +51,101 @@
 	key      string
 }
 
-func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, f *nbFrame) (*streams, error) {
+// splitActiveStreamsUnsafe expects the caller to have already locked the backend mutex
+func (be *backend) splitActiveStreamsUnsafe(cn *connection, conn *grpc.ClientConn) {
+	if len(be.activeRequests) != 0 {
+		log.Debugf("Creating new streams for %d existing requests", len(be.activeRequests))
+	}
+	for r := range be.activeRequests {
+		r.mutex.Lock()
+		if _, have := r.streams[cn.name]; !have {
+			log.Debugf("Opening southbound stream for existing request '%s'", r.methodInfo.method)
+			if stream, err := grpc.NewClientStream(r.ctx, clientStreamDescForProxying, conn, r.methodInfo.all); err != nil {
+				log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
+			} else {
+				go r.catchupRequestStreamThenForwardResponseStream(cn.name, stream)
+				// new thread will unlock the request mutex
+				continue
+			}
+		}
+		r.mutex.Unlock()
+	}
+}
 
-	rtrn := &streams{streams: make(map[string]*stream), activeStream: nil}
+// openSouthboundStreams opens a client stream to each open southbound connection
+func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, nf *requestFrame, sf *responseFrame) (*request, error) {
+	be.mutex.Lock()
+	defer be.mutex.Unlock()
 
-	log.Debugf("Opening southbound streams for method '%s'", f.methodInfo.method)
+	isStreamingRequest, isStreamingResponse := nf.router.IsStreaming(nf.methodInfo.method)
+
 	// Get the metadata from the incoming message on the server
 	md, ok := metadata.FromIncomingContext(serverStream.Context())
 	if !ok {
-		return nil, errors.New("Could not get a server stream metadata")
+		return nil, errors.New("could not get a server stream metadata")
 	}
 
+	r := &request{
+		// Create an outgoing context that includes the incoming metadata and that will cancel if the server's context is canceled
+		ctx: metadata.AppendToOutgoingContext(metadata.NewOutgoingContext(serverStream.Context(), md.Copy()), "voltha_serial_number", strconv.FormatUint(nf.serialNo, 10)),
+
+		streams:         make(map[string]grpc.ClientStream),
+		responseErrChan: make(chan error, 1),
+
+		backend:             be,
+		serverStream:        serverStream,
+		methodInfo:          nf.methodInfo,
+		requestFrame:        nf,
+		responseFrame:       sf,
+		isStreamingRequest:  isStreamingRequest,
+		isStreamingResponse: isStreamingResponse,
+	}
+
+	log.Debugf("Opening southbound request for method '%s'", nf.methodInfo.method)
+
 	// TODO: Need to check if this is an active/active backend cluster
 	// with a serial number in the header.
-	log.Debugf("Serial number for transaction allocated: %d", f.serialNo)
+	log.Debugf("Serial number for transaction allocated: %d", nf.serialNo)
 	// If even one stream can be created then proceed. If none can be
-	// created then report an error becase both the primary and redundant
-	// connections are non-existant.
+	// created then report an error because both the primary and redundant
+	// connections are non-existent.
 	var atLeastOne = false
 	var errStr strings.Builder
-	log.Debugf("There are %d connections to open", len(be.connections))
-	for _, cn := range be.connections {
-		// Copy in the metadata
-		if cn.getState() == connectivity.Ready && cn.getConn() != nil {
-			log.Debugf("Opening southbound stream for connection '%s'", cn.name)
-			// Create an outgoing context that includes the incoming metadata
-			// and that will cancel if the server's context is canceled
-			clientCtx, clientCancel := context.WithCancel(serverStream.Context())
-			clientCtx = metadata.NewOutgoingContext(clientCtx, md.Copy())
-			//TODO: Same check here, only add the serial number if necessary
-			clientCtx = metadata.AppendToOutgoingContext(clientCtx, "voltha_serial_number",
-				strconv.FormatUint(f.serialNo, 10))
-			// Create the client stream
-			if clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying,
-				cn.getConn(), f.methodInfo.all); err != nil {
-				log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
-				fmt.Fprintf(&errStr, "{{Failed to create a client stream '%s', %v}} ", cn.name, err)
-				rtrn.streams[cn.name] = nil
-			} else {
-				rtrn.streams[cn.name] = &stream{stream: clientStream, ctx: clientCtx,
-					cancel: clientCancel, s2cReturn: nil,
-					ok2Close:  make(chan struct{}),
-					c2sReturn: make(chan error, 1)}
-				atLeastOne = true
-			}
-		} else if cn.getConn() == nil {
-			err := errors.New(fmt.Sprintf("Connection '%s' is closed", cn.name))
-			fmt.Fprint(&errStr, err.Error())
-			log.Debug(err)
+	log.Debugf("There are %d/%d streams to open", len(be.openConns), len(be.connections))
+	for cn, conn := range be.openConns {
+		log.Debugf("Opening stream for connection '%s'", cn.name)
+		if stream, err := grpc.NewClientStream(r.ctx, clientStreamDescForProxying, conn, r.methodInfo.all); err != nil {
+			log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
 		} else {
-			err := errors.New(fmt.Sprintf("Connection '%s' isn't ready", cn.name))
-			fmt.Fprint(&errStr, err.Error())
-			log.Debug(err)
+			r.streams[cn.name] = stream
+			go r.forwardResponseStream(cn.name, stream)
+			atLeastOne = true
 		}
 	}
 	if atLeastOne {
-		rtrn.sortStreams()
-		return rtrn, nil
+		be.activeRequests[r] = struct{}{}
+		return r, nil
 	}
-	fmt.Fprintf(&errStr, "{{No streams available for backend '%s' unable to send}} ", be.name)
+	fmt.Fprintf(&errStr, "{{No open connections for backend '%s', unable to send}} ", be.name)
 	log.Error(errStr.String())
 	return nil, errors.New(errStr.String())
 }
 
-func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf *nbFrame, sf *sbFrame) error {
-
-	// Set up and launch each individual southbound stream
-	var beStrms *streams
-	var rtrn error = nil
-	var s2cOk = false
-	var c2sOk = false
-
-	beStrms, err := be.openSouthboundStreams(srv, serverStream, nf)
+func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf *requestFrame, sf *responseFrame) error {
+	// Set up streams for each open connection
+	request, err := be.openSouthboundStreams(srv, serverStream, nf, sf)
 	if err != nil {
 		log.Errorf("openStreams failed: %v", err)
 		return err
 	}
-	// If we get here, there has to be AT LEAST ONE open stream
 
-	// *Do not explicitly close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
-	// Channels do not have to be closed, it is just a control flow mechanism, see
-	// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
-
-	log.Debug("Starting server to client forwarding")
-	s2cErrChan := beStrms.forwardServerToClient(serverStream, nf)
-
-	log.Debug("Starting client to server forwarding")
-	c2sErrChan := beStrms.forwardClientToServer(serverStream, sf)
-
-	// We don't know which side is going to stop sending first, so we need a select between the two.
-	for i := 0; i < 2; i++ {
-		select {
-		case s2cErr := <-s2cErrChan:
-			s2cOk = true
-			log.Debug("Processing s2cErr")
-			if s2cErr == io.EOF {
-				log.Debug("s2cErr reporting EOF")
-				// this is the successful case where the sender has encountered io.EOF, and won't be sending anymore./
-				// the clientStream>serverStream may continue sending though.
-				defer beStrms.closeSend()
-				if c2sOk {
-					return rtrn
-				}
-			} else {
-				log.Debugf("s2cErr reporting %v", s2cErr)
-				// however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
-				// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
-				// exit with an error to the stack
-				beStrms.clientCancel()
-				return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
-			}
-		case c2sErr := <-c2sErrChan:
-			c2sOk = true
-			log.Debug("Processing c2sErr")
-			// This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
-			// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
-			// will be nil.
-			serverStream.SetTrailer(beStrms.trailer())
-			// c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
-			// NOTE!!! with redundant backends, it's likely that one of the backends
-			// returns a response before all the data has been sent southbound and
-			// the southbound streams are closed. Should this happen one of the
-			// backends may not get the request.
-			if c2sErr != io.EOF {
-				rtrn = c2sErr
-			}
-			log.Debug("c2sErr reporting EOF")
-			if s2cOk {
-				return rtrn
-			}
-		}
+	log.Debug("Starting request stream forwarding")
+	if s2cErr := request.forwardRequestStream(serverStream); s2cErr != nil {
+		// exit with an error to the stack
+		return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
 	}
-	return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
+	// wait for response stream to complete
+	return <-request.responseErrChan
 }
 
 func newBackend(conf *BackendConfig, clusterName string) (*backend, error) {
@@ -191,7 +153,12 @@
 
 	log.Debugf("Configuring the backend with %v", *conf)
 	// Validate the config and configure the backend
-	be := &backend{name: conf.Name, connections: make(map[string]*connection), openConns: 0}
+	be := &backend{
+		name:           conf.Name,
+		connections:    make(map[string]*connection),
+		openConns:      make(map[*connection]*grpc.ClientConn),
+		activeRequests: make(map[*request]struct{}),
+	}
 	if conf.Type == BackendUndefined {
 		log.Error("Invalid type specified for backend %s in cluster %s", conf.Name, clusterName)
 		rtrn_err = true
@@ -248,25 +215,23 @@
 			log.Errorf("A connection must have a name for backend %s in cluster %s",
 				conf.Name, clusterName)
 		} else {
-			gc := &gConnection{conn: nil, cancel: nil, state: connectivity.Idle}
-			be.connections[cnConf.Name] = &connection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, backend: be, gConn: gc}
-			if cnConf.Addr != "" { // This connection will be specified later.
-				if _, err := url.Parse(cnConf.Addr); err != nil {
-					log.Errorf("The address for connection %s in backend %s in cluster %s is invalid: %s",
-						cnConf.Name, conf.Name, clusterName, err)
-					rtrn_err = true
-				}
-				// Validate the port number. This just validtes that it's a non 0 integer
-				if n, err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
+			ctx, cancelFunc := context.WithCancel(context.Background())
+			be.connections[cnConf.Name] = &connection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, backend: be, ctx: ctx, close: cancelFunc}
+			if _, err := url.Parse(cnConf.Addr); err != nil {
+				log.Errorf("The address for connection %s in backend %s in cluster %s is invalid: %s",
+					cnConf.Name, conf.Name, clusterName, err)
+				rtrn_err = true
+			}
+			// Validate the port number. This just validates that it's a non-zero integer
+			if n, err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
+				log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
+					cnConf.Port, cnConf.Name, conf.Name, clusterName)
+				rtrn_err = true
+			} else {
+				if n <= 0 && n > 65535 {
 					log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
 						cnConf.Port, cnConf.Name, conf.Name, clusterName)
 					rtrn_err = true
-				} else {
-					if n <= 0 && n > 65535 {
-						log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
-							cnConf.Port, cnConf.Name, conf.Name, clusterName)
-						rtrn_err = true
-					}
 				}
 			}
 		}
@@ -281,20 +246,26 @@
 	return be, nil
 }
 
-func (be *backend) incConn() {
+func (be *backend) incConn(cn *connection, conn *grpc.ClientConn) {
 	be.mutex.Lock()
 	defer be.mutex.Unlock()
-	be.openConns++
+
+	be.openConns[cn] = conn
+	be.splitActiveStreamsUnsafe(cn, conn)
 }
 
-func (be *backend) decConn() {
+func (be *backend) decConn(cn *connection) {
 	be.mutex.Lock()
 	defer be.mutex.Unlock()
-	be.openConns--
-	if be.openConns < 0 {
-		log.Error("Internal error, number of open connections less than 0")
-		be.openConns = 0
-	}
+
+	delete(be.openConns, cn)
+}
+
+func (be *backend) NumOpenConnections() int {
+	be.mutex.Lock()
+	defer be.mutex.Unlock()
+
+	return len(be.openConns)
 }
 
 // Attempts to establish all the connections for a backend
@@ -303,7 +274,7 @@
 // handled after that.
 func (be *backend) connectAll() {
 	for _, cn := range be.connections {
-		cn.connect()
+		go cn.connect()
 	}
 }