[VOL-785,VOL-786,VOL-1315,VOL-1316]
Initial commit of the affinity router's data plane

Change-Id: Iccc93b5526d5d2468b33eff7d8847e22fb88ef2d
diff --git a/afrouter/afrouter/server.go b/afrouter/afrouter/server.go
new file mode 100644
index 0000000..3b729f9
--- /dev/null
+++ b/afrouter/afrouter/server.go
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// gRPC affinity router with active/active backends
+
+package afrouter
+
+import (
+	"fmt"
+	"net"
+	"regexp"
+	"errors"
+	"strconv"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"github.com/opencord/voltha-go/common/log"
+)
+
+var (
+	clientStreamDescForProxying = &grpc.StreamDesc{
+		ServerStreams: true,
+		ClientStreams: true,
+	}
+)
+const (
+	REQ_ALL = 0
+	REQ_PACKAGE = 1
+	REQ_SERVICE = 2
+	REQ_METHOD = 3
+)
+
+type server struct {
+	running bool
+	name string
+	stype nbi
+	proxyListener net.Listener
+	routers map[string]map[string]Router
+	proxyServer *grpc.Server
+}
+
+type nbRequest struct {
+	srv interface{}
+	serverStream grpc.ServerStream
+	r Router
+	mthdSlice []string
+	metaKey string  // There should be at most one key specified. More than one is an error.
+	metaVal string  // This is the value extracted from the meta key if it exists or "" otherwise
+}
+
+var mthdSlicerExp string = `^/([a-zA-Z][a-zA-Z0-9]+)\.([a-zA-Z][a-zA-Z0-9]+)/([a-zA-Z][a-zA-Z0-9]+)`
+var mthdSlicer *regexp.Regexp // The compiled regex to extract the package/service/method
+
+
+func NewServer(config *ServerConfig) (*server,error) {
+	var err error = nil
+	var rtrn_err bool = false
+	var srvr *server
+	// Change over to the new configuration format
+	// Validate the configuration
+	// There should be a name
+	if config.Name == "" {
+	    log.Error("A server has been defined with no name")
+		rtrn_err = true
+	}
+	// Validate that there's a port specified
+	if config.Port == 0 {
+		log.Errorf("Server %s does not have a valid port assigned", config.Name)
+		rtrn_err = true
+	}
+	// Validate the ip address if one is provided
+	if ip := net.ParseIP(config.Addr); config.Addr != "" && ip == nil {
+		log.Errorf("Invalid address '%s' provided for server '%s'", config.Addr, config.Name)
+		rtrn_err = true
+	}
+	if config.Type != "grpc" && config.Type != "streaming_grpc" {
+		if config.Type == "" {
+		    log.Errorf("A server 'type' must be defined for server %s",config.Name)
+		} else {
+	        log.Errorf("The server type must be either 'grpc' or 'streaming_grpc' "+
+			    "but '%s' was found for server '%s'", config.Type, config.Name)
+		}
+		rtrn_err = true
+	}
+	if len(config.Routers) == 0 {
+	    log.Errorf("At least one router must be specified for server '%s'", config.Name)
+		rtrn_err = true
+	}
+
+	if rtrn_err == true {
+		return nil, errors.New("Server configuration failed")
+	} else {
+		// The configuration is valid, create a server and configure it.
+		srvr = &server{name:config.Name,routers:make(map[string]map[string]Router)}
+		// The listener
+		if srvr.proxyListener, err =
+			net.Listen("tcp", config.Addr + ":"+
+			strconv.Itoa(int(config.Port))); err != nil {
+			log.Error(err)
+			return nil, err
+		}
+		// Create the routers
+		log.Debugf("Configuring the routers for server %s", srvr.name)
+		for p,r := range(config.routers) {
+			log.Debugf("Processing router %s for package %s", r.Name,p)
+		    if dr,err := NewRouter(r); err != nil {
+			    log.Error(err)
+			    return nil, err
+		    } else {
+				log.Debugf("Adding router %s to the server %s for package %s and service %s",
+							dr.Name(), srvr.name, p, dr.Service())
+				if _,ok := srvr.routers[p]; ok {
+					srvr.routers[p][dr.Service()] = dr
+				} else {
+					srvr.routers[p] = make(map[string]Router)
+					srvr.routers[p][dr.Service()] = dr
+				}
+			}
+		}
+		// Configure the grpc handler
+		srvr.proxyServer = grpc.NewServer(
+			grpc.CustomCodec(Codec()),
+			grpc.UnknownServiceHandler(srvr.TransparentHandler()),
+		)
+
+	}
+	// Compile the regular expression to extract the method
+	mthdSlicer = regexp.MustCompile(mthdSlicerExp)
+	return srvr, nil
+}
+
+func (s *server) Name() (string) {
+	return s.name
+}
+
+func (s *server) TransparentHandler() grpc.StreamHandler {
+	return s.handler
+}
+
+
+func (s *server) getRouter(pkg *string, service *string) (Router,bool) {
+	if fn, ok := s.routers[*pkg][*service]; ok { // Both specified
+		return fn, ok
+	} else if fn, ok = s.routers["*"][*service]; ok { // Package wild card
+		return fn, ok
+	} else if fn, ok = s.routers[*pkg]["*"]; ok { // Service wild card
+		return fn, ok
+	} else if fn, ok = s.routers["*"]["*"]; ok { // Both Wildcarded
+		return fn, ok
+	} else {
+		return nil,false
+	}
+}
+
+
+func (s *server) handler(srv interface{}, serverStream grpc.ServerStream) error {
+	// Determine what router is intended to handle this request
+	fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
+	if !ok {
+		return grpc.Errorf(codes.Internal, "lowLevelServerStream doesn't exist in context")
+	}
+	log.Debugf("Processing grpc request %s on server %s",fullMethodName,s.name)
+	// The full method name is structured as follows:
+	// <package name>.<service>/<method>
+	mthdSlice := mthdSlicer.FindStringSubmatch(fullMethodName)
+	if mthdSlice == nil {
+		log.Errorf("Faled to slice full method %s, result: %v", fullMethodName, mthdSlice)
+	} else {
+		log.Debugf("Sliced full method %s: %v", fullMethodName, mthdSlice)
+	}
+	r, ok := s.getRouter(&mthdSlice[REQ_PACKAGE],&mthdSlice[REQ_SERVICE])
+	//fn, ok := s.routers[mthdSlice[REQ_PACKAGE]][mthdSlice[REQ_SERVICE]]
+	if !ok {
+		// TODO: Should this be punted to a default transparent router??
+		// Probably not, if one is defined yes otherwise just crap out.
+
+		err := errors.New(
+			fmt.Sprintf("Unable to dispatch! Service '%s' for package '%s' not found.",
+				mthdSlice[REQ_SERVICE], mthdSlice[REQ_PACKAGE]))
+		log.Error(err)
+		return err
+	}
+	log.Debugf("Selected router %s\n", r.Name())
+
+	mk,mv,err := r.GetMetaKeyVal(serverStream)
+	if err != nil {
+		log.Error(err)
+		return err
+	}
+
+	//nbR := &nbRequest(srv:srv,serverStream:serverStream,r:r,mthdSlice:mthdSlice,metaKey:mk,metaVal:mv)
+
+	// Extract the cluster from the selected router and use it to manage the transfer
+	if bkndClstr,err := r.BackendCluster(mthdSlice[REQ_METHOD], mk); err != nil {
+		return err
+	} else {
+		//return bkndClstr.handler(nbR)
+		return bkndClstr.handler(srv, serverStream, r, mthdSlice, mk, mv)
+	}
+
+	return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
+}
+