Reworked connection to use a single thread for state management.

Also disabled the SetConnection API call.

Stream cleanup.

Removed Unnescessary threads, there is now one thread per connection (handling response stream forwarding), and the existing thread is used to forward the request stream.
Renamed 'streams' to 'request'.
Renamed 'nbFrame' to 'requestFrame'.
Renamed 'sbFrame' to 'responseFrame'.

Changed handling of streaming requests.

Incoming & Outgoing streams are split when a connection becomes ready.
Added playback of non-streaming requests/responses for newly opened streams.

Late stream catchup fix & streaming call detection.

Fixed an issue where old streams were not being caught up with what they missed.
Streaming requests & responses are now detected based on the proto definitions.
Changed where the proto file is specified in the afrouter config (see afrouter/arouter.json for an example).

Fixed mutex copy.

Also tweaked some log statements.

Fixed field tag lint error.

Change-Id: I6e14039c27519d8d2103065258ff4302bc881235
diff --git a/afrouter/afrouter/affinity-router.go b/afrouter/afrouter/affinity-router.go
index 2390846..583d0a7 100644
--- a/afrouter/afrouter/affinity-router.go
+++ b/afrouter/afrouter/affinity-router.go
@@ -20,10 +20,8 @@
 	"errors"
 	"fmt"
 	"github.com/golang/protobuf/proto"
-	pb "github.com/golang/protobuf/protoc-gen-go/descriptor"
 	"github.com/opencord/voltha-go/common/log"
 	"google.golang.org/grpc"
-	"io/ioutil"
 	"regexp"
 	"strconv"
 )
@@ -38,7 +36,6 @@
 	association        associationType
 	routingField       string
 	grpcService        string
-	protoDescriptor    *pb.FileDescriptorSet
 	methodMap          map[string]byte
 	nbBindingMethodMap map[string]byte
 	cluster            *cluster
@@ -94,37 +91,23 @@
 		rtrn_err = true
 	}
 
-	// Load the protobuf descriptor file
-	dr.protoDescriptor = &pb.FileDescriptorSet{}
-	fb, err := ioutil.ReadFile(config.ProtoFile)
-	if err != nil {
-		log.Errorf("Could not open proto file '%s'", config.ProtoFile)
-		rtrn_err = true
-	}
-	err = proto.Unmarshal(fb, dr.protoDescriptor)
-	if err != nil {
-		log.Errorf("Could not unmarshal %s, %v", "proto.pb", err)
-		rtrn_err = true
-	}
-
 	// Build the routing structure based on the loaded protobuf
 	// descriptor file and the config information.
 	type key struct {
 		method string
 		field  string
 	}
-	var msgs = make(map[key]byte)
-	for _, f := range dr.protoDescriptor.File {
+	var fieldNumberLookup = make(map[key]byte)
+	for _, f := range rconf.protoDescriptor.File {
 		// Build a temporary map of message types by name.
 		for _, m := range f.MessageType {
 			for _, fld := range m.Field {
 				log.Debugf("Processing message '%s', field '%s'", *m.Name, *fld.Name)
-				msgs[key{*m.Name, *fld.Name}] = byte(*fld.Number)
+				fieldNumberLookup[key{*m.Name, *fld.Name}] = byte(*fld.Number)
 			}
 		}
 	}
-	log.Debugf("The map contains: %v", msgs)
-	for _, f := range dr.protoDescriptor.File {
+	for _, f := range rconf.protoDescriptor.File {
 		if *f.Package == rconf.ProtoPackage {
 			for _, s := range f.Service {
 				if *s.Name == rconf.ProtoService {
@@ -147,7 +130,7 @@
 							// The input type has the package name prepended to it. Remove it.
 							//in := (*m.InputType)[len(rconf.ProtoPackage)+2:]
 							in := pkg_methd[PKG_MTHD_MTHD]
-							dr.methodMap[*m.Name], ok = msgs[key{in, config.RouteField}]
+							dr.methodMap[*m.Name], ok = fieldNumberLookup[key{in, config.RouteField}]
 							if !ok {
 								log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
 									*m.Name, config.RouteField, in)
@@ -155,11 +138,11 @@
 							}
 						}
 						// The sb method is always included in the methods so we can check it here too.
-						if needSbMethod(*m.Name, config) {
+						if needNbBindingMethod(*m.Name, config) {
 							log.Debugf("Enabling southbound method '%s'", *m.Name)
 							// The output type has the package name prepended to it. Remove it.
 							out := (*m.OutputType)[len(rconf.ProtoPackage)+2:]
-							dr.nbBindingMethodMap[*m.Name], ok = msgs[key{out, config.RouteField}]
+							dr.nbBindingMethodMap[*m.Name], ok = fieldNumberLookup[key{out, config.RouteField}]
 							if !ok {
 								log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
 									*m.Name, config.RouteField, out)
@@ -188,7 +171,7 @@
 	return dr, nil
 }
 
-func needSbMethod(mthd string, conf *RouteConfig) bool {
+func needNbBindingMethod(mthd string, conf *RouteConfig) bool {
 	for _, m := range conf.NbBindingMethods {
 		if mthd == m {
 			return true
@@ -284,8 +267,8 @@
 
 func (ar AffinityRouter) Route(sel interface{}) *backend {
 	switch sl := sel.(type) {
-	case *nbFrame:
-		log.Debugf("Route called for nbFrame with method %s", sl.methodInfo.method)
+	case *requestFrame:
+		log.Debugf("Route called for requestFrame with method %s", sl.methodInfo.method)
 		// Check if this method should be affinity bound from the
 		// reply rather than the request.
 		if _, ok := ar.nbBindingMethodMap[sl.methodInfo.method]; ok {
@@ -331,6 +314,10 @@
 	return "", "", nil
 }
 
+func (ar AffinityRouter) IsStreaming(_ string) (bool, bool) {
+	panic("not implemented")
+}
+
 func (ar AffinityRouter) BackendCluster(mthd string, metaKey string) (*cluster, error) {
 	return ar.cluster, nil
 }
@@ -344,10 +331,8 @@
 
 func (ar AffinityRouter) ReplyHandler(sel interface{}) error {
 	switch sl := sel.(type) {
-	case *sbFrame:
-		sl.mutex.Lock()
-		defer sl.mutex.Unlock()
-		log.Debugf("Reply handler called for sbFrame with method %s", sl.method)
+	case *responseFrame:
+		log.Debugf("Reply handler called for responseFrame with method %s", sl.method)
 		// Determine if reply action is required.
 		if fld, ok := ar.nbBindingMethodMap[sl.method]; ok && len(sl.payload) > 0 {
 			// Extract the field value from the frame and