diff --git a/afrouter/afrouter/affinity-router.go b/afrouter/afrouter/affinity-router.go
index 2390846..583d0a7 100644
--- a/afrouter/afrouter/affinity-router.go
+++ b/afrouter/afrouter/affinity-router.go
@@ -20,10 +20,8 @@
 	"errors"
 	"fmt"
 	"github.com/golang/protobuf/proto"
-	pb "github.com/golang/protobuf/protoc-gen-go/descriptor"
 	"github.com/opencord/voltha-go/common/log"
 	"google.golang.org/grpc"
-	"io/ioutil"
 	"regexp"
 	"strconv"
 )
@@ -38,7 +36,6 @@
 	association        associationType
 	routingField       string
 	grpcService        string
-	protoDescriptor    *pb.FileDescriptorSet
 	methodMap          map[string]byte
 	nbBindingMethodMap map[string]byte
 	cluster            *cluster
@@ -94,37 +91,23 @@
 		rtrn_err = true
 	}
 
-	// Load the protobuf descriptor file
-	dr.protoDescriptor = &pb.FileDescriptorSet{}
-	fb, err := ioutil.ReadFile(config.ProtoFile)
-	if err != nil {
-		log.Errorf("Could not open proto file '%s'", config.ProtoFile)
-		rtrn_err = true
-	}
-	err = proto.Unmarshal(fb, dr.protoDescriptor)
-	if err != nil {
-		log.Errorf("Could not unmarshal %s, %v", "proto.pb", err)
-		rtrn_err = true
-	}
-
 	// Build the routing structure based on the loaded protobuf
 	// descriptor file and the config information.
 	type key struct {
 		method string
 		field  string
 	}
-	var msgs = make(map[key]byte)
-	for _, f := range dr.protoDescriptor.File {
+	var fieldNumberLookup = make(map[key]byte)
+	for _, f := range rconf.protoDescriptor.File {
 		// Build a temporary map of message types by name.
 		for _, m := range f.MessageType {
 			for _, fld := range m.Field {
 				log.Debugf("Processing message '%s', field '%s'", *m.Name, *fld.Name)
-				msgs[key{*m.Name, *fld.Name}] = byte(*fld.Number)
+				fieldNumberLookup[key{*m.Name, *fld.Name}] = byte(*fld.Number)
 			}
 		}
 	}
-	log.Debugf("The map contains: %v", msgs)
-	for _, f := range dr.protoDescriptor.File {
+	for _, f := range rconf.protoDescriptor.File {
 		if *f.Package == rconf.ProtoPackage {
 			for _, s := range f.Service {
 				if *s.Name == rconf.ProtoService {
@@ -147,7 +130,7 @@
 							// The input type has the package name prepended to it. Remove it.
 							//in := (*m.InputType)[len(rconf.ProtoPackage)+2:]
 							in := pkg_methd[PKG_MTHD_MTHD]
-							dr.methodMap[*m.Name], ok = msgs[key{in, config.RouteField}]
+							dr.methodMap[*m.Name], ok = fieldNumberLookup[key{in, config.RouteField}]
 							if !ok {
 								log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
 									*m.Name, config.RouteField, in)
@@ -155,11 +138,11 @@
 							}
 						}
 						// The sb method is always included in the methods so we can check it here too.
-						if needSbMethod(*m.Name, config) {
+						if needNbBindingMethod(*m.Name, config) {
 							log.Debugf("Enabling southbound method '%s'", *m.Name)
 							// The output type has the package name prepended to it. Remove it.
 							out := (*m.OutputType)[len(rconf.ProtoPackage)+2:]
-							dr.nbBindingMethodMap[*m.Name], ok = msgs[key{out, config.RouteField}]
+							dr.nbBindingMethodMap[*m.Name], ok = fieldNumberLookup[key{out, config.RouteField}]
 							if !ok {
 								log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
 									*m.Name, config.RouteField, out)
@@ -188,7 +171,7 @@
 	return dr, nil
 }
 
-func needSbMethod(mthd string, conf *RouteConfig) bool {
+func needNbBindingMethod(mthd string, conf *RouteConfig) bool {
 	for _, m := range conf.NbBindingMethods {
 		if mthd == m {
 			return true
@@ -284,8 +267,8 @@
 
 func (ar AffinityRouter) Route(sel interface{}) *backend {
 	switch sl := sel.(type) {
-	case *nbFrame:
-		log.Debugf("Route called for nbFrame with method %s", sl.methodInfo.method)
+	case *requestFrame:
+		log.Debugf("Route called for requestFrame with method %s", sl.methodInfo.method)
 		// Check if this method should be affinity bound from the
 		// reply rather than the request.
 		if _, ok := ar.nbBindingMethodMap[sl.methodInfo.method]; ok {
@@ -331,6 +314,10 @@
 	return "", "", nil
 }
 
+func (ar AffinityRouter) IsStreaming(_ string) (bool, bool) {
+	panic("not implemented")
+}
+
 func (ar AffinityRouter) BackendCluster(mthd string, metaKey string) (*cluster, error) {
 	return ar.cluster, nil
 }
@@ -344,10 +331,8 @@
 
 func (ar AffinityRouter) ReplyHandler(sel interface{}) error {
 	switch sl := sel.(type) {
-	case *sbFrame:
-		sl.mutex.Lock()
-		defer sl.mutex.Unlock()
-		log.Debugf("Reply handler called for sbFrame with method %s", sl.method)
+	case *responseFrame:
+		log.Debugf("Reply handler called for responseFrame with method %s", sl.method)
 		// Determine if reply action is required.
 		if fld, ok := ar.nbBindingMethodMap[sl.method]; ok && len(sl.payload) > 0 {
 			// Extract the field value from the frame and
diff --git a/afrouter/afrouter/api.go b/afrouter/afrouter/api.go
index 87f5573..3e47ef8 100644
--- a/afrouter/afrouter/api.go
+++ b/afrouter/afrouter/api.go
@@ -24,6 +24,7 @@
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 	"net"
+	"net/url"
 	"runtime"
 	"strconv"
 )
@@ -41,7 +42,7 @@
 	var rtrn_err bool
 	// Create a seperate server and listener for the API
 	// Validate the ip address if one is provided
-	if ip := net.ParseIP(config.Addr); config.Addr != "" && ip == nil {
+	if _, err := url.Parse(config.Addr); err != nil {
 		log.Errorf("Invalid address '%s' provided for API server", config.Addr)
 		rtrn_err = true
 	}
@@ -118,17 +119,7 @@
 }
 
 func (aa *ArouterApi) updateConnection(in *pb.Conn, cn *connection, b *backend) error {
-	sPort := strconv.FormatUint(in.Port, 10)
-	// Check that the ip address and or port are different
-	if in.Addr == cn.addr && sPort == cn.port {
-		err := errors.New(fmt.Sprintf("Refusing to change connection '%s' to identical values", in.Connection))
-		return err
-	}
-	cn.close()
-	cn.addr = in.Addr
-	cn.port = sPort
-	cn.connect()
-	return nil
+	return errors.New("updateConnection not implemented")
 }
 
 func (aa ArouterApi) SetAffinity(ctx context.Context, in *pb.Affinity) (*pb.Result, error) {
diff --git a/afrouter/afrouter/backend.go b/afrouter/afrouter/backend.go
index 31f431a..3c7815d 100644
--- a/afrouter/afrouter/backend.go
+++ b/afrouter/afrouter/backend.go
@@ -25,9 +25,7 @@
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/metadata"
-	"io"
 	"net/url"
 	"strconv"
 	"strings"
@@ -42,7 +40,8 @@
 	activeAssociation association
 	connFailCallback  func(string, *backend) bool
 	connections       map[string]*connection
-	openConns         int
+	openConns         map[*connection]*grpc.ClientConn
+	activeRequests    map[*request]struct{}
 }
 
 type association struct {
@@ -52,138 +51,101 @@
 	key      string
 }
 
-func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, f *nbFrame) (*streams, error) {
+// splitActiveStreamsUnsafe expects the caller to have already locked the backend mutex
+func (be *backend) splitActiveStreamsUnsafe(cn *connection, conn *grpc.ClientConn) {
+	if len(be.activeRequests) != 0 {
+		log.Debugf("Creating new streams for %d existing requests", len(be.activeRequests))
+	}
+	for r := range be.activeRequests {
+		r.mutex.Lock()
+		if _, have := r.streams[cn.name]; !have {
+			log.Debugf("Opening southbound stream for existing request '%s'", r.methodInfo.method)
+			if stream, err := grpc.NewClientStream(r.ctx, clientStreamDescForProxying, conn, r.methodInfo.all); err != nil {
+				log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
+			} else {
+				go r.catchupRequestStreamThenForwardResponseStream(cn.name, stream)
+				// the new goroutine will unlock the request mutex
+				continue
+			}
+		}
+		r.mutex.Unlock()
+	}
+}
 
-	rtrn := &streams{streams: make(map[string]*stream), activeStream: nil}
+// openSouthboundStreams opens a client stream on every currently open connection of this backend
+func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, nf *requestFrame, sf *responseFrame) (*request, error) {
+	be.mutex.Lock()
+	defer be.mutex.Unlock()
 
-	log.Debugf("Opening southbound streams for method '%s'", f.methodInfo.method)
+	isStreamingRequest, isStreamingResponse := nf.router.IsStreaming(nf.methodInfo.method)
+
 	// Get the metadata from the incoming message on the server
 	md, ok := metadata.FromIncomingContext(serverStream.Context())
 	if !ok {
-		return nil, errors.New("Could not get a server stream metadata")
+		return nil, errors.New("could not get a server stream metadata")
 	}
 
+	r := &request{
+		// Create an outgoing context that includes the incoming metadata and that will cancel if the server's context is canceled
+		ctx: metadata.AppendToOutgoingContext(metadata.NewOutgoingContext(serverStream.Context(), md.Copy()), "voltha_serial_number", strconv.FormatUint(nf.serialNo, 10)),
+
+		streams:         make(map[string]grpc.ClientStream),
+		responseErrChan: make(chan error, 1),
+
+		backend:             be,
+		serverStream:        serverStream,
+		methodInfo:          nf.methodInfo,
+		requestFrame:        nf,
+		responseFrame:       sf,
+		isStreamingRequest:  isStreamingRequest,
+		isStreamingResponse: isStreamingResponse,
+	}
+
+	log.Debugf("Opening southbound request for method '%s'", nf.methodInfo.method)
+
 	// TODO: Need to check if this is an active/active backend cluster
 	// with a serial number in the header.
-	log.Debugf("Serial number for transaction allocated: %d", f.serialNo)
+	log.Debugf("Serial number for transaction allocated: %d", nf.serialNo)
 	// If even one stream can be created then proceed. If none can be
-	// created then report an error becase both the primary and redundant
-	// connections are non-existant.
+	// created then report an error because both the primary and redundant
+	// connections are non-existent.
 	var atLeastOne = false
 	var errStr strings.Builder
-	log.Debugf("There are %d connections to open", len(be.connections))
-	for _, cn := range be.connections {
-		// Copy in the metadata
-		if cn.getState() == connectivity.Ready && cn.getConn() != nil {
-			log.Debugf("Opening southbound stream for connection '%s'", cn.name)
-			// Create an outgoing context that includes the incoming metadata
-			// and that will cancel if the server's context is canceled
-			clientCtx, clientCancel := context.WithCancel(serverStream.Context())
-			clientCtx = metadata.NewOutgoingContext(clientCtx, md.Copy())
-			//TODO: Same check here, only add the serial number if necessary
-			clientCtx = metadata.AppendToOutgoingContext(clientCtx, "voltha_serial_number",
-				strconv.FormatUint(f.serialNo, 10))
-			// Create the client stream
-			if clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying,
-				cn.getConn(), f.methodInfo.all); err != nil {
-				log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
-				fmt.Fprintf(&errStr, "{{Failed to create a client stream '%s', %v}} ", cn.name, err)
-				rtrn.streams[cn.name] = nil
-			} else {
-				rtrn.streams[cn.name] = &stream{stream: clientStream, ctx: clientCtx,
-					cancel: clientCancel, s2cReturn: nil,
-					ok2Close:  make(chan struct{}),
-					c2sReturn: make(chan error, 1)}
-				atLeastOne = true
-			}
-		} else if cn.getConn() == nil {
-			err := errors.New(fmt.Sprintf("Connection '%s' is closed", cn.name))
-			fmt.Fprint(&errStr, err.Error())
-			log.Debug(err)
+	log.Debugf("There are %d/%d streams to open", len(be.openConns), len(be.connections))
+	for cn, conn := range be.openConns {
+		log.Debugf("Opening stream for connection '%s'", cn.name)
+		if stream, err := grpc.NewClientStream(r.ctx, clientStreamDescForProxying, conn, r.methodInfo.all); err != nil {
+			log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
 		} else {
-			err := errors.New(fmt.Sprintf("Connection '%s' isn't ready", cn.name))
-			fmt.Fprint(&errStr, err.Error())
-			log.Debug(err)
+			r.streams[cn.name] = stream
+			go r.forwardResponseStream(cn.name, stream)
+			atLeastOne = true
 		}
 	}
 	if atLeastOne {
-		rtrn.sortStreams()
-		return rtrn, nil
+		be.activeRequests[r] = struct{}{}
+		return r, nil
 	}
-	fmt.Fprintf(&errStr, "{{No streams available for backend '%s' unable to send}} ", be.name)
+	fmt.Fprintf(&errStr, "{{No open connections for backend '%s' unable to send}} ", be.name)
 	log.Error(errStr.String())
 	return nil, errors.New(errStr.String())
 }
 
-func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf *nbFrame, sf *sbFrame) error {
-
-	// Set up and launch each individual southbound stream
-	var beStrms *streams
-	var rtrn error = nil
-	var s2cOk = false
-	var c2sOk = false
-
-	beStrms, err := be.openSouthboundStreams(srv, serverStream, nf)
+func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf *requestFrame, sf *responseFrame) error {
+	// Set up streams for each open connection
+	request, err := be.openSouthboundStreams(srv, serverStream, nf, sf)
 	if err != nil {
 		log.Errorf("openStreams failed: %v", err)
 		return err
 	}
-	// If we get here, there has to be AT LEAST ONE open stream
 
-	// *Do not explicitly close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
-	// Channels do not have to be closed, it is just a control flow mechanism, see
-	// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
-
-	log.Debug("Starting server to client forwarding")
-	s2cErrChan := beStrms.forwardServerToClient(serverStream, nf)
-
-	log.Debug("Starting client to server forwarding")
-	c2sErrChan := beStrms.forwardClientToServer(serverStream, sf)
-
-	// We don't know which side is going to stop sending first, so we need a select between the two.
-	for i := 0; i < 2; i++ {
-		select {
-		case s2cErr := <-s2cErrChan:
-			s2cOk = true
-			log.Debug("Processing s2cErr")
-			if s2cErr == io.EOF {
-				log.Debug("s2cErr reporting EOF")
-				// this is the successful case where the sender has encountered io.EOF, and won't be sending anymore./
-				// the clientStream>serverStream may continue sending though.
-				defer beStrms.closeSend()
-				if c2sOk {
-					return rtrn
-				}
-			} else {
-				log.Debugf("s2cErr reporting %v", s2cErr)
-				// however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
-				// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
-				// exit with an error to the stack
-				beStrms.clientCancel()
-				return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
-			}
-		case c2sErr := <-c2sErrChan:
-			c2sOk = true
-			log.Debug("Processing c2sErr")
-			// This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
-			// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
-			// will be nil.
-			serverStream.SetTrailer(beStrms.trailer())
-			// c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
-			// NOTE!!! with redundant backends, it's likely that one of the backends
-			// returns a response before all the data has been sent southbound and
-			// the southbound streams are closed. Should this happen one of the
-			// backends may not get the request.
-			if c2sErr != io.EOF {
-				rtrn = c2sErr
-			}
-			log.Debug("c2sErr reporting EOF")
-			if s2cOk {
-				return rtrn
-			}
-		}
+	log.Debug("Starting request stream forwarding")
+	if s2cErr := request.forwardRequestStream(serverStream); s2cErr != nil {
+		// exit with an error to the stack
+		return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
 	}
-	return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
+	// wait for response stream to complete
+	return <-request.responseErrChan
 }
 
 func newBackend(conf *BackendConfig, clusterName string) (*backend, error) {
@@ -191,7 +153,12 @@
 
 	log.Debugf("Configuring the backend with %v", *conf)
 	// Validate the conifg and configure the backend
-	be := &backend{name: conf.Name, connections: make(map[string]*connection), openConns: 0}
+	be := &backend{
+		name:           conf.Name,
+		connections:    make(map[string]*connection),
+		openConns:      make(map[*connection]*grpc.ClientConn),
+		activeRequests: make(map[*request]struct{}),
+	}
 	if conf.Type == BackendUndefined {
 		log.Error("Invalid type specified for backend %s in cluster %s", conf.Name, clusterName)
 		rtrn_err = true
@@ -248,25 +215,23 @@
 			log.Errorf("A connection must have a name for backend %s in cluster %s",
 				conf.Name, clusterName)
 		} else {
-			gc := &gConnection{conn: nil, cancel: nil, state: connectivity.Idle}
-			be.connections[cnConf.Name] = &connection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, backend: be, gConn: gc}
-			if cnConf.Addr != "" { // This connection will be specified later.
-				if _, err := url.Parse(cnConf.Addr); err != nil {
-					log.Errorf("The address for connection %s in backend %s in cluster %s is invalid: %s",
-						cnConf.Name, conf.Name, clusterName, err)
-					rtrn_err = true
-				}
-				// Validate the port number. This just validtes that it's a non 0 integer
-				if n, err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
+			ctx, cancelFunc := context.WithCancel(context.Background())
+			be.connections[cnConf.Name] = &connection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, backend: be, ctx: ctx, close: cancelFunc}
+			if _, err := url.Parse(cnConf.Addr); err != nil {
+				log.Errorf("The address for connection %s in backend %s in cluster %s is invalid: %s",
+					cnConf.Name, conf.Name, clusterName, err)
+				rtrn_err = true
+			}
+			// Validate the port number. This just validates that it's an integer in the range 1-65535
+			if n, err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
+				log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
+					cnConf.Port, cnConf.Name, conf.Name, clusterName)
+				rtrn_err = true
+			} else {
+				if n <= 0 && n > 65535 {
 					log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
 						cnConf.Port, cnConf.Name, conf.Name, clusterName)
 					rtrn_err = true
-				} else {
-					if n <= 0 && n > 65535 {
-						log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
-							cnConf.Port, cnConf.Name, conf.Name, clusterName)
-						rtrn_err = true
-					}
 				}
 			}
 		}
@@ -281,20 +246,26 @@
 	return be, nil
 }
 
-func (be *backend) incConn() {
+func (be *backend) incConn(cn *connection, conn *grpc.ClientConn) {
 	be.mutex.Lock()
 	defer be.mutex.Unlock()
-	be.openConns++
+
+	be.openConns[cn] = conn
+	be.splitActiveStreamsUnsafe(cn, conn)
 }
 
-func (be *backend) decConn() {
+func (be *backend) decConn(cn *connection) {
 	be.mutex.Lock()
 	defer be.mutex.Unlock()
-	be.openConns--
-	if be.openConns < 0 {
-		log.Error("Internal error, number of open connections less than 0")
-		be.openConns = 0
-	}
+
+	delete(be.openConns, cn)
+}
+
+func (be *backend) NumOpenConnections() int {
+	be.mutex.Lock()
+	defer be.mutex.Unlock()
+
+	return len(be.openConns)
 }
 
 // Attempts to establish all the connections for a backend
@@ -303,7 +274,7 @@
 // handled after that.
 func (be *backend) connectAll() {
 	for _, cn := range be.connections {
-		cn.connect()
+		go cn.connect()
 	}
 }
 
diff --git a/afrouter/afrouter/binding-router.go b/afrouter/afrouter/binding-router.go
index 67a7a7e..dd73756 100644
--- a/afrouter/afrouter/binding-router.go
+++ b/afrouter/afrouter/binding-router.go
@@ -39,6 +39,10 @@
 	currentBackend **backend
 }
 
+func (br BindingRouter) IsStreaming(_ string) (bool, bool) {
+	panic("not implemented")
+}
+
 func (br BindingRouter) BackendCluster(s string, metaKey string) (*cluster, error) {
 	return br.beCluster, nil
 	//return nil,errors.New("Not implemented yet")
@@ -79,7 +83,7 @@
 func (br BindingRouter) Route(sel interface{}) *backend {
 	var err error
 	switch sl := sel.(type) {
-	case *nbFrame:
+	case *requestFrame:
 		if b, ok := br.bindings[sl.metaVal]; ok == true { // binding exists, just return it
 			return b
 		} else { // establish a new binding or error.
diff --git a/afrouter/afrouter/cluster.go b/afrouter/afrouter/cluster.go
index 879246b..e33db67 100644
--- a/afrouter/afrouter/cluster.go
+++ b/afrouter/afrouter/cluster.go
@@ -94,7 +94,7 @@
 			log.Debug("Previous backend is nil")
 			be = c.backends[0]
 			in = be
-			if be.openConns != 0 {
+			if be.NumOpenConnections() != 0 {
 				return be, nil
 			}
 		}
@@ -106,7 +106,7 @@
 				cur = 0
 			}
 			log.Debugf("Next backend is %d:%s", cur, c.backends[cur].name)
-			if c.backends[cur].openConns > 0 {
+			if c.backends[cur].NumOpenConnections() > 0 {
 				return c.backends[cur], nil
 			}
 			if c.backends[cur] == in {
@@ -133,8 +133,8 @@
 	// now.
 
 	// Get the backend to use.
-	// Allocate the nbFrame here since it holds the "context" of this communication
-	nf := &nbFrame{router: r, methodInfo: methodInfo, serialNo: c.allocateSerialNumber(), metaKey: mk, metaVal: mv}
+	// Allocate the requestFrame here since it holds the "context" of this communication
+	nf := &requestFrame{router: r, methodInfo: methodInfo, serialNo: c.allocateSerialNumber(), metaKey: mk, metaVal: mv}
 	log.Debugf("Nb frame allocate with method %s", nf.methodInfo.method)
 
 	if be, err := c.assignBackend(serverStream, nf); err != nil {
@@ -143,14 +143,14 @@
 		return err
 	} else {
 		log.Debugf("Backend '%s' selected", be.name)
-		// Allocate a sbFrame here because it might be needed for return value intercept
-		sf := &sbFrame{router: r, backend: be, method: nf.methodInfo.method, metaKey: mk, metaVal: mv}
+		// Allocate a responseFrame here because it might be needed for return value intercept
+		sf := &responseFrame{router: r, backend: be, method: nf.methodInfo.method, metaKey: mk, metaVal: mv}
 		log.Debugf("Sb frame allocated with router %s", r.Name())
 		return be.handler(srv, serverStream, nf, sf)
 	}
 }
 
-func (c *cluster) assignBackend(src grpc.ServerStream, f *nbFrame) (*backend, error) {
+func (c *cluster) assignBackend(src grpc.ServerStream, f *requestFrame) (*backend, error) {
 	// Receive the first message from the server. This calls the assigned codec in which
 	// Unmarshal gets executed. That will use the assigned router to select a backend
 	// and add it to the frame
@@ -163,7 +163,7 @@
 		err := fmt.Errorf("Unable to route method '%s'", f.methodInfo.method)
 		log.Error(err)
 		return nil, err
-	} else if f.backend.openConns == 0 {
+	} else if f.backend.NumOpenConnections() == 0 {
 		err := fmt.Errorf("No open connections on backend '%s'", f.backend.name)
 		log.Error(err)
 		return f.backend, err
diff --git a/afrouter/afrouter/codec.go b/afrouter/afrouter/codec.go
index 675320a..7ea6ef2 100644
--- a/afrouter/afrouter/codec.go
+++ b/afrouter/afrouter/codec.go
@@ -21,7 +21,6 @@
 	"github.com/golang/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
 	"google.golang.org/grpc"
-	"sync"
 )
 
 func Codec() grpc.Codec {
@@ -36,19 +35,18 @@
 	parentCodec grpc.Codec
 }
 
-// sbFrame is a frame being "returned" to whomever established the connection
-type sbFrame struct {
+// responseFrame is a frame being "returned" to whomever established the connection
+type responseFrame struct {
 	payload []byte
 	router  Router
 	method  string
 	backend *backend
-	mutex   sync.Mutex
 	metaKey string
 	metaVal string
 }
 
-// nbFrame is a frame coming in from whomever established the connection
-type nbFrame struct {
+// requestFrame is a frame coming in from whomever established the connection
+type requestFrame struct {
 	payload    []byte
 	router     Router
 	backend    *backend
@@ -61,9 +59,9 @@
 
 func (cdc *transparentRoutingCodec) Marshal(v interface{}) ([]byte, error) {
 	switch t := v.(type) {
-	case *sbFrame:
+	case *responseFrame:
 		return t.payload, nil
-	case *nbFrame:
+	case *requestFrame:
 		return t.payload, nil
 	default:
 		return cdc.parentCodec.Marshal(v)
@@ -72,17 +70,21 @@
 
 func (cdc *transparentRoutingCodec) Unmarshal(data []byte, v interface{}) error {
 	switch t := v.(type) {
-	case *sbFrame:
+	case *responseFrame:
 		t.payload = data
 		// This is where the affinity is established on a northbound response
 		t.router.ReplyHandler(v)
 		return nil
-	case *nbFrame:
+	case *requestFrame:
 		t.payload = data
 		// This is were the afinity value is pulled from the payload
 		// and the backend selected.
 		t.backend = t.router.Route(v)
-		log.Debugf("Routing returned %v for method %s", t.backend, t.methodInfo.method)
+		name := "<nil>"
+		if t.backend != nil {
+			name = t.backend.name
+		}
+		log.Debugf("Routing returned %s for method %s", name, t.methodInfo.method)
 		return nil
 	default:
 		return cdc.parentCodec.Unmarshal(data, v)
diff --git a/afrouter/afrouter/config.go b/afrouter/afrouter/config.go
index 1f0be7b..6008199 100644
--- a/afrouter/afrouter/config.go
+++ b/afrouter/afrouter/config.go
@@ -22,6 +22,7 @@
 	"errors"
 	"flag"
 	"fmt"
+	"github.com/golang/protobuf/protoc-gen-go/descriptor"
 	"github.com/opencord/voltha-go/common/log"
 	"io/ioutil"
 	"os"
@@ -62,16 +63,17 @@
 }
 
 type RouterConfig struct {
-	Name         string        `json:"name"`
-	ProtoService string        `json:"service"`
-	ProtoPackage string        `json:"package"`
-	Routes       []RouteConfig `json:"routes"`
+	Name            string        `json:"name"`
+	ProtoService    string        `json:"service"`
+	ProtoPackage    string        `json:"package"`
+	ProtoFile       string        `json:"proto_descriptor"`
+	Routes          []RouteConfig `json:"routes"`
+	protoDescriptor descriptor.FileDescriptorSet
 }
 
 type RouteConfig struct {
 	Name             string           `json:"name"`
 	Type             routeType        `json:"type"`
-	ProtoFile        string           `json:"proto_descriptor"`
 	Association      associationType  `json:"association"`
 	RouteField       string           `json:"routing_field"`
 	Methods          []string         `json:"methods"` // The GRPC methods to route using the route field
diff --git a/afrouter/afrouter/connection.go b/afrouter/afrouter/connection.go
index fab1052..dcdb8d6 100644
--- a/afrouter/afrouter/connection.go
+++ b/afrouter/afrouter/connection.go
@@ -21,226 +21,82 @@
 	"github.com/opencord/voltha-go/common/log"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/connectivity"
-	"sync"
 	"time"
 )
 
 // connection represents a connection to a single backend
 type connection struct {
-	mutex   sync.Mutex
+	backend *backend
 	name    string
 	addr    string
 	port    string
-	gConn   *gConnection
-	backend *backend
-}
-
-// This structure should never be referred to
-// by any routine outside of *connection
-// routines.
-type gConnection struct {
-	mutex  sync.Mutex
-	state  connectivity.State
-	conn   *grpc.ClientConn
-	cancel context.CancelFunc
+	ctx     context.Context
+	close   context.CancelFunc
 }
 
 func (cn *connection) connect() {
-	if cn.addr != "" && cn.getConn() == nil {
-		log.Infof("Connecting to connection %s with addr: %s and port %s", cn.name, cn.addr, cn.port)
+	for {
+		log.Infof("Connecting to %s with addr: %s and port %s", cn.name, cn.addr, cn.port)
 		// Dial doesn't block, it just returns and continues connecting in the background.
 		// Check back later to confirm and increase the connection count.
-		ctx, cnclFnc := context.WithCancel(context.Background()) // Context for canceling the connection
-		cn.setCancel(cnclFnc)
-		if conn, err := grpc.Dial(cn.addr+":"+cn.port, grpc.WithCodec(Codec()), grpc.WithInsecure()); err != nil {
-			log.Errorf("Dialng connection %v:%v", cn, err)
-			cn.waitAndTryAgain(ctx)
-		} else {
-			cn.setConn(conn)
-			log.Debugf("Starting the connection monitor for '%s'", cn.name)
-			cn.monitor(ctx)
-		}
-	} else if cn.addr == "" {
-		log.Infof("No address supplied for connection '%s', not connecting for now", cn.name)
-	} else {
-		log.Debugf("Connection '%s' is already connected, ignoring", cn.name)
-	}
-}
 
-func (cn *connection) waitAndTryAgain(ctx context.Context) {
-	go func(ctx context.Context) {
-		ctxTm, cnclTm := context.WithTimeout(context.Background(), 10*time.Second)
+		var err error
+		conn, err := grpc.Dial(cn.addr+":"+cn.port, grpc.WithCodec(Codec()), grpc.WithInsecure(), grpc.WithBackoffMaxDelay(time.Second*15))
+		if err != nil {
+			log.Fatalf("Dialing connection %v:%v", cn, err)
+		}
+
+		log.Debugf("Starting the connection monitor for '%s'", cn.name)
+		cn.monitor(conn)
+		conn.Close()
+
 		select {
-		case <-ctxTm.Done():
-			cnclTm()
-			log.Debugf("Trying to connect '%s'", cn.name)
-			// Connect creates a new context so cancel this one.
-			cn.cancel()
-			cn.connect()
+		case <-cn.ctx.Done():
 			return
-		case <-ctx.Done():
-			cnclTm()
-			return
+		default:
 		}
-	}(ctx)
-}
-
-func (cn *connection) cancel() {
-	cn.mutex.Lock()
-	defer cn.mutex.Unlock()
-	log.Debugf("Canceling connection %s", cn.name)
-	if cn.gConn != nil {
-		if cn.gConn.cancel != nil {
-			cn.gConn.cancel()
-		} else {
-			log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
-		}
-	} else {
-		log.Errorf("Internal error, attempting to cancel on a nil connection object: '%s'", cn.name)
 	}
 }
 
-func (cn *connection) setCancel(cancel context.CancelFunc) {
-	cn.mutex.Lock()
-	defer cn.mutex.Unlock()
-	if cn.gConn != nil {
-		cn.gConn.cancel = cancel
-	} else {
-		log.Errorf("Internal error, attempting to set a cancel function on a nil connection object: '%s'", cn.name)
-	}
-}
-
-func (cn *connection) setConn(conn *grpc.ClientConn) {
-	cn.mutex.Lock()
-	defer cn.mutex.Unlock()
-	if cn.gConn != nil {
-		cn.gConn.conn = conn
-	} else {
-		log.Errorf("Internal error, attempting to set a connection on a nil connection object: '%s'", cn.name)
-	}
-}
-
-func (cn *connection) getConn() *grpc.ClientConn {
-	cn.mutex.Lock()
-	defer cn.mutex.Unlock()
-	if cn.gConn != nil {
-		return cn.gConn.conn
-	}
-	return nil
-}
-
-func (cn *connection) close() {
-	cn.mutex.Lock()
-	defer cn.mutex.Unlock()
-	log.Debugf("Closing connection %s", cn.name)
-	if cn.gConn != nil && cn.gConn.conn != nil {
-		if cn.gConn.conn.GetState() == connectivity.Ready {
-			cn.backend.decConn() // Decrease the connection reference
-		}
-		if cn.gConn.cancel != nil {
-			cn.gConn.cancel() // Cancel the context first to force monitor functions to exit
-		} else {
-			log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
-		}
-		cn.gConn.conn.Close() // Close the connection
-		// Now replace the gConn object with a new one as this one just
-		// fades away as references to it are released after the close
-		// finishes in the background.
-		cn.gConn = &gConnection{conn: nil, cancel: nil, state: connectivity.TransientFailure}
-	} else {
-		log.Errorf("Internal error, attempt to close a nil connection object for '%s'", cn.name)
-	}
-
-}
-
-func (cn *connection) setState(st connectivity.State) {
-	cn.mutex.Lock()
-	defer cn.mutex.Unlock()
-	if cn.gConn != nil {
-		cn.gConn.state = st
-	} else {
-		log.Errorf("Internal error, attempting to set connection state on a nil connection object: '%s'", cn.name)
-	}
-}
-
-func (cn *connection) getState() connectivity.State {
-	cn.mutex.Lock()
-	defer cn.mutex.Unlock()
-	if cn.gConn != nil {
-		if cn.gConn.conn != nil {
-			return cn.gConn.conn.GetState()
-		} else {
-			log.Errorf("Internal error, attempting to get connection state on a nil connection: '%s'", cn.name)
-		}
-	} else {
-		log.Errorf("Internal error, attempting to get connection state on a nil connection object: '%s'", cn.name)
-	}
-	// For lack of a better state to use. The logs will help determine what happened here.
-	return connectivity.TransientFailure
-}
-
-func (cn *connection) monitor(ctx context.Context) {
+func (cn *connection) monitor(conn *grpc.ClientConn) {
 	be := cn.backend
 	log.Debugf("Setting up monitoring for backend %s", be.name)
-	go func(ctx context.Context) {
-		var delay time.Duration = 100 //ms
-		for {
-			//log.Debugf("****** Monitoring connection '%s' on backend '%s', %v", cn.name, be.name, cn.conn)
-			if cn.getState() == connectivity.Ready {
-				log.Debugf("connection '%s' on backend '%s' becomes ready", cn.name, be.name)
-				cn.setState(connectivity.Ready)
-				be.incConn()
-				if cn.getConn() != nil && !cn.getConn().WaitForStateChange(ctx, connectivity.Ready) {
-					// The context was canceled. This is done by the close function
-					// so just exit the routine
-					log.Debugf("Contxt canceled for connection '%s' on backend '%s'", cn.name, be.name)
-					return
-				}
-				if cs := cn.getConn(); cs != nil {
-					switch cs := cn.getState(); cs {
-					case connectivity.TransientFailure:
-						cn.setState(cs)
-						be.decConn()
-						log.Infof("Transient failure for  connection '%s' on backend '%s'", cn.name, be.name)
-						delay = 100
-					case connectivity.Shutdown:
-						//The connection was closed. The assumption here is that the closer
-						// will manage the connection count and setting the conn to nil.
-						// Exit the routine
-						log.Infof("Shutdown for connection '%s' on backend '%s'", cn.name, be.name)
-						return
-					case connectivity.Idle:
-						// This can only happen if the server sends a GoAway. This can
-						// only happen if the server has modified MaxConnectionIdle from
-						// its default of infinity. The only solution here is to close the
-						// connection and keepTrying()?
-						//TODO: Read the grpc source code to see if there's a different approach
-						log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'", cn.name, be.name)
-						cn.close()
-						cn.connect()
-						return
-					}
-				} else { // A nil means something went horribly wrong, error and exit.
-					log.Errorf("Somthing horrible happned, the connection is nil and shouldn't be for connection %s", cn.name)
-					return
-				}
-			} else {
-				log.Debugf("Waiting for connection '%s' on backend '%s' to become ready", cn.name, be.name)
-				ctxTm, cnclTm := context.WithTimeout(context.Background(), delay*time.Millisecond)
-				if delay < 30000 {
-					delay += delay
-				}
-				select {
-				case <-ctxTm.Done():
-					cnclTm() // Doubt this is required but it's harmless.
-					// Do nothing but let the loop continue
-				case <-ctx.Done():
-					cnclTm()
-					// Context was closed, close and exit routine
-					//cn.close() NO! let the close be managed externally!
-					return
-				}
+	state := connectivity.Idle // last state observed by this loop
+monitorLoop:
+	for {
+		if !conn.WaitForStateChange(cn.ctx, state) {
+			log.Debugf("Context canceled for connection '%s' on backend '%s'", cn.name, be.name)
+			break monitorLoop // connection closed
+		}
+
+		if newState := conn.GetState(); newState != state {
+			previousState := state
+			state = newState
+
+			if previousState == connectivity.Ready {
+				be.decConn(cn)
+				log.Infof("Lost connection '%s' on backend '%s'", cn.name, be.name)
+			}
+
+			switch state {
+			case connectivity.Ready:
+				log.Infof("Connection '%s' on backend '%s' becomes ready", cn.name, be.name)
+				be.incConn(cn, conn)
+			case connectivity.TransientFailure, connectivity.Connecting:
+				// deliberately not logged, to avoid flooding the log
+			case connectivity.Shutdown:
+				// the connection was closed; stop monitoring
+				log.Infof("Shutdown for connection '%s' on backend '%s'", cn.name, be.name)
+				break monitorLoop
+			case connectivity.Idle:
+				// Presumably reached only when the server sends a GoAway, which
+				// requires the server to have lowered MaxConnectionIdle from its
+				// default of infinity. This function only logs and stops monitoring;
+				// NOTE(review): it does not itself reconnect - confirm that is intended.
+				// TODO: Read the grpc source code to see if there's a different approach
+				log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'", cn.name, be.name)
+				break monitorLoop
 			}
 		}
-	}(ctx)
+	}
 }
diff --git a/afrouter/afrouter/method-router.go b/afrouter/afrouter/method-router.go
index 2c1ca4f..fd3b974 100644
--- a/afrouter/afrouter/method-router.go
+++ b/afrouter/afrouter/method-router.go
@@ -19,23 +19,66 @@
 import (
 	"errors"
 	"fmt"
+	"github.com/golang/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
+	"io/ioutil"
 )
 
 const NoMeta = "nometa"
 
 type MethodRouter struct {
-	name         string
-	service      string
-	methodRouter map[string]map[string]Router // map of [metadata][method]
+	name            string
+	service         string
+	methodRouter    map[string]map[string]Router // routers indexed by [metadata key][method name]
+	methodStreaming map[string]streamingDirections
+}
+
+type streamingDirections struct {
+	request  bool // true when the method's descriptor marks it client-streaming
+	response bool // true when the method's descriptor marks it server-streaming
 }
 
 func newMethodRouter(config *RouterConfig) (Router, error) {
-	mr := MethodRouter{name: config.Name, service: config.ProtoService, methodRouter: make(map[string]map[string]Router)}
-	mr.methodRouter[NoMeta] = make(map[string]Router) // For routes not needing metadata (all expcept binding at this time)
+	// Load the protobuf descriptor file
+	fb, err := ioutil.ReadFile(config.ProtoFile)
+	if err != nil {
+		log.Errorf("Could not open proto file '%s'", config.ProtoFile)
+		return nil, err
+	}
+	if err := proto.Unmarshal(fb, &config.protoDescriptor); err != nil {
+		log.Errorf("Could not unmarshal %s, %v", "proto.pb", err)
+		return nil, err
+	}
+
+	mr := MethodRouter{
+		name:    config.Name,
+		service: config.ProtoService,
+		methodRouter: map[string]map[string]Router{
+			NoMeta: make(map[string]Router), // For routes not needing metadata (all except binding at this time)
+		},
+		methodStreaming: make(map[string]streamingDirections),
+	}
 	log.Debugf("Processing MethodRouter config %v", *config)
+
+	for _, file := range config.protoDescriptor.File {
+		if *file.Package == config.ProtoPackage {
+			for _, service := range file.Service {
+				if *service.Name == config.ProtoService {
+					for _, method := range service.Method {
+						if clientStreaming, serverStreaming := method.ClientStreaming != nil && *method.ClientStreaming, method.ServerStreaming != nil && *method.ServerStreaming; clientStreaming || serverStreaming {
+							mr.methodStreaming[*method.Name] = streamingDirections{
+								request:  clientStreaming,
+								response: serverStreaming,
+							}
+						}
+					}
+				}
+			}
+		}
+	}
+
 	if len(config.Routes) == 0 {
 		return nil, errors.New(fmt.Sprintf("Router %s must have at least one route", config.Name))
 	}
@@ -121,13 +164,13 @@
 
 func (mr MethodRouter) ReplyHandler(sel interface{}) error {
 	switch sl := sel.(type) {
-	case *sbFrame:
+	case *responseFrame:
 		if r, ok := mr.methodRouter[NoMeta][sl.method]; ok {
 			return r.ReplyHandler(sel)
 		}
 		// TODO: this case should also be an error
 	default: //TODO: This should really be a big error
-		// A reply handler should only be called on the sbFrame
+		// A reply handler should only be called on the responseFrame
 		return nil
 	}
 	return nil
@@ -135,7 +178,7 @@
 
 func (mr MethodRouter) Route(sel interface{}) *backend {
 	switch sl := sel.(type) {
-	case *nbFrame:
+	case *requestFrame:
 		if r, ok := mr.methodRouter[sl.metaKey][sl.methodInfo.method]; ok {
 			return r.Route(sel)
 		}
@@ -146,6 +189,11 @@
 	}
 }
 
+func (mr MethodRouter) IsStreaming(method string) (bool, bool) {
+	streamingDirections := mr.methodStreaming[method] // zero value (false, false) when the method has no entry
+	return streamingDirections.request, streamingDirections.response // (request streaming, response streaming)
+}
+
 func (mr MethodRouter) BackendCluster(mthd string, metaKey string) (*cluster, error) {
 	if r, ok := mr.methodRouter[metaKey][mthd]; ok {
 		return r.BackendCluster(mthd, metaKey)
diff --git a/afrouter/afrouter/request.go b/afrouter/afrouter/request.go
new file mode 100644
index 0000000..0af20e3
--- /dev/null
+++ b/afrouter/afrouter/request.go
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+	"context"
+	"encoding/hex"
+	"errors"
+	"github.com/opencord/voltha-go/common/log"
+	"google.golang.org/grpc"
+	"io"
+	"sync"
+)
+
+type request struct {
+	mutex                    sync.Mutex // guards streams, requestFrameBacklog and sendClosed
+	activeResponseStreamOnce sync.Once  // elects the first backend stream whose RecvMsg returns as the active one
+	setResponseHeaderOnce    sync.Once  // ensures the response header is sent to serverStream only once
+	responseStreamMutex      sync.Mutex // serializes header/message writes to serverStream
+
+	streams map[string]grpc.ClientStream // open backend streams, keyed by connection name
+
+	requestFrameBacklog [][]byte   // request payloads kept (non-streaming requests only) for replay to streams added later
+	responseErrChan     chan error // receives the final result of response forwarding (nil on success)
+	sendClosed          bool       // set once request forwarding has finished and the backend send sides were closed
+
+	backend             *backend
+	ctx                 context.Context
+	serverStream        grpc.ServerStream
+	methodInfo          methodDetails
+	requestFrame        *requestFrame
+	responseFrame       *responseFrame
+	isStreamingRequest  bool
+	isStreamingResponse bool
+}
+
+// catchupRequestStreamThenForwardResponseStream must be called with request.mutex held; it releases the mutex before forwarding responses
+func (r *request) catchupRequestStreamThenForwardResponseStream(connName string, stream grpc.ClientStream) {
+	r.streams[connName] = stream
+
+	// replay the saved request payloads to the new stream (the backlog is only filled for non-streaming requests)
+	frame := *r.requestFrame // local copy of frame
+	for _, payload := range r.requestFrameBacklog {
+		frame.payload = payload
+		if err := stream.SendMsg(&frame); err != nil {
+			log.Debugf("Error on SendMsg: %s", err.Error())
+			break
+		}
+	}
+	if r.sendClosed { // request forwarding already finished, so close this stream's send side as well
+		stream.CloseSend()
+	}
+
+	r.mutex.Unlock()
+
+	r.forwardResponseStream(connName, stream)
+}
+
+// forwardResponseStream reads frames from the given backend stream, relays them to the server stream according to the streaming mode, and reports completion on responseErrChan
+func (r *request) forwardResponseStream(connName string, stream grpc.ClientStream) {
+	var queuedFrames [][]byte
+	frame := *r.responseFrame
+	var err error
+	activeStream := false
+	for {
+		err = stream.RecvMsg(&frame)
+		// the first call whose RecvMsg returns (with a frame or with an error) marks its stream as the active one
+		r.activeResponseStreamOnce.Do(func() { activeStream = true })
+		if err != nil {
+			// this can be io.EOF which is the success case
+			break
+		}
+
+		if r.isStreamingResponse {
+			// streaming response - send immediately
+			if err = r.sendResponseFrame(stream, frame); err != nil {
+				break
+			}
+		} else { // !r.isStreamingResponse
+
+			if r.isStreamingRequest { // && !r.isStreamingResponse
+				// queue the frame (only send response when the last stream closes)
+				queuedFrames = append(queuedFrames, frame.payload)
+			} else { // !r.isStreamingRequest && !r.isStreamingResponse
+
+				// only the active stream will respond
+				if activeStream { // && !r.isStreamingRequest && !r.isStreamingResponse
+					// send the response immediately
+					if err = r.sendResponseFrame(stream, frame); err != nil {
+						break
+					}
+				} else { // !activeStream && !r.isStreamingRequest && !r.isStreamingResponse
+					// just read & discard until the stream dies
+				}
+			}
+		}
+	}
+
+	log.Debugf("Closing stream to %s", connName)
+
+	// io.EOF is the success case
+	if err == io.EOF {
+		err = nil
+	}
+
+	// lock order here: backend.mutex first, then request.mutex. NOTE(review): holding both is deadlock-prone if any other path takes them in the opposite order
+	r.backend.mutex.Lock()
+	r.mutex.Lock()
+	delete(r.streams, connName)
+	streamsLeft := len(r.streams)
+
+	// if this is the active stream (for non-streaming requests), or this is the last stream (for streaming requests)
+	if (activeStream && !r.isStreamingRequest && !r.isStreamingResponse) || (streamsLeft == 0 && (r.isStreamingRequest || r.isStreamingResponse)) {
+		// request is complete, cleanup
+		delete(r.backend.activeRequests, r)
+		r.mutex.Unlock()
+		r.backend.mutex.Unlock()
+
+		// send any queued frames we have (streaming request & !streaming response only, but no harm trying in other cases)
+		for _, payload := range queuedFrames {
+			if err != nil {
+				// if there's been an error, don't try to send anymore
+				break
+			}
+			frame.payload = payload
+			err = r.sendResponseFrame(stream, frame)
+		}
+
+		// We may have received Trailers as part of the call.
+		r.serverStream.SetTrailer(stream.Trailer())
+
+		// response stream complete
+		r.responseErrChan <- err
+	} else {
+		r.mutex.Unlock()
+		r.backend.mutex.Unlock()
+	}
+}
+
+func (r *request) sendResponseFrame(stream grpc.ClientStream, f responseFrame) error {
+	r.responseStreamMutex.Lock()
+	defer r.responseStreamMutex.Unlock()
+
+	// the header should only be set once, even if multiple streams can respond.
+	setHeader := false
+	r.setResponseHeaderOnce.Do(func() { setHeader = true })
+	if setHeader {
+		// This is a bit of a hack: headers from the client (backend) stream are only readable after its first
+		// message has been received, but they must be written to the server stream before the first message
+		// is flushed, so this is the only place to do it nicely.
+		md, err := stream.Header()
+		if err != nil {
+			return err
+		}
+		// Update the metadata for the response.
+		if f.metaKey != NoMeta {
+			if f.metaVal == "" {
+				// We could also always just do this
+				md.Set(f.metaKey, f.backend.name)
+			} else {
+				md.Set(f.metaKey, f.metaVal)
+			}
+		}
+		if err := r.serverStream.SendHeader(md); err != nil {
+			return err
+		}
+	}
+
+	log.Debugf("Response frame %s", hex.EncodeToString(f.payload))
+
+	return r.serverStream.SendMsg(&f)
+}
+
+func (r *request) sendAll(frame *requestFrame) error {
+	r.mutex.Lock()
+	if !r.isStreamingRequest {
+		// save frames of non-streaming requests, so that streams added later can be caught up
+		r.requestFrameBacklog = append(r.requestFrameBacklog, frame.payload)
+	}
+
+	// snapshot the current streams under the lock; the sends below run without holding it
+	streams := make(map[string]grpc.ClientStream, len(r.streams))
+	for n, s := range r.streams {
+		streams[n] = s
+	}
+	r.mutex.Unlock()
+
+	var rtrn error
+	atLeastOne := false
+	atLeastOneSuccess := false
+	for _, stream := range streams {
+		if err := stream.SendMsg(frame); err != nil {
+			log.Debugf("Error on SendMsg: %s", err.Error())
+			rtrn = err
+		} else {
+			atLeastOneSuccess = true
+		}
+		atLeastOne = true
+	}
+	// If at least one of the streams succeeded, declare success;
+	// if none did, return the last error that was seen.
+	if atLeastOne {
+		if atLeastOneSuccess {
+			return nil
+		} else {
+			return rtrn
+		}
+	} else {
+		err := errors.New("unable to send, all streams have closed")
+		log.Error(err)
+		return err
+	}
+}
+
+func (r *request) forwardRequestStream(src grpc.ServerStream) error {
+	// The frame buffer already has the results of a first
+	// RecvMsg in it so the first thing to do is to
+	// send it to the list of client streams and only
+	// then read some more.
+	frame := *r.requestFrame // local copy of frame
+	var rtrn error
+	for {
+		// Send the message to each of the backend streams
+		if err := r.sendAll(&frame); err != nil {
+			log.Debugf("SendAll failed %s", err.Error())
+			rtrn = err
+			break
+		}
+		log.Debugf("Request frame %s", hex.EncodeToString(frame.payload))
+		if err := src.RecvMsg(&frame); err != nil {
+			rtrn = err // this can be io.EOF, which is the happy case
+			break
+		}
+	}
+
+	r.mutex.Lock()
+	log.Debug("Closing southbound streams")
+	r.sendClosed = true
+	for _, stream := range r.streams {
+		stream.CloseSend()
+	}
+	r.mutex.Unlock()
+
+	if rtrn != io.EOF {
+		log.Debugf("s2cErr reporting %v", rtrn)
+		return rtrn
+	}
+	log.Debug("s2cErr reporting EOF")
+	// this is the successful case: the sender has reached io.EOF and won't be sending anymore.
+	// the clientStream>serverStream direction may continue sending though.
+	return nil
+}
diff --git a/afrouter/afrouter/round-robin-router.go b/afrouter/afrouter/round-robin-router.go
index 70f164a..60ff457 100644
--- a/afrouter/afrouter/round-robin-router.go
+++ b/afrouter/afrouter/round-robin-router.go
@@ -80,6 +80,10 @@
 	return "", "", nil
 }
 
+func (rr RoundRobinRouter) IsStreaming(_ string) (bool, bool) {
+	panic("not implemented") // NOTE(review): any call panics; confirm no caller asks a round-robin router for streaming info
+}
+
 func (rr RoundRobinRouter) BackendCluster(s string, mk string) (*cluster, error) {
 	return rr.cluster, nil
 }
@@ -91,7 +95,7 @@
 func (rr RoundRobinRouter) Route(sel interface{}) *backend {
 	var err error
 	switch sl := sel.(type) {
-	case *nbFrame:
+	case *requestFrame:
 		// Since this is a round robin router just get the next backend
 		if *rr.currentBackend, err = rr.cluster.nextBackend(*rr.currentBackend, BackendSequenceRoundRobin); err == nil {
 			return *rr.currentBackend
diff --git a/afrouter/afrouter/router.go b/afrouter/afrouter/router.go
index 323ffb2..1bd2d6e 100644
--- a/afrouter/afrouter/router.go
+++ b/afrouter/afrouter/router.go
@@ -29,6 +29,7 @@
 	Name() string
 	Route(interface{}) *backend
 	Service() string
+	IsStreaming(string) (bool, bool)
 	BackendCluster(string, string) (*cluster, error)
 	FindBackendCluster(string) *cluster
 	ReplyHandler(interface{}) error
diff --git a/afrouter/afrouter/server.go b/afrouter/afrouter/server.go
index dff1519..82269d2 100644
--- a/afrouter/afrouter/server.go
+++ b/afrouter/afrouter/server.go
@@ -23,6 +23,7 @@
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"net"
+	"net/url"
 	"strconv"
 )
 
@@ -58,7 +59,7 @@
 		rtrn_err = true
 	}
 	// Validate the ip address if one is provided
-	if ip := net.ParseIP(config.Addr); config.Addr != "" && ip == nil {
+	if _, err := url.Parse(config.Addr); err != nil {
 		log.Errorf("Invalid address '%s' provided for server '%s'", config.Addr, config.Name)
 		rtrn_err = true
 	}
@@ -144,7 +145,7 @@
 	if !ok {
 		return grpc.Errorf(codes.Internal, "lowLevelServerStream doesn't exist in context")
 	}
-	log.Debugf("Processing grpc request %s on server %s", fullMethodName, s.name)
+	log.Debugf("\n\nProcessing grpc request %s on server %s", fullMethodName, s.name)
 	methodInfo := newMethodDetails(fullMethodName)
 	r, ok := s.getRouter(methodInfo.pkg, methodInfo.service)
 	//fn, ok := s.routers[methodInfo.pkg][methodInfo.service]
@@ -156,7 +157,7 @@
 		log.Error(err)
 		return err
 	}
-	log.Debugf("Selected router %s\n", r.Name())
+	log.Debugf("Selected router %s", r.Name())
 
 	mk, mv, err := r.GetMetaKeyVal(serverStream)
 	if err != nil {
diff --git a/afrouter/afrouter/streams.go b/afrouter/afrouter/streams.go
deleted file mode 100644
index bb7faea..0000000
--- a/afrouter/afrouter/streams.go
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Copyright 2019-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package afrouter
-
-import (
-	"context"
-	"encoding/hex"
-	"errors"
-	"github.com/opencord/voltha-go/common/log"
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/metadata"
-	"sort"
-	"sync"
-)
-
-type streams struct {
-	mutex         sync.Mutex
-	activeStream  *stream
-	streams       map[string]*stream
-	sortedStreams []*stream
-}
-
-type stream struct {
-	stream    grpc.ClientStream
-	ctx       context.Context
-	cancel    context.CancelFunc
-	ok2Close  chan struct{}
-	c2sReturn chan error
-	s2cReturn error
-}
-
-func (s *streams) clientCancel() {
-	for _, strm := range s.streams {
-		if strm != nil {
-			strm.cancel()
-		}
-	}
-}
-
-func (s *streams) closeSend() {
-	for _, strm := range s.streams {
-		if strm != nil {
-			<-strm.ok2Close
-			log.Debug("Closing southbound stream")
-			strm.stream.CloseSend()
-		}
-	}
-}
-
-func (s *streams) trailer() metadata.MD {
-	return s.activeStream.stream.Trailer()
-}
-
-func (s *streams) getActive() *stream {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-	return s.activeStream
-}
-
-func (s *streams) setThenGetActive(strm *stream) *stream {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-	if s.activeStream == nil {
-		s.activeStream = strm
-	}
-	return s.activeStream
-}
-
-func (s *streams) forwardClientToServer(dst grpc.ServerStream, f *sbFrame) chan error {
-	fc2s := func(srcS *stream) {
-		for i := 0; ; i++ {
-			if err := srcS.stream.RecvMsg(f); err != nil {
-				if s.setThenGetActive(srcS) == srcS {
-					srcS.c2sReturn <- err // this can be io.EOF which is the success case
-				} else {
-					srcS.c2sReturn <- nil // Inactive responder
-				}
-				close(srcS.ok2Close)
-				break
-			}
-			if s.setThenGetActive(srcS) != srcS {
-				srcS.c2sReturn <- nil
-				continue
-			}
-			if i == 0 {
-				// This is a bit of a hack, but client to server headers are only readable after first client msg is
-				// received but must be written to server stream before the first msg is flushed.
-				// This is the only place to do it nicely.
-				md, err := srcS.stream.Header()
-				if err != nil {
-					srcS.c2sReturn <- err
-					break
-				}
-				// Update the metadata for the response.
-				if f.metaKey != NoMeta {
-					if f.metaVal == "" {
-						// We could also alsways just do this
-						md.Set(f.metaKey, f.backend.name)
-					} else {
-						md.Set(f.metaKey, f.metaVal)
-					}
-				}
-				if err := dst.SendHeader(md); err != nil {
-					srcS.c2sReturn <- err
-					break
-				}
-			}
-			log.Debugf("Northbound frame %s", hex.EncodeToString(f.payload))
-			if err := dst.SendMsg(f); err != nil {
-				srcS.c2sReturn <- err
-				break
-			}
-		}
-	}
-
-	// There should be AT LEAST one open stream at this point
-	// if there isn't its a grave error in the code and it will
-	// cause this thread to block here so check for it and
-	// don't let the lock up happen but report the error
-	ret := make(chan error, 1)
-	agg := make(chan *stream)
-	atLeastOne := false
-	for _, strm := range s.streams {
-		if strm != nil {
-			go fc2s(strm)
-			go func(s *stream) { // Wait on result and aggregate
-				r := <-s.c2sReturn // got the return code
-				if r == nil {
-					return // We're the redundant stream, just die
-				}
-				s.c2sReturn <- r // put it back to pass it along
-				agg <- s         // send the stream to the aggregator
-			}(strm)
-			atLeastOne = true
-		}
-	}
-	if atLeastOne == true {
-		go func() { // Wait on aggregated result
-			s := <-agg
-			ret <- <-s.c2sReturn
-		}()
-	} else {
-		err := errors.New("There are no open streams. Unable to forward message.")
-		log.Error(err)
-		ret <- err
-	}
-	return ret
-}
-
-func (s *streams) sendAll(f *nbFrame) error {
-	var rtrn error
-
-	atLeastOne := false
-	for _, strm := range s.sortedStreams {
-		if strm != nil {
-			if err := strm.stream.SendMsg(f); err != nil {
-				log.Debugf("Error on SendMsg: %s", err.Error())
-				strm.s2cReturn = err
-			}
-			atLeastOne = true
-		} else {
-			log.Debugf("Nil stream")
-		}
-	}
-	// If one of the streams succeeded, declare success
-	// if none did pick an error and return it.
-	if atLeastOne == true {
-		for _, strm := range s.sortedStreams {
-			if strm != nil {
-				rtrn = strm.s2cReturn
-				if rtrn == nil {
-					return rtrn
-				}
-			}
-		}
-		return rtrn
-	} else {
-		rtrn = errors.New("There are no open streams, this should never happen")
-		log.Error(rtrn)
-	}
-	return rtrn
-}
-
-func (s *streams) forwardServerToClient(src grpc.ServerStream, f *nbFrame) chan error {
-	ret := make(chan error, 1)
-	go func() {
-		// The frame buffer already has the results of a first
-		// RecvMsg in it so the first thing to do is to
-		// send it to the list of client streams and only
-		// then read some more.
-		for i := 0; ; i++ {
-			// Send the message to each of the backend streams
-			if err := s.sendAll(f); err != nil {
-				ret <- err
-				log.Debugf("SendAll failed %s", err.Error())
-				break
-			}
-			log.Debugf("Southbound frame %s", hex.EncodeToString(f.payload))
-			if err := src.RecvMsg(f); err != nil {
-				ret <- err // this can be io.EOF which is happy case
-				break
-			}
-		}
-	}()
-	return ret
-}
-
-func (s *streams) sortStreams() {
-	var tmpKeys []string
-	for k := range s.streams {
-		tmpKeys = append(tmpKeys, k)
-	}
-	sort.Strings(tmpKeys)
-	for _, v := range tmpKeys {
-		s.sortedStreams = append(s.sortedStreams, s.streams[v])
-	}
-}
diff --git a/afrouter/arouter.json b/afrouter/arouter.json
index 7739d65..5036402 100644
--- a/afrouter/arouter.json
+++ b/afrouter/arouter.json
@@ -3,7 +3,7 @@
     {
       "name": "grpc_command",
       "port": 55555,
-      "address": "",
+      "address": "localhost",
       "type": "grpc",
       "routers": [
         {
@@ -20,10 +20,10 @@
       "name": "vcore",
       "package": "voltha",
       "service": "VolthaService",
+      "proto_descriptor": "vendor/github.com/opencord/voltha-protos/go/voltha.pb",
       "routes": [
         {
           "name": "dev_manager",
-          "proto_descriptor": "voltha.pb",
           "type": "rpc_affinity_message",
           "association": "round_robin",
           "routing_field": "id",
@@ -273,7 +273,7 @@
   ],
   "api": {
     "_comment": "If this isn't defined then no api is available for dynamic configuration and queries",
-    "address": "",
+    "address": "localhost",
     "port": 55554
   }
 }
