[VOL-785,VOL-786,VOL-1315,VOL-1316]
Initial commit of the affinity router's data plane
Change-Id: Iccc93b5526d5d2468b33eff7d8847e22fb88ef2d
diff --git a/afrouter/afrouter/affinity-router.go b/afrouter/afrouter/affinity-router.go
new file mode 100644
index 0000000..eeb4df3
--- /dev/null
+++ b/afrouter/afrouter/affinity-router.go
@@ -0,0 +1,389 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// gRPC affinity router with active/active backends
+
+package afrouter
+
+import (
+ "fmt"
+ "errors"
+ "strconv"
+ "io/ioutil"
+ "google.golang.org/grpc"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-go/common/log"
+ pb "github.com/golang/protobuf/protoc-gen-go/descriptor"
+)
+
+type AffinityRouter struct {
+ name string
+ routerType int // TODO: This is probably not needed
+ association int
+ routingField string
+ grpcService string
+ protoDescriptor *pb.FileDescriptorSet
+ methodMap map[string]byte
+ nbBindingMthdMap map[string]byte
+ bkndClstr *backendCluster
+ affinity map[string]*backend
+ curBknd **backend
+}
+
+func NewAffinityRouter(rconf *RouterConfig, config *RouteConfig) (Router,error) {
+ var err error = nil
+ var rtrn_err bool = false
+ // Validate the configuration
+
+ // A name must exist
+ if config.Name == "" {
+ log.Error("A router 'name' must be specified")
+ rtrn_err = true
+ }
+
+ if rconf.ProtoPackage == "" {
+ log.Error("A 'package' must be specified")
+ rtrn_err = true
+ }
+
+ if rconf.ProtoService == "" {
+ log.Error("A 'service' must be specified")
+ rtrn_err = true
+ }
+
+ //if config.RouteField == "" {
+ // log.Error("A 'routing_field' must be specified")
+ // rtrn_err = true
+ //}
+
+ // TODO The overrieds section is currently not being used
+ // so the router will route all methods based on the
+ // routing_field. This needs to be added so that methods
+ // can have different routing fields.
+ var bptr *backend
+ bptr = nil
+ dr := AffinityRouter{
+ name:config.Name,
+ grpcService:rconf.ProtoService,
+ affinity:make(map[string]*backend),
+ methodMap:make(map[string]byte),
+ nbBindingMthdMap:make(map[string]byte),
+ curBknd:&bptr,
+ //serialNo:0,
+ }
+ // An association must exist
+ dr.association = strIndex(rAssnNames, config.Association)
+ if dr.association == 0 {
+ if config.Association == "" {
+ log.Error("An association must be specified")
+ } else {
+ log.Errorf("The association '%s' is not valid", config.Association)
+ }
+ rtrn_err = true
+ }
+
+
+ // This has already been validated bfore this function
+ // is called so just use it.
+ for idx := range(rTypeNames) {
+ if config.Type == rTypeNames[idx] {
+ dr.routerType = idx
+ break
+ }
+ }
+
+ // Load the protobuf descriptor file
+ dr.protoDescriptor = &pb.FileDescriptorSet{}
+ fb, err := ioutil.ReadFile(config.ProtoFile);
+ if err != nil {
+ log.Errorf("Could not open proto file '%s'",config.ProtoFile)
+ rtrn_err = true
+ }
+ err = proto.Unmarshal(fb, dr.protoDescriptor)
+ if err != nil {
+ log.Errorf("Could not unmarshal %s, %v", "proto.pb", err)
+ rtrn_err = true
+ }
+
+
+ // Build the routing structure based on the loaded protobuf
+ // descriptor file and the config information.
+ type key struct {
+ mthd string
+ field string
+ }
+ var msgs map[key]byte = make(map[key]byte)
+ for _,f := range(dr.protoDescriptor.File) {
+ // Build a temporary map of message types by name.
+ for _,m := range(f.MessageType) {
+ for _,fld := range(m.Field) {
+ log.Debugf("Processing message '%s', field '%s'", *m.Name, *fld.Name)
+ msgs[key{*m.Name, *fld.Name}] = byte(*fld.Number)
+ }
+ }
+ }
+ log.Debugf("The map contains: %v", msgs)
+ for _,f := range(dr.protoDescriptor.File) {
+ if *f.Package == rconf.ProtoPackage {
+ for _, s:= range(f.Service) {
+ if *s.Name == rconf.ProtoService {
+ log.Debugf("Loading package data '%s' for service '%s' for router '%s'", *f.Package, *s.Name, dr.name)
+ // Now create a map keyed by method name with the value being the
+ // field number of the route selector.
+ var ok bool
+ for _,m := range(s.Method) {
+ // Find the input type in the messages and extract the
+ // field number and save it for future reference.
+ log.Debugf("Processing method '%s'",*m.Name)
+ // Determine if this is a method we're supposed to be processing.
+ if needMethod(*m.Name, config) == true {
+ log.Debugf("Enabling method '%s'",*m.Name)
+ // The input type has the package name prepended to it. Remove it.
+ in := (*m.InputType)[len(rconf.ProtoPackage)+2:]
+ dr.methodMap[*m.Name], ok = msgs[key{in, config.RouteField}]
+ if ok == false {
+ log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
+ *m.Name, config.RouteField, in)
+ rtrn_err = true
+ }
+ }
+ // The sb method is always included in the methods so we can check it here too.
+ if needSbMethod(*m.Name, config) == true {
+ log.Debugf("Enabling southbound method '%s'",*m.Name)
+ // The output type has the package name prepended to it. Remove it.
+ out := (*m.OutputType)[len(rconf.ProtoPackage)+2:]
+ dr.nbBindingMthdMap[*m.Name], ok = msgs[key{out, config.RouteField}]
+ if ok == false {
+ log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
+ *m.Name, config.RouteField, out)
+ rtrn_err = true
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+
+ // Create the backend cluster or link to an existing one
+ ok := true
+ if dr.bkndClstr, ok = bClusters[config.backendCluster.Name]; ok == false {
+ if dr.bkndClstr, err = NewBackendCluster(config.backendCluster); err != nil {
+ log.Errorf("Could not create a backend for router %s", config.Name)
+ rtrn_err = true
+ }
+ }
+
+ if rtrn_err {
+ return dr,errors.New(fmt.Sprintf("Failed to create a new router '%s'",dr.name))
+ }
+
+ return dr,nil
+}
+
+func needSbMethod(mthd string, conf *RouteConfig) bool {
+ for _,m := range conf.NbBindingMethods {
+ if mthd == m {
+ return true
+ }
+ }
+ return false
+}
+
+func needMethod(mthd string, conf *RouteConfig) bool {
+ for _,m := range conf.Methods {
+ if mthd == m {
+ return true
+ }
+ }
+ return false
+}
+
+func (r AffinityRouter) Service() (string) {
+ return r.grpcService
+}
+
+func (r AffinityRouter) Name() (string) {
+ return r.name
+}
+
+func (r AffinityRouter) skipField(data *[]byte, idx *int) (error) {
+ switch (*data)[*idx]&3 {
+ case 0: // Varint
+ (*idx)++
+ for (*data)[*idx] >= 128 { (*idx)++}
+ case 1: // 64 bit
+ (*idx)+= 9
+ case 2: // Length delimited
+ (*idx)++
+ b := proto.NewBuffer((*data)[*idx:])
+ t , _ := b.DecodeVarint()
+ (*idx) += int(t)+1
+ case 3: // Deprecated
+ case 4: // Deprecated
+ case 5: // 32 bit
+ (*idx)+= 5
+ }
+ return nil
+}
+
+func (r AffinityRouter) decodeProtoField(payload []byte, fieldId byte) (string, error) {
+ idx :=0
+ b := proto.NewBuffer([]byte{})
+ b.DebugPrint("The Buffer", payload)
+ for { // Find the route selector field
+ log.Debugf("Decoding afinity value attributeNumber: %d from %v at index %d", fieldId, payload, idx)
+ log.Debugf("Attempting match with payload: %d, methodTable: %d", payload[idx], fieldId)
+ if payload[idx]>>3 == fieldId {
+ log.Debugf("Method match with payload: %d, methodTable: %d", payload[idx], fieldId)
+ // TODO: Consider supporting other selector types.... Way, way in the future
+ // ok, the future is now, support strings as well... ugh.
+ var selector string
+ switch payload[idx]&3 {
+ case 0: // Integer
+ b.SetBuf(payload[idx+1:])
+ v,e := b.DecodeVarint()
+ if e == nil {
+ log.Debugf("Decoded the ing field: %v", v)
+ selector = strconv.Itoa(int(v))
+ } else {
+ log.Errorf("Failed to decode varint %v", e)
+ return "", e
+ }
+ case 2: // Length delimited AKA string
+ b.SetBuf(payload[idx+1:])
+ v,e := b.DecodeStringBytes()
+ if e == nil {
+ log.Debugf("Decoded the string field: %v", v)
+ selector = v
+ } else {
+ log.Errorf("Failed to decode string %v", e)
+ return "", e
+ }
+ default:
+ err := errors.New(fmt.Sprintf("Only integer and string route selectors are permitted"))
+ log.Error(err)
+ return "", err
+ }
+ return selector, nil
+ } else if err := r.skipField(&payload, &idx); err != nil {
+ log.Errorf("Parsing message failed %v", err)
+ return "", err
+ }
+ }
+}
+
+func (r AffinityRouter) Route(sel interface{}) *backend {
+ switch sl := sel.(type) {
+ case *nbFrame:
+ log.Debugf("Route called for nbFrame with method %s", sl.mthdSlice[REQ_METHOD]);
+ // Check if this method should be affinity bound from the
+ // reply rather than the request.
+ if _,ok := r.nbBindingMthdMap[sl.mthdSlice[REQ_METHOD]]; ok == true {
+ var err error
+ log.Debugf("Method '%s' affinity binds on reply", sl.mthdSlice[REQ_METHOD])
+ // Just round robin route the southbound request
+ if *r.curBknd, err = r.bkndClstr.nextBackend(*r.curBknd,BE_SEQ_RR); err == nil {
+ return *r.curBknd
+ } else {
+ sl.err = err
+ return nil
+ }
+ }
+ // Not a south affinity binding method, proceed with north affinity binding.
+ if selector,err := r.decodeProtoField(sl.payload, r.methodMap[sl.mthdSlice[REQ_METHOD]]); err == nil {
+ if rtrn,ok := r.affinity[selector]; ok {
+ return rtrn
+ } else {
+ // The selector isn't in the map, create a new affinity mapping
+ log.Debugf("MUST CREATE A NEW AFFINITY MAP ENTRY!!")
+ var err error
+ if *r.curBknd, err = r.bkndClstr.nextBackend(*r.curBknd,BE_SEQ_RR); err == nil {
+ r.setAffinity(selector, *r.curBknd)
+ //r.affinity[selector] = *r.curBknd
+ //log.Debugf("New affinity set to backend %s",(*r.curBknd).name)
+ return *r.curBknd
+ } else {
+ sl.err = err
+ return nil
+ }
+ }
+ }
+ default:
+ log.Errorf("Internal: invalid data type in Route call %v", sel);
+ return nil
+ }
+ log.Errorf("Bad lookup in affinity map %v",r.affinity);
+ return nil
+}
+
+func (ar AffinityRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string,string,error) {
+ return "","",nil
+}
+
+func (ar AffinityRouter) BackendCluster(mthd string, metaKey string) (*backendCluster,error) {
+ return ar.bkndClstr, nil
+}
+
+func (ar AffinityRouter) FindBackendCluster(beName string) *backendCluster {
+ if beName == ar.bkndClstr.name {
+ return ar.bkndClstr
+ }
+ return nil
+}
+
+func (r AffinityRouter) ReplyHandler(sel interface{}) error {
+ switch sl := sel.(type) {
+ case *sbFrame:
+ sl.lck.Lock()
+ defer sl.lck.Unlock()
+ log.Debugf("Reply handler called for sbFrame with method %s", sl.method);
+ // Determine if reply action is required.
+ if fld, ok := r.nbBindingMthdMap[sl.method]; ok == true && len(sl.payload) > 0 {
+ // Extract the field value from the frame and
+ // and set affinity accordingly
+ if selector,err := r.decodeProtoField(sl.payload, fld); err == nil {
+ log.Debug("Settign affinity on reply")
+ if r.setAffinity(selector, sl.be) != nil {
+ log.Error("Setting affinity on reply failed")
+ }
+ return nil
+ } else {
+ err := errors.New(fmt.Sprintf("Failed to decode reply field %d for method %s", fld, sl.method))
+ log.Error(err)
+ return err
+ }
+ }
+ return nil
+ default:
+ err := errors.New(fmt.Sprintf("Internal: invalid data type in ReplyHander call %v", sl))
+ log.Error(err)
+ return err
+ }
+}
+
+func (ar AffinityRouter) setAffinity(key string, be *backend) error {
+ if be2,ok := ar.affinity[key]; ok == false {
+ ar.affinity[key] = be
+ log.Debugf("New affinity set to backend %s for key %s",be.name, key)
+ } else if be2 != be {
+ err := errors.New(fmt.Sprintf("Attempting multiple sets of affinity for key %s to backend %s from %s on router %s",
+ key, be.name, ar.affinity[key].name, ar.name))
+ log.Error(err)
+ return err
+ }
+ return nil
+}
diff --git a/afrouter/afrouter/api.go b/afrouter/afrouter/api.go
new file mode 100644
index 0000000..dd1506f
--- /dev/null
+++ b/afrouter/afrouter/api.go
@@ -0,0 +1,224 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// gRPC affinity router with active/active backends
+
+package afrouter
+
+import (
+ "net"
+ "fmt"
+ "strconv"
+ "errors"
+ "google.golang.org/grpc"
+ "golang.org/x/net/context"
+ "github.com/opencord/voltha-go/common/log"
+ pb "github.com/opencord/voltha-go/protos/afrouter"
+)
+
+
+type ArouterApi struct {
+ addr string
+ port int
+ apiListener net.Listener
+ apiServer * grpc.Server
+ running bool
+ ar *ArouterProxy
+}
+
+func NewApi(config *ApiConfig, ar *ArouterProxy) (*ArouterApi, error) {
+ var rtrn_err bool
+ // Create a seperate server and listener for the API
+ // Validate the ip address if one is provided
+ if ip := net.ParseIP(config.Addr); config.Addr != "" && ip == nil {
+ log.Errorf("Invalid address '%s' provided for API server", config.Addr)
+ rtrn_err = true
+ }
+ if rtrn_err == true {
+ return nil, errors.New("Errors in API configuration")
+ } else {
+ var err error = nil
+ aa := &ArouterApi{addr:config.Addr,port:int(config.Port),ar:ar}
+ // Create the listener for the API server
+ if aa.apiListener, err =
+ net.Listen("tcp", config.Addr + ":"+
+ strconv.Itoa(int(config.Port))); err != nil {
+ log.Error(err)
+ return nil, err
+ }
+ // Create the API server
+ aa.apiServer = grpc.NewServer()
+ pb.RegisterConfigurationServer(aa.apiServer, *aa)
+ return aa,err
+ }
+}
+
+func (aa *ArouterApi) getServer(srvr string) (*server, error) {
+ if s,ok := aa.ar.servers[srvr]; ok == false {
+ err := errors.New(fmt.Sprintf("Server '%s' doesn't exist", srvr))
+ return nil, err
+ } else {
+ return s, nil
+ }
+}
+
+func (aa *ArouterApi) getRouter(s *server, clstr string) (Router, error) {
+ for _,pkg := range s.routers {
+ for _,r := range pkg {
+ if c := r.FindBackendCluster(clstr); c != nil {
+ return r, nil
+ }
+ }
+ }
+ err := errors.New(fmt.Sprintf("Cluster '%s' doesn't exist", clstr))
+ return nil, err
+}
+
+func (aa *ArouterApi) getCluster(s *server, clstr string) (*backendCluster, error) {
+ for _,pkg := range s.routers {
+ for _,r := range pkg {
+ if c := r.FindBackendCluster(clstr); c != nil {
+ return c, nil
+ }
+ }
+ }
+ err := errors.New(fmt.Sprintf("Cluster '%s' doesn't exist", clstr))
+ return nil, err
+}
+
+func (aa *ArouterApi) getBackend(c *backendCluster, bknd string) (*backend, error) {
+ for _,b := range c.backends {
+ if b.name == bknd {
+ return b,nil
+ }
+ }
+ err := errors.New(fmt.Sprintf("Backend '%s' doesn't exist", bknd))
+ return nil, err
+}
+
+func (aa *ArouterApi) getConnection(b *backend, con string) (*beConnection, error) {
+ if c,ok := b.connections[con]; ok == false {
+ err := errors.New(fmt.Sprintf("Connection '%s' doesn't exist", con))
+ return nil, err
+ } else {
+ return c, nil
+ }
+}
+
+func (aa * ArouterApi) updateConnection(in *pb.Conn, cn *beConnection, b *backend) error {
+ sPort := strconv.FormatUint(in.Port,10)
+ // Check that the ip address and or port are different
+ if in.Addr == cn.addr && sPort == cn.port {
+ err := errors.New(fmt.Sprintf("Refusing to change connection '%s' to identical values", in.Connection))
+ return err
+ }
+ //log.Debugf("BEFORE: Be1: %v Be2 %v", cn.bknd, b)
+ cn.close()
+ cn.addr = in.Addr
+ cn.port = sPort
+ cn.connect()
+ return nil
+}
+
+func (aa ArouterApi) SetAffinity(ctx context.Context, in *pb.Affinity) (*pb.Result, error) {
+ log.Debugf("SetAffinity called! %v",in);
+ //return &pb.Result{Success:true,Error:""},nil
+ // Navigate down tot he connection and compare IP addresses and ports if they're
+ // not the same then close the existing connection. If they are bothe the same
+ // then return an error describing the situation.
+ var err error
+
+ aap := &aa
+
+ _=aap
+
+ log.Debugf("Getting router %s and route %s", in.Router, in.Route)
+ if r,ok := allRouters[in.Router+in.Route]; ok == true {
+ switch rr := r.(type) {
+ case AffinityRouter:
+ log.Debug("Affinity router found")
+ b := rr.FindBackendCluster(in.Cluster).getBackend(in.Backend)
+ if b != nil {
+ rr.setAffinity(in.Id, b)
+ } else {
+ log.Errorf("Requested backend '%s' not found", in.Backend)
+ }
+ _ = rr
+ case MethodRouter:
+ log.Debug("Method router found")
+ _ = rr
+ default:
+ log.Debug("Some other router found")
+ _ = rr
+ }
+ } else {
+ log.Debugf("Couldn't get router type")
+ return &pb.Result{Success:false,Error:err.Error()}, err
+ }
+
+ return &pb.Result{Success:true,Error:""},nil
+}
+
+func (aa ArouterApi) SetConnection(ctx context.Context, in *pb.Conn) (*pb.Result, error) {
+ log.Debugf("SetConnection called! %v",in);
+ // Navigate down tot he connection and compare IP addresses and ports if they're
+ // not the same then close the existing connection. If they are bothe the same
+ // then return an error describing the situation.
+ var s *server
+ var c *backendCluster
+ var b * backend
+ var cn * beConnection
+ var err error
+
+ aap := &aa
+ if s,err = (aap).getServer(in.Server); err != nil {
+ err := errors.New(fmt.Sprintf("Server '%s' doesn't exist", in.Server))
+ return &pb.Result{Success:false,Error:err.Error()}, err
+ }
+ // The cluster is usually accessed via tha router but since each
+ // cluster is unique it's good enough to find the router that
+ // has the cluster we're looking for rather than fully keying
+ // the path
+ if c,err = aap.getCluster(s, in.Cluster); err != nil {
+ return &pb.Result{Success:false,Error:err.Error()}, err
+ }
+
+ if b,err = aap.getBackend(c, in.Backend); err != nil {
+ return &pb.Result{Success:false,Error:err.Error()}, err
+ }
+
+ if cn,err = aap.getConnection(b, in.Connection); err != nil {
+ return &pb.Result{Success:false,Error:err.Error()}, err
+ }
+
+ if err = aap.updateConnection(in, cn, b); err != nil {
+ return &pb.Result{Success:false,Error:err.Error()}, err
+ }
+
+ return &pb.Result{Success:true,Error:""},nil
+}
+
+func (aa *ArouterApi) serve() {
+ // Start a serving thread
+ go func() {
+ aa.running = true
+ if err := aa.apiServer.Serve(aa.apiListener); err != nil {
+ aa.running = false
+ log.Error(err)
+ errChan <- err
+ }
+ }()
+}
+
diff --git a/afrouter/afrouter/arproxy.go b/afrouter/afrouter/arproxy.go
new file mode 100644
index 0000000..af5fa0f
--- /dev/null
+++ b/afrouter/afrouter/arproxy.go
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// gRPC affinity router with active/active backends
+
+package afrouter
+
+// This file implements the ArouterPoxy struct and its
+// functions. The ArouterProxy is the top level object
+// for the affinity router.
+
+import (
+ "github.com/opencord/voltha-go/common/log"
+)
+
+
+type nbi int
+
+const (
+ GRPC_NBI nbi = 1
+ GRPC_STREAMING_NBI nbi = 2
+ GRPC_CONTROL_NBI nbi = 3
+)
+
+// String names for display in error messages.
+var arpxyNames = [...]string{"grpc_nbi", "grpc_streaming_nbi", "grpc_control_nbi"}
+var arProxy *ArouterProxy= nil
+
+type ArouterProxy struct {
+ servers map[string]*server // Defined in handler.go
+ api *ArouterApi
+}
+
+
+// Create the routing proxy
+func NewArouterProxy(conf *Configuration) (*ArouterProxy, error) {
+ arProxy = &ArouterProxy{servers:make(map[string]*server)}
+ // Create all the servers listed in the configuration
+ for _,s := range(conf.Servers) {
+ if ns, err := NewServer(&s); err != nil {
+ log.Error("Configuration failed")
+ return nil, err
+ } else {
+ arProxy.servers[ns.Name()] = ns
+ }
+ }
+
+ // TODO: The API is not mandatory, check if it's even in the config before
+ // trying to create it. If it isn't then don't bother but log a warning.
+ if api,err := NewApi(&conf.Api, arProxy); err != nil {
+ return nil, err
+ } else {
+ arProxy.api = api
+ }
+
+ return arProxy, nil
+}
+
+// Start serving
+func (ap *ArouterProxy) ListenAndServe() error {
+
+ for _, srvr := range ap.servers {
+ ap.serve(srvr)
+ }
+ ap.api.serve()
+
+ // Just wait until we're done which only happens
+ // on a signal or an error.
+ err := <-doneChan
+
+ return err
+}
+
+func (ap *ArouterProxy) serve(srvr *server) {
+
+ // Start a serving thread
+ go func() {
+ srvr.running = true
+ if err := srvr.proxyServer.Serve(srvr.proxyListener); err != nil {
+ srvr.running = false
+ log.Error(err)
+ errChan <- err
+ }
+ }()
+}
diff --git a/afrouter/afrouter/backend.go b/afrouter/afrouter/backend.go
new file mode 100644
index 0000000..1c9129e
--- /dev/null
+++ b/afrouter/afrouter/backend.go
@@ -0,0 +1,880 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// gRPC affinity router with active/active backends
+
+package afrouter
+
+// Backend manager handles redundant connections per backend
+
+import (
+ "io"
+ "fmt"
+ "net"
+ "sync"
+ "time"
+ "errors"
+ "strconv"
+ "strings"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/connectivity"
+ "github.com/opencord/voltha-go/common/log"
+)
+
+
+
+const (
+ BE_ACTIVE_ACTIVE = 1 // Backend type active/active
+ BE_SERVER = 2 // Backend type single server
+ BE_SEQ_RR = 0 // Backend sequence round robin
+ AS_NONE = 0 // Association strategy: none
+ AS_SERIAL_NO = 1 // Association strategy: serial number
+ AL_NONE = 0 // Association location: none
+ AL_HEADER = 1 // Association location: header
+ AL_PROTOBUF = 2 // Association location: protobuf
+)
+
+
+var beTypeNames = []string{"","active_active","server"}
+var asTypeNames = []string{"","serial_number"}
+var alTypeNames = []string{"","header","protobuf"}
+
+var bClusters map[string]*backendCluster = make(map[string]*backendCluster)
+
+type backendCluster struct {
+ name string
+ //backends map[string]*backend
+ backends []*backend
+ beRvMap map[*backend]int
+ serialNoSource chan uint64
+}
+
+type backend struct {
+ lck sync.Mutex
+ name string
+ beType int
+ activeAssoc assoc
+ connFailCallback func(string, *backend)bool
+ connections map[string]*beConnection
+ opnConns int
+}
+
+type assoc struct {
+ strategy int
+ location int
+ field string // Used only if location is protobuf
+}
+
+type beConnection struct {
+ lck sync.Mutex
+ cncl context.CancelFunc
+ name string
+ addr string
+ port string
+ gConn *gConnection
+ bknd *backend
+}
+
+// This structure should never be referred to
+// by any routine outside of *beConnection
+// routines.
+type gConnection struct {
+ lck sync.Mutex
+ state connectivity.State
+ conn *grpc.ClientConn
+ cncl context.CancelFunc
+}
+
+type beClStrm struct {
+ strm grpc.ClientStream
+ ctxt context.Context
+ cncl context.CancelFunc
+ c2sRtrn chan error
+ s2cRtrn error
+}
+
+type beClStrms struct {
+ lck sync.Mutex
+ actvStrm *beClStrm
+ strms map[string]*beClStrm
+}
+
+//***************************************************************//
+//****************** BackendCluster Functions *******************//
+//***************************************************************//
+
+//TODO: Move the backend type (active/active etc) to the cluster
+// level. All backends should really be of the same type.
+// Create a new backend cluster
+func NewBackendCluster(conf *BackendClusterConfig) (*backendCluster, error) {
+ var err error = nil
+ var rtrn_err bool = false
+ var be *backend
+ log.Debug("Creating a backend cluster with %v", conf)
+ // Validate the configuration
+ if conf.Name == "" {
+ log.Error("A backend cluster must have a name")
+ rtrn_err = true
+ }
+ //bc := &backendCluster{name:conf.Name,backends:make(map[string]*backend)}
+ bc := &backendCluster{name:conf.Name, beRvMap:make(map[*backend]int)}
+ bClusters[bc.name] = bc
+ bc.startSerialNumberSource() // Serial numberere for active/active backends
+ idx := 0
+ for _, bec := range(conf.Backends) {
+ if bec.Name == "" {
+ log.Errorf("A backend must have a name in cluster %s\n", conf.Name)
+ rtrn_err = true
+ }
+ if be,err = newBackend(&bec, conf.Name); err != nil {
+ log.Errorf("Error creating backend %s", bec.Name)
+ rtrn_err = true
+ }
+ bc.backends = append(bc.backends, be)
+ bc.beRvMap[bc.backends[idx]] = idx
+ idx++
+ }
+ if rtrn_err {
+ return nil, errors.New("Error creating backend(s)")
+ }
+ return bc, nil
+}
+
+func (bc * backendCluster) getBackend(name string) *backend {
+ for _,v := range bc.backends {
+ if v.name == name {
+ return v
+ }
+ }
+ return nil
+}
+
+func (bc *backendCluster) startSerialNumberSource() {
+ bc.serialNoSource = make(chan uint64)
+ var counter uint64 = 0
+ // This go routine only dies on exit, it is not a leak
+ go func() {
+ for {
+ bc.serialNoSource <- counter
+ counter++
+ }
+ }()
+}
+
+func (bc *backendCluster) nextBackend(be *backend, seq int) (*backend,error) {
+ switch seq {
+ case BE_SEQ_RR: // Round robin
+ in := be
+ // If no backend is found having a connection
+ // then return nil.
+ if be == nil {
+ log.Debug("Previous backend is nil")
+ be = bc.backends[0]
+ in = be
+ if be.opnConns != 0 {
+ return be,nil
+ }
+ }
+ for {
+ log.Debugf("Requesting a new backend starting from %s", be.name)
+ cur := bc.beRvMap[be]
+ cur++
+ if cur >= len(bc.backends) {
+ cur = 0
+ }
+ log.Debugf("Next backend is %d:%s", cur, bc.backends[cur].name)
+ if bc.backends[cur].opnConns > 0 {
+ return bc.backends[cur], nil
+ }
+ if bc.backends[cur] == in {
+ err := fmt.Errorf("No backend with open connections found")
+ log.Debug(err);
+ return nil,err
+ }
+ be = bc.backends[cur]
+ log.Debugf("Backend '%s' has no open connections, trying next", bc.backends[cur].name)
+ }
+ default: // Invalid, defalt to routnd robin
+ log.Errorf("Invalid backend sequence %d. Defaulting to round robin", seq)
+ return bc.nextBackend(be, BE_SEQ_RR)
+ }
+}
+
+func (bec *backendCluster) handler(srv interface{}, serverStream grpc.ServerStream, r Router, mthdSlice []string,
+ mk string, mv string) error {
+//func (bec *backendCluster) handler(nbR * nbRequest) error {
+
+ // The final backend cluster needs to be determined here. With non-affinity routed backends it could
+ // just be determined here and for affinity routed backends the first message must be received
+ // before the backend is determined. In order to keep things simple, the same approach is taken for
+ // now.
+
+ // Get the backend to use.
+ // Allocate the nbFrame here since it holds the "context" of this communication
+ nf := &nbFrame{router:r, mthdSlice:mthdSlice, serNo:bec.serialNoSource, metaKey:mk, metaVal:mv}
+ log.Debugf("Nb frame allocate with method %s", nf.mthdSlice[REQ_METHOD])
+
+ if be,err := bec.assignBackend(serverStream, nf); err != nil {
+ // At this point, no backend streams have been initiated
+ // so just return the error.
+ return err
+ } else {
+ log.Debugf("Backend '%s' selected", be.name)
+ // Allocate a sbFrame here because it might be needed for return value intercept
+ sf := &sbFrame{router:r, be:be, method:nf.mthdSlice[REQ_METHOD], metaKey:mk, metaVal:mv}
+ log.Debugf("Sb frame allocated with router %s",r.Name())
+ return be.handler(srv, serverStream, nf, sf)
+ }
+}
+
+func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, f * nbFrame) (*beClStrms, error) {
+
+ rtrn := &beClStrms{strms:make(map[string]*beClStrm),actvStrm:nil}
+
+ log.Debugf("Opening southbound streams for method '%s'", f.mthdSlice[REQ_METHOD])
+ // Get the metadata from the incoming message on the server
+ md, ok := metadata.FromIncomingContext(serverStream.Context())
+ if !ok {
+ return nil, errors.New("Could not get a server stream metadata")
+ }
+
+ // TODO: Need to check if this is an active/active backend cluster
+ // with a serial number in the header.
+ serialNo := <-f.serNo
+ log.Debugf("Serial number for transaction allocated: %d", serialNo)
+ // If even one stream can be created then proceed. If none can be
+ // created then report an error becase both the primary and redundant
+ // connections are non-existant.
+ var atLeastOne bool = false
+ var errStr strings.Builder
+ log.Debugf("There are %d connections to open", len(be.connections))
+ for cnk,cn := range be.connections {
+ // TODO: THIS IS A HACK to suspend redundancy for binding routers for all calls
+ // and its very specific to a use case. There should really be a per method
+ // mechanism to select non-redundant calls for all router types. This needs
+ // to be fixed ASAP. The overrides should be used for this, the implementation
+ // is simple, and it can be done here.
+ if atLeastOne == true && f.metaKey != NoMeta {
+ // Don't open any more southbound streams
+ log.Debugf("Not opening any more SB streams, metaKey = %s", f.metaKey)
+ rtrn.strms[cnk] = nil
+ continue
+ }
+ // Copy in the metadata
+ if cn.getState() == connectivity.Ready && cn.getConn() != nil {
+ log.Debugf("Opening southbound stream for connection '%s'", cnk)
+ // Create an outgoing context that includes the incoming metadata
+ // and that will cancel if the server's context is canceled
+ clientCtx, clientCancel := context.WithCancel(serverStream.Context())
+ clientCtx = metadata.NewOutgoingContext(clientCtx, md.Copy())
+ //TODO: Same check here, only add the serial number if necessary
+ clientCtx = metadata.AppendToOutgoingContext(clientCtx, "voltha_serial_number",
+ strconv.FormatUint(serialNo,10))
+ // Create the client stream
+ if clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying,
+ cn.getConn(), f.mthdSlice[REQ_ALL]); err !=nil {
+ log.Debug("Failed to create a client stream '%s', %v",cn.name,err)
+ fmt.Fprintf(&errStr, "{{Failed to create a client stream '%s', %v}} ", cn.name, err)
+ rtrn.strms[cnk] = nil
+ } else {
+ rtrn.strms[cnk] = &beClStrm{strm:clientStream, ctxt:clientCtx, cncl:clientCancel, s2cRtrn:nil,
+ c2sRtrn:make(chan error, 1)}
+ atLeastOne = true
+ }
+ } else if cn.getConn() == nil {
+ err := errors.New(fmt.Sprintf("Connection '%s' is closed", cn.name))
+ fmt.Fprint(&errStr, err.Error())
+ log.Debug(err)
+ } else {
+ err := errors.New(fmt.Sprintf("Connection '%s' isn't ready", cn.name))
+ fmt.Fprint(&errStr, err.Error())
+ log.Debug(err)
+ }
+ }
+ if atLeastOne == true {
+ return rtrn,nil
+ }
+ fmt.Fprintf(&errStr, "{{No streams available for backend '%s' unable to send}} ",be.name)
+ log.Error(errStr.String())
+ return nil, errors.New(errStr.String())
+}
+
+func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf * nbFrame, sf * sbFrame) error {
+
+ // Set up and launch each individual southbound stream
+ var beStrms *beClStrms
+
+ beStrms, err := be.openSouthboundStreams(srv,serverStream,nf)
+ if err != nil {
+ log.Errorf("openStreams failed: %v",err)
+ return err
+ }
+ // If we get here, there has to be AT LEAST ONE open stream
+
+ // *Do not explicitly close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
+ // Channels do not have to be closed, it is just a control flow mechanism, see
+ // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
+
+ log.Debug("Starting server to client forwarding")
+ s2cErrChan := beStrms.forwardServerToClient(serverStream, nf)
+
+ log.Debug("Starting client to server forwarding")
+ c2sErrChan := beStrms.forwardClientToServer(serverStream, sf)
+
+ // We don't know which side is going to stop sending first, so we need a select between the two.
+ for i := 0; i < 2; i++ {
+ select {
+ case s2cErr := <-s2cErrChan:
+ log.Debug("Processing s2cErr")
+ if s2cErr == io.EOF {
+ log.Debug("s2cErr reporting EOF")
+ // this is the successful case where the sender has encountered io.EOF, and won't be sending anymore./
+ // the clientStream>serverStream may continue sending though.
+ beStrms.closeSend()
+ break
+ } else {
+ log.Debugf("s2cErr reporting %v",s2cErr)
+ // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
+ // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
+ // exit with an error to the stack
+ beStrms.clientCancel()
+ return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
+ }
+ case c2sErr := <-c2sErrChan:
+ log.Debug("Processing c2sErr")
+ // This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
+ // cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
+ // will be nil.
+ serverStream.SetTrailer(beStrms.trailer())
+ // c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
+ if c2sErr != io.EOF {
+ return c2sErr
+ }
+ return nil
+ }
+ }
+ return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
+}
+
+func (strms *beClStrms) clientCancel() {
+ for _,strm := range strms.strms {
+ if strm != nil {
+ strm.cncl()
+ }
+ }
+}
+
+func (strms *beClStrms) closeSend() {
+ for _,strm := range strms.strms {
+ if strm != nil {
+ log.Debug("Closing southbound stream")
+ strm.strm.CloseSend()
+ }
+ }
+}
+
+func (strms *beClStrms) trailer() metadata.MD {
+ return strms.actvStrm.strm.Trailer()
+}
+
+func (bec *backendCluster) assignBackend(src grpc.ServerStream, f *nbFrame) (*backend, error) {
+ // Receive the first message from the server. This calls the assigned codec in which
+ // Unmarshal gets executed. That will use the assigned router to select a backend
+ // and add it to the frame
+ if err := src.RecvMsg(f); err != nil {
+ return nil, err
+ }
+ // Check that the backend was routable and actually has connections open.
+ // If it doesn't then return a nil backend to indicate this
+ if f.be == nil {
+ err := fmt.Errorf("Unable to route method '%s'", f.mthdSlice[REQ_METHOD])
+ log.Error(err)
+ return nil, err
+ } else if f.be.opnConns == 0 {
+ err := fmt.Errorf("No open connections on backend '%s'", f.be.name)
+ log.Error(err)
+ return f.be, err
+ }
+ return f.be, nil
+}
+
+func (strms * beClStrms) getActive() *beClStrm {
+ strms.lck.Lock()
+ defer strms.lck.Unlock()
+ return strms.actvStrm
+}
+
+func (strms *beClStrms) setThenGetActive(strm *beClStrm) (*beClStrm) {
+ strms.lck.Lock()
+ defer strms.lck.Unlock()
+ if strms.actvStrm == nil {
+ strms.actvStrm = strm
+ }
+ return strms.actvStrm
+}
+
+func (src *beClStrms) forwardClientToServer(dst grpc.ServerStream, f *sbFrame) chan error {
+ fc2s := func(srcS *beClStrm) {
+ for i := 0; ; i++ {
+ if err := srcS.strm.RecvMsg(f); err != nil {
+ if src.setThenGetActive(srcS) == srcS {
+ srcS.c2sRtrn <- err // this can be io.EOF which is the success case
+ } else {
+ srcS.c2sRtrn <- nil // Inactive responder
+ }
+ break
+ }
+ if src.setThenGetActive(srcS) != srcS {
+ srcS.c2sRtrn <- nil
+ break
+ }
+ if i == 0 {
+ // This is a bit of a hack, but client to server headers are only readable after first client msg is
+ // received but must be written to server stream before the first msg is flushed.
+ // This is the only place to do it nicely.
+ md, err := srcS.strm.Header()
+ if err != nil {
+ srcS.c2sRtrn <- err
+ break
+ }
+ // Update the metadata for the response.
+ if f.metaKey != NoMeta {
+ if f.metaVal == "" {
+ // We could also alsways just do this
+ md.Set(f.metaKey, f.be.name)
+ } else {
+ md.Set(f.metaKey, f.metaVal)
+ }
+ }
+ if err := dst.SendHeader(md); err != nil {
+ srcS.c2sRtrn <- err
+ break
+ }
+ }
+ log.Debugf("Northbound frame %v", f.payload)
+ if err := dst.SendMsg(f); err != nil {
+ srcS.c2sRtrn <- err
+ break
+ }
+ }
+ }
+
+ // There should be AT LEAST one open stream at this point
+ // if there isn't its a grave error in the code and it will
+ // cause this thread to block here so check for it and
+ // don't let the lock up happen but report the error
+ ret := make(chan error, 1)
+ agg := make(chan *beClStrm)
+ atLeastOne := false
+ for _,strm := range src.strms {
+ if strm != nil {
+ go fc2s(strm)
+ go func(s *beClStrm) { // Wait on result and aggregate
+ r := <-s.c2sRtrn // got the return code
+ if r == nil {
+ return // We're the redundat stream, just die
+ }
+ s.c2sRtrn <- r // put it back to pass it along
+ agg <- s // send the stream to the aggregator
+ } (strm)
+ atLeastOne = true
+ }
+ }
+ if atLeastOne == true {
+ go func() { // Wait on aggregated result
+ s := <-agg
+ ret <- <-s.c2sRtrn
+ }()
+ } else {
+ err := errors.New("There are no open streams. Unable to forward message.")
+ log.Error(err)
+ ret <- err
+ }
+ return ret
+}
+
+func (strms *beClStrms) sendAll(f *nbFrame) error {
+ var rtrn error
+
+ atLeastOne := false
+ for _,strm := range(strms.strms) {
+ if strm != nil {
+ if err := strm.strm.SendMsg(f); err != nil {
+ strm.s2cRtrn = err
+ }
+ atLeastOne = true
+ }
+ }
+ // If one of the streams succeeded, declare success
+ // if none did pick an error and return it.
+ if atLeastOne == true {
+ for _,strm := range(strms.strms) {
+ if strm != nil {
+ rtrn = strm.s2cRtrn
+ if rtrn == nil {
+ return rtrn
+ }
+ }
+ }
+ return rtrn
+ } else {
+ rtrn = errors.New("There are no open streams, this should never happen")
+ log.Error(rtrn)
+ }
+ return rtrn;
+}
+
+func (dst *beClStrms) forwardServerToClient(src grpc.ServerStream, f *nbFrame) chan error {
+ ret := make(chan error, 1)
+ go func() {
+ // The frame buffer already has the results of a first
+ // RecvMsg in it so the first thing to do is to
+ // send it to the list of client streams and only
+ // then read some more.
+ for i := 0; ; i++ {
+ // Send the message to each of the backend streams
+ if err := dst.sendAll(f); err != nil {
+ ret <- err
+ break
+ }
+ log.Debugf("Southbound frame %v", f.payload)
+ if err := src.RecvMsg(f); err != nil {
+ ret <- err // this can be io.EOF which is happy case
+ break
+ }
+ }
+ }()
+ return ret
+}
+
+func newBackend(conf *BackendConfig, clusterName string) (*backend, error) {
+ var rtrn_err bool = false
+
+ log.Debugf("Configuring the backend with %v", *conf)
+ // Validate the conifg and configure the backend
+ be:=&backend{name:conf.Name,connections:make(map[string]*beConnection),opnConns:0}
+ idx := strIndex([]string(beTypeNames),conf.Type)
+ if idx == 0 {
+ log.Error("Invalid type specified for backend %s in cluster %s", conf.Name, clusterName)
+ rtrn_err = true
+ }
+ be.beType = idx
+
+ idx = strIndex(asTypeNames, conf.Association.Strategy)
+ if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
+ log.Errorf("An association strategy must be provided if the backend "+
+ "type is active/active for backend %s in cluster %s", conf.Name, clusterName)
+ rtrn_err = true
+ }
+ be.activeAssoc.strategy = idx
+
+ idx = strIndex(alTypeNames, conf.Association.Location)
+ if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
+ log.Errorf("An association location must be provided if the backend "+
+ "type is active/active for backend %s in cluster %s", conf.Name, clusterName)
+ rtrn_err = true
+ }
+ be.activeAssoc.location = idx
+
+ if conf.Association.Field == "" && be.activeAssoc.location == AL_PROTOBUF {
+ log.Errorf("An association field must be provided if the backend "+
+ "type is active/active and the location is set to protobuf "+
+ "for backend %s in cluster %s", conf.Name, clusterName)
+ rtrn_err = true
+ }
+ be.activeAssoc.field = conf.Association.Field
+ if rtrn_err {
+ return nil, errors.New("Backend configuration failed")
+ }
+ // Configure the connections
+ // Connections can consist of just a name. This allows for dynamic configuration
+ // at a later time.
+ // TODO: validate that there is one connection for all but active/active backends
+ for _,cnConf := range(conf.Connections) {
+ if cnConf.Name == "" {
+ log.Errorf("A connection must have a name for backend %s in cluster %s",
+ conf.Name, clusterName)
+ } else {
+ gc:=&gConnection{conn:nil,cncl:nil,state:connectivity.Idle}
+ be.connections[cnConf.Name] = &beConnection{name:cnConf.Name,addr:cnConf.Addr,port:cnConf.Port,bknd:be,gConn:gc}
+ if cnConf.Addr != "" { // This connection will be specified later.
+ if ip := net.ParseIP(cnConf.Addr); ip == nil {
+ log.Errorf("The IP address for connection %s in backend %s in cluster %s is invalid",
+ cnConf.Name, conf.Name, clusterName)
+ rtrn_err = true
+ }
+ // Validate the port number. This just validtes that it's a non 0 integer
+ if n,err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
+ log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
+ cnConf.Port, cnConf.Name, conf.Name, clusterName)
+ rtrn_err = true
+ } else {
+ if n <=0 && n > 65535 {
+ log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
+ cnConf.Port, cnConf.Name, conf.Name, clusterName)
+ rtrn_err = true
+ }
+ }
+ }
+ }
+ }
+ if rtrn_err {
+ return nil, errors.New("Connection configuration failed")
+ }
+ // All is well start the backend cluster connections
+ be.connectAll()
+
+ return be, nil
+}
+
+//***************************************************************//
+//********************* Backend Functions ***********************//
+//***************************************************************//
+
+func (be *backend) incConn() {
+ be.lck.Lock()
+ defer be.lck.Unlock()
+ be.opnConns++
+}
+
+func (be *backend) decConn() {
+ be.lck.Lock()
+ defer be.lck.Unlock()
+ be.opnConns--
+ if be.opnConns < 0 {
+ log.Error("Internal error, number of open connections less than 0")
+ be.opnConns = 0
+ }
+}
+
+// Attempts to establish all the connections for a backend
+// any failures result in an abort. This should only be called
+// on a first attempt to connect. Individual connections should be
+// handled after that.
+func (be *backend) connectAll() {
+ for _,cn := range be.connections {
+ cn.connect()
+ }
+}
+
+func (cn *beConnection) connect() {
+ if cn.addr != "" && cn.getConn() == nil {
+ log.Infof("Connecting to connection %s with addr: %s and port %s", cn.name,cn.addr,cn.port)
+ // Dial doesn't block, it just returns and continues connecting in the background.
+ // Check back later to confirm and increase the connection count.
+ ctx, cnclFnc := context.WithCancel(context.Background()) // Context for canceling the connection
+ cn.setCncl(cnclFnc)
+ if conn, err := grpc.Dial(cn.addr+":"+cn.port, grpc.WithCodec(Codec()), grpc.WithInsecure()); err != nil {
+ log.Errorf("Dialng connection %v:%v",cn,err)
+ cn.waitAndTryAgain(ctx)
+ } else {
+ cn.setConn(conn)
+ log.Debugf("Starting the connection monitor for '%s'", cn.name)
+ cn.monitor(ctx)
+ }
+ } else if cn.addr == "" {
+ log.Infof("No address supplied for connection '%s', not connecting for now", cn.name)
+ } else {
+ log.Debugf("Connection '%s' is already connected, ignoring", cn.name)
+ }
+}
+
+func (cn *beConnection) waitAndTryAgain(ctx context.Context) {
+ go func(ctx context.Context) {
+ ctxTm,cnclTm := context.WithTimeout(context.Background(), 10 * time.Second)
+ select {
+ case <-ctxTm.Done():
+ cnclTm()
+ log.Debugf("Trying to connect '%s'",cn.name)
+ // Connect creates a new context so cancel this one.
+ cn.cancel()
+ cn.connect()
+ return
+ case <-ctx.Done():
+ cnclTm()
+ return
+ }
+ }(ctx)
+}
+
+func (cn *beConnection) cancel() {
+ cn.lck.Lock()
+ defer cn.lck.Unlock()
+ log.Debugf("Canceling connection %s", cn.name)
+ if cn.gConn != nil{
+ if cn.gConn.cncl != nil {
+ cn.cncl()
+ } else {
+ log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
+ }
+ } else {
+ log.Errorf("Internal error, attempting to cancel on a nil connection object: '%s'", cn.name)
+ }
+}
+
+func (cn *beConnection) setCncl(cncl context.CancelFunc) {
+ cn.lck.Lock()
+ defer cn.lck.Unlock()
+ if cn.gConn != nil {
+ cn.gConn.cncl = cncl
+ } else {
+ log.Errorf("Internal error, attempting to set a cancel function on a nil connection object: '%s'", cn.name)
+ }
+}
+
+func (cn *beConnection) setConn(conn *grpc.ClientConn) {
+ cn.lck.Lock()
+ defer cn.lck.Unlock()
+ if cn.gConn != nil {
+ cn.gConn.conn = conn
+ } else {
+ log.Errorf("Internal error, attempting to set a connection on a nil connection object: '%s'", cn.name)
+ }
+}
+
+func (cn *beConnection) getConn() *grpc.ClientConn {
+ cn.lck.Lock()
+ defer cn.lck.Unlock()
+ if cn.gConn != nil {
+ return cn.gConn.conn
+ }
+ return nil
+}
+
+func (cn *beConnection) close() {
+ cn.lck.Lock()
+ defer cn.lck.Unlock()
+ log.Debugf("Closing connection %s", cn.name)
+ if cn.gConn != nil && cn.gConn.conn != nil {
+ if cn.gConn.conn.GetState() == connectivity.Ready {
+ cn.bknd.decConn() // Decrease the connection reference
+ }
+ if cn.gConn.cncl != nil {
+ cn.gConn.cncl() // Cancel the context first to force monitor functions to exit
+ } else {
+ log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
+ }
+ cn.gConn.conn.Close() // Close the connection
+ // Now replace the gConn object with a new one as this one just
+ // fades away as references to it are released after the close
+ // finishes in the background.
+ cn.gConn = &gConnection{conn:nil,cncl:nil,state:connectivity.TransientFailure}
+ } else {
+ log.Errorf("Internal error, attempt to close a nil connection object for '%s'", cn.name)
+ }
+
+}
+
+func (cn *beConnection) setState(st connectivity.State) {
+ cn.lck.Lock()
+ defer cn.lck.Unlock()
+ if cn.gConn != nil {
+ cn.gConn.state = st
+ } else {
+ log.Errorf("Internal error, attempting to set connection state on a nil connection object: '%s'", cn.name)
+ }
+}
+
+func (cn *beConnection) getState() (connectivity.State) {
+ cn.lck.Lock()
+ defer cn.lck.Unlock()
+ if cn.gConn != nil {
+ if cn.gConn.conn != nil {
+ return cn.gConn.conn.GetState()
+ } else {
+ log.Errorf("Internal error, attempting to get connection state on a nil connection: '%s'", cn.name)
+ }
+ } else {
+ log.Errorf("Internal error, attempting to get connection state on a nil connection object: '%s'", cn.name)
+ }
+ // For lack of a better state to use. The logs will help determine what happened here.
+ return connectivity.TransientFailure
+}
+
+
+func (cn *beConnection) monitor(ctx context.Context) {
+ bp := cn.bknd
+ log.Debugf("Setting up monitoring for backend %s", bp.name)
+ go func(ctx context.Context) {
+ var delay time.Duration = 100 //ms
+ for {
+ //log.Debugf("****** Monitoring connection '%s' on backend '%s', %v", cn.name, bp.name, cn.conn)
+ if cn.getState() == connectivity.Ready {
+ log.Debugf("connection '%s' on backend '%s' becomes ready", cn.name, bp.name)
+ cn.setState(connectivity.Ready)
+ bp.incConn()
+ if cn.getConn() != nil && cn.getConn().WaitForStateChange(ctx, connectivity.Ready) == false {
+ // The context was canceled. This is done by the close function
+ // so just exit the routine
+ log.Debugf("Contxt canceled for connection '%s' on backend '%s'",cn.name, bp.name)
+ return
+ }
+ if cs := cn.getConn(); cs != nil {
+ switch cs := cn.getState(); cs {
+ case connectivity.TransientFailure:
+ cn.setState(cs)
+ bp.decConn()
+ log.Infof("Transient failure for connection '%s' on backend '%s'",cn.name, bp.name)
+ delay = 100
+ case connectivity.Shutdown:
+ //The connection was closed. The assumption here is that the closer
+ // will manage the connection count and setting the conn to nil.
+ // Exit the routine
+ log.Infof("Shutdown for connection '%s' on backend '%s'",cn.name, bp.name)
+ return
+ case connectivity.Idle:
+ // This can only happen if the server sends a GoAway. This can
+ // only happen if the server has modified MaxConnectionIdle from
+ // its default of infinity. The only solution here is to close the
+ // connection and keepTrying()?
+ //TODO: Read the grpc source code to see if there's a different approach
+ log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'",cn.name, bp.name)
+ cn.close()
+ cn.connect()
+ return
+ }
+ } else { // A nil means something went horribly wrong, error and exit.
+ log.Errorf("Somthing horrible happned, the connection is nil and shouldn't be for connection %s",cn.name)
+ return
+ }
+ } else {
+ log.Debugf("Waiting for connection '%s' on backend '%s' to become ready", cn.name, bp.name)
+ ctxTm, cnclTm := context.WithTimeout(context.Background(), delay * time.Millisecond)
+ if delay < 30000 {
+ delay += delay
+ }
+ select {
+ case <-ctxTm.Done():
+ cnclTm() // Doubt this is required but it's harmless.
+ // Do nothing but let the loop continue
+ case <-ctx.Done():
+ // Context was closed, close and exit routine
+ //cn.close() NO! let the close be managed externally!
+ return
+ }
+ }
+ }
+ }(ctx)
+}
+
+// Set a callback for connection failure notification
+// This is currently not used.
+func (bp * backend) setConnFailCallback(cb func(string, *backend)bool) {
+ bp.connFailCallback = cb
+}
+
diff --git a/afrouter/afrouter/binding-router.go b/afrouter/afrouter/binding-router.go
new file mode 100644
index 0000000..910e6a1
--- /dev/null
+++ b/afrouter/afrouter/binding-router.go
@@ -0,0 +1,230 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// gRPC affinity router with active/active backends
+
+package afrouter
+
+import (
+ "fmt"
+ "errors"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+ "github.com/opencord/voltha-go/common/log"
+)
+
+type BindingRouter struct {
+ name string
+ routerType int // TODO: This is probably not needed
+ association int
+ //routingField string
+ grpcService string
+ //protoDescriptor *pb.FileDescriptorSet
+ //methodMap map[string]byte
+ bkndClstr *backendCluster
+ bindings map[string]*backend
+ bindingType string
+ bindingField string
+ bindingMethod string
+ curBknd **backend
+}
+
+func (br BindingRouter) BackendCluster(s string, metaKey string) (*backendCluster,error) {
+ return br.bkndClstr, nil
+ //return nil,errors.New("Not implemented yet")
+}
+func (br BindingRouter) Name() (string) {
+ return br.name
+}
+func (br BindingRouter) Service() (string) {
+ return br.grpcService
+}
+func (br BindingRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string,string,error) {
+ var rtrnK string = ""
+ var rtrnV string = ""
+
+ // Get the metadata from the server stream
+ md, ok := metadata.FromIncomingContext(serverStream.Context())
+ if !ok {
+ return rtrnK, rtrnV, errors.New("Could not get a server stream metadata")
+ }
+
+ // Determine if one of the method routing keys exists in the metadata
+ if _,ok := md[br.bindingField]; ok == true {
+ rtrnV = md[br.bindingField][0]
+ rtrnK = br.bindingField
+ }
+
+ return rtrnK,rtrnV,nil
+}
+func (br BindingRouter) FindBackendCluster(string) (*backendCluster) {
+ return nil
+}
+func (br BindingRouter) ReplyHandler(v interface{}) error {
+ return nil
+}
+func (br BindingRouter) Route(sel interface{}) (*backend) {
+ var err error
+ switch sl := sel.(type) {
+ case *nbFrame:
+ if b, ok := br.bindings[sl.metaVal]; ok == true { // binding exists, just return it
+ return b
+ } else { // establish a new binding or error.
+ if sl.metaVal != "" {
+ err = errors.New(fmt.Sprintf("Attempt to route on non-existent metadata value '%s' in key '%s'",
+ sl.metaVal, sl.metaKey))
+ log.Error(err)
+ sl.err = err
+ return nil
+ }
+ if sl.mthdSlice[REQ_METHOD] != br.bindingMethod {
+ err = errors.New(fmt.Sprintf("Binding must occur with method %s but attempted with method %s",
+ br.bindingMethod, sl.mthdSlice[REQ_METHOD]))
+ log.Error(err)
+ sl.err = err
+ return nil
+ }
+ log.Debugf("MUST CREATE A NEW BINDING MAP ENTRY!!")
+ if len(br.bindings) < len(br.bkndClstr.backends) {
+ if *br.curBknd, err = br.bkndClstr.nextBackend(*br.curBknd,BE_SEQ_RR); err == nil {
+ // Use the name of the backend as the metaVal for this new binding
+ br.bindings[(*br.curBknd).name] = *br.curBknd
+ return *br.curBknd
+ } else {
+ log.Error(err)
+ sl.err = err
+ return nil
+ }
+ } else {
+ err = errors.New(fmt.Sprintf("Backends exhausted in attempt to bind for metakey '%s' with value '%s'",
+ sl.metaKey, sl.metaVal))
+ log.Error(err)
+ sl.err = err
+ }
+ }
+ return nil
+ default:
+ return nil
+ }
+ return nil
+}
+
+func NewBindingRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
+ var rtrn_err bool = false
+ var err error = nil
+ log.Debugf("Creating binding router %s",config.Name)
+ // A name must exist
+ if config.Name == "" {
+ log.Error("A router 'name' must be specified")
+ rtrn_err = true
+ }
+
+ if rconf.ProtoPackage == "" {
+ log.Error("A 'package' must be specified")
+ rtrn_err = true
+ }
+
+ if rconf.ProtoService == "" {
+ log.Error("A 'service' must be specified")
+ rtrn_err = true
+ }
+
+ //if config.RouteField == "" {
+ // log.Error("A 'routing_field' must be specified")
+ // rtrn_err = true
+ //}
+
+ // TODO: Using the specified service, the imported proto
+ // descriptor file should be scanned for all methods provided
+ // for this router to ensure that this field exists in
+ // the message(s) passed to the method. This will avoid run
+ // time failures that might not be detected for long periods
+ // of time.
+
+ // TODO The routes section is currently not being used
+ // so the router will route all methods based on the
+ // routing_field. This needs to be done.
+ var bptr *backend
+ bptr = nil
+ br := BindingRouter{
+ name:config.Name,
+ grpcService:rconf.ProtoService,
+ bindings:make(map[string]*backend),
+ //methodMap:make(map[string]byte),
+ curBknd:&bptr,
+ //serialNo:0,
+ }
+
+ // A binding association must exist
+ br.association = strIndex(rAssnNames, config.Binding.Association)
+ if br.association == 0 {
+ if config.Binding.Association == "" {
+ log.Error("An binding association must be specified")
+ } else {
+ log.Errorf("The binding association '%s' is not valid", config.Binding.Association)
+ }
+ rtrn_err = true
+ }
+ // A binding type must exist
+ // TODO: This is parsed but ignored and a header based type is used.
+ if config.Binding.Type != "header" {
+ log.Error("The binding type must be set to header")
+ rtrn_err = true
+ } else {
+ br.bindingType = config.Binding.Type
+ }
+ // A binding method must exist
+ if config.Binding.Method == "" {
+ log.Error("The binding method must be specified")
+ rtrn_err = true
+ } else {
+ br.bindingMethod = config.Binding.Method
+ }
+ // A binding field must exxist
+ if config.Binding.Field == "" {
+ log.Error("The binding field must be specified")
+ rtrn_err = true
+ } else {
+ br.bindingField = config.Binding.Field
+ }
+
+
+ // This has already been validated bfore this function
+ // is called so just use it.
+ for idx := range(rTypeNames) {
+ if config.Type == rTypeNames[idx] {
+ br.routerType = idx
+ break
+ }
+ }
+
+ // Create the backend cluster or link to an existing one
+ ok := true
+ if br.bkndClstr, ok = bClusters[config.backendCluster.Name]; ok == false {
+ if br.bkndClstr, err = NewBackendCluster(config.backendCluster); err != nil {
+ log.Errorf("Could not create a backend for router %s", config.Name)
+ rtrn_err = true
+ }
+ }
+
+ // HERE HERE HERE
+
+ if rtrn_err {
+ return br,errors.New(fmt.Sprintf("Failed to create a new router '%s'",br.name))
+ }
+
+
+ return br,nil
+}
diff --git a/afrouter/afrouter/codec.go b/afrouter/afrouter/codec.go
new file mode 100644
index 0000000..20bf354
--- /dev/null
+++ b/afrouter/afrouter/codec.go
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// gRPC affinity router with active/active backends
+
+package afrouter
+
+import (
+ "fmt"
+ "sync"
+ "google.golang.org/grpc"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-go/common/log"
+)
+
+func Codec() grpc.Codec {
+ return CodecWithParent(&protoCodec{})
+}
+
+func CodecWithParent(fallback grpc.Codec) grpc.Codec {
+ return &rawCodec{fallback}
+}
+
+type rawCodec struct {
+ parentCodec grpc.Codec
+}
+
+type sbFrame struct {
+ payload []byte
+ router Router
+ method string
+ be *backend
+ lck sync.Mutex
+ metaKey string
+ metaVal string
+}
+
+type nbFrame struct {
+ payload []byte
+ router Router
+ be *backend
+ err error
+ mthdSlice []string
+ serNo chan uint64
+ metaKey string
+ metaVal string
+}
+
+func (c *rawCodec) Marshal(v interface{}) ([]byte, error) {
+ switch t := v.(type) {
+ case *sbFrame:
+ return t.payload, nil
+ case *nbFrame:
+ return t.payload, nil
+ default:
+ return c.parentCodec.Marshal(v)
+ }
+
+}
+
+func (c *rawCodec) Unmarshal(data []byte, v interface{}) error {
+ switch t := v.(type) {
+ case *sbFrame:
+ t.payload = data
+ // This is where the affinity is established on a northbound response
+ t.router.ReplyHandler(v)
+ return nil
+ case *nbFrame:
+ t.payload = data
+ // This is were the afinity value is pulled from the payload
+ // and the backend selected.
+ t.be = t.router.Route(v)
+ log.Debugf("Routing returned %v for method %s", t.be, t.mthdSlice[REQ_METHOD])
+ return nil
+ default:
+ return c.parentCodec.Unmarshal(data,v)
+ }
+}
+
+func (c *rawCodec) String() string {
+ return fmt.Sprintf("proxy>%s", c.parentCodec.String())
+}
+
+// protoCodec is a Codec implementation with protobuf. It is the default rawCodec for gRPC.
+type protoCodec struct{}
+
+func (protoCodec) Marshal(v interface{}) ([]byte, error) {
+ return proto.Marshal(v.(proto.Message))
+}
+
+func (protoCodec) Unmarshal(data []byte, v interface{}) error {
+ return proto.Unmarshal(data, v.(proto.Message))
+}
+
+func (protoCodec) String() string {
+ return "proto"
+}
diff --git a/afrouter/afrouter/config.go b/afrouter/afrouter/config.go
new file mode 100644
index 0000000..761be2e
--- /dev/null
+++ b/afrouter/afrouter/config.go
@@ -0,0 +1,253 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// gRPC affinity router with active/active backends
+
+package afrouter
+
+// Command line parameters and parsing
+import (
+ "os"
+ "fmt"
+ "flag"
+ "path"
+ "errors"
+ "io/ioutil"
+ "encoding/json"
+ "github.com/opencord/voltha-go/common/log"
+)
+
+func ParseCmd() (*Configuration, error) {
+ config := &Configuration{}
+ cmdParse := flag.NewFlagSet(path.Base(os.Args[0]), flag.ContinueOnError);
+ config.ConfigFile = cmdParse.String("config", "arouter.json", "The configuration file for the affinity router")
+ config.LogLevel = cmdParse.Int("logLevel", 0, "The log level for the affinity router")
+ config.GrpcLog = cmdParse.Bool("grpclog", false, "Enable GRPC logging")
+
+ err := cmdParse.Parse(os.Args[1:]);
+ if(err != nil) {
+ //return err
+ return nil, errors.New("Error parsing the command line");
+ }
+ //if(!cmdParse.Parsed()) {
+ //}
+ return config, nil
+}
+
+// Configuration file loading and parsing
+type Configuration struct {
+ ConfigFile * string
+ LogLevel * int
+ GrpcLog * bool
+ Servers []ServerConfig `json:"servers"`
+ Ports PortConfig `json:"ports"`
+ ServerCertificates ServerCertConfig `json:"serverCertificates"`
+ ClientCertificates ClientCertConfig `json:"clientCertificates"`
+ BackendClusters []BackendClusterConfig `json:"backend_clusters"`
+ Routers []RouterConfig `json:"routers"`
+ Api ApiConfig
+}
+
+type RouterConfig struct {
+ Name string `json:"name"`
+ ProtoService string `json:"service"`
+ ProtoPackage string `json:"package"`
+ Routes []RouteConfig `json:"routes"`
+}
+
+type RouteConfig struct {
+ Name string `json:"name"`
+ Type string `json:"type"`
+ ProtoFile string `json:"proto_descriptor"`
+ Association string `json:"association"`
+ RouteField string `json:"routing_field"`
+ Methods []string `json:"methods"` // The GRPC methods to route using the route field
+ NbBindingMethods []string `json:"nb_binding_methods"`
+ BackendCluster string `json:"backend_cluster"`
+ Binding BindingConfig `json:"binding"`
+ Overrides []OverrideConfig `json:"overrides"`
+ backendCluster *BackendClusterConfig
+}
+
+type BindingConfig struct {
+ Type string `json:"type"`
+ Field string `json:"field"`
+ Method string `json:"method"`
+ Association string `json:"association"`
+
+}
+
+type OverrideConfig struct {
+ Methods []string `json:"methods"`
+ Method []string `json:"method"`
+ RouteField string `json:"routing_field"`
+}
+
+// Backend configuration
+
+type BackendClusterConfig struct {
+ Name string `json:"name"`
+ Backends []BackendConfig `json:"backends"`
+}
+
+type BackendConfig struct {
+ Name string `json:"name"`
+ Type string `json:"type"`
+ Association AssociationConfig `json:"association"`
+ Connections []ConnectionConfig `json:"connections"`
+}
+
+type AssociationConfig struct {
+ Strategy string `json:"strategy"`
+ Location string `json:"location"`
+ Field string `json:"field"`
+}
+
+type ConnectionConfig struct {
+ Name string `json:"name"`
+ Addr string `json:"addr"`
+ Port string `json:"port"`
+}
+
+// Server configuration
+
+type ServerConfig struct {
+ Name string `json:"name"`
+ Port uint `json:"port"`
+ Addr string `json:"address"`
+ Type string `json:"type"`
+ Routers []RouterPackage `json:"routers"`
+ routers map[string]*RouterConfig
+}
+
+type RouterPackage struct {
+ Router string `json:"router"`
+ Package string `json:"package"`
+}
+
+// Port configuration
+type PortConfig struct {
+ GrpcPort uint `json:"grpcPort"`
+ StreamingGrpcPort uint `json:"streamingGrpcPort"`
+ TlsGrpcPort uint `json:"tlsGrpcPort"`
+ TlsStreamingGrpcPort uint `json:"tlsStreamingGrpcPort"`
+ ControlPort uint `json:"controlPort"`
+}
+
+// Server Certificate configuration
+type ServerCertConfig struct {
+ GrpcCert string `json:"grpcCertificate"` // File path to the certificate file
+ GrpcKey string `json:"grpcKey"` // File path to the key file
+ GrpcCsr string `json:"grpcCsr"` // File path to the CSR file
+}
+
+// Client Certificate configuration
+type ClientCertConfig struct {
+ GrpcCert string `json:"grpcCertificate"` // File path to the certificate file
+ GrpcKey string `json:"grpcKey"` // File path to the key file
+ GrpcCsr string `json:"grpcCsr"` // File path to the CSR file
+}
+
+// Api configuration
+type ApiConfig struct {
+ Addr string `json:"address"`
+ Port uint `json:"port"`
+}
+
+func (conf * Configuration) LoadConfig() error {
+
+ configF, err := os.Open(*conf.ConfigFile);
+ log.Info("Loading configuration from: ", *conf.ConfigFile)
+ if err != nil {
+ log.Error(err)
+ return err
+ }
+
+ defer configF.Close()
+
+ configBytes, err := ioutil.ReadAll(configF)
+ if err != nil {
+ log.Error(err)
+ return err
+ }
+
+ json.Unmarshal(configBytes, conf)
+
+ // Now resolve references to different config objects in the
+ // config file. Currently there are 2 possible references
+ // to resolve: referecnes to routers in the servers, and
+ // references to backend_cluster in the routers.
+
+ // Resolve router references for the servers
+ log.Debug("Resolving references in the config file");
+ for k,_ := range(conf.Servers) {
+ //s.routers =make(map[string]*RouterConfig)
+ conf.Servers[k].routers = make(map[string]*RouterConfig)
+ for _,rPkg := range(conf.Servers[k].Routers) {
+ var found bool = false
+ // Locate the router "r" in the top lever Routers array
+ log.Debugf("Resolving router reference to router '%s' from server '%s'",rPkg.Router, conf.Servers[k].Name)
+ for rk, _ := range(conf.Routers) {
+ if conf.Routers[rk].Name == rPkg.Router && !found {
+ log.Debugf("Reference to router '%s' found for package '%s'", rPkg.Router, rPkg.Package)
+ conf.Servers[k].routers[rPkg.Package] = &conf.Routers[rk]
+ found = true
+ } else if conf.Routers[rk].Name == rPkg.Router && found {
+ if _,ok := conf.Servers[k].routers[rPkg.Package]; !ok {
+ log.Debugf("Reference to router '%s' found for package '%s'", rPkg.Router, rPkg.Package)
+ conf.Servers[k].routers[rPkg.Package] = &conf.Routers[rk]
+ } else {
+ err := errors.New(fmt.Sprintf("Duplicate router '%s' defined for package '%s'",rPkg.Package))
+ log.Error(err)
+ return err
+ }
+ }
+ }
+ if !found {
+ err := errors.New(fmt.Sprintf("Router %s for server %s not found in config", conf.Servers[k].Name, rPkg.Router))
+ log.Error(err)
+ return err
+ }
+ }
+ }
+
+ // Resolve backend references for the routers
+ for rk,rv := range(conf.Routers) {
+ for rtk,rtv := range(rv.Routes) {
+ var found bool = false
+ log.Debugf("Resolving backend reference to %s from router %s",rtv.BackendCluster, rv.Name)
+ for bek,bev := range(conf.BackendClusters) {
+ log.Debugf("Checking cluster %s", conf.BackendClusters[bek].Name)
+ if rtv.BackendCluster == bev.Name && !found {
+ conf.Routers[rk].Routes[rtk].backendCluster = &conf.BackendClusters[bek]
+ found = true
+ } else if rtv.BackendCluster == bev.Name && found {
+ err := errors.New(fmt.Sprintf("Duplicate backend defined, %s",conf.BackendClusters[bek].Name))
+ log.Error(err)
+ return err
+ }
+ }
+ if !found {
+ err := errors.New(fmt.Sprintf("Backend %s for router %s not found in config",
+ rtv.BackendCluster, rv.Name))
+ log.Error(err)
+ return err
+ }
+ }
+ }
+
+
+ return nil
+}
diff --git a/afrouter/afrouter/helpers.go b/afrouter/afrouter/helpers.go
new file mode 100644
index 0000000..04d5878
--- /dev/null
+++ b/afrouter/afrouter/helpers.go
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// gRPC affinity router with active/active backends
+
+package afrouter
+
+func strIndex(ar []string, match string) int {
+ for idx := range(ar) {
+ if ar[idx] == match {
+ return idx
+ }
+ }
+ return 0
+}
diff --git a/afrouter/afrouter/method-router.go b/afrouter/afrouter/method-router.go
new file mode 100644
index 0000000..7ba175d
--- /dev/null
+++ b/afrouter/afrouter/method-router.go
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// gRPC affinity router with active/active backends
+
+package afrouter
+
+import (
+ "fmt"
+ "errors"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+ "github.com/opencord/voltha-go/common/log"
+)
+
+const NoMeta = "nometa"
+
+type MethodRouter struct {
+ name string
+ service string
+ mthdRt map[string]map[string]Router // map of [metadata][method]
+}
+
+func newMethodRouter(config *RouterConfig) (Router, error) {
+ mr := MethodRouter{name:config.Name,service:config.ProtoService,mthdRt:make(map[string]map[string]Router)}
+ mr.mthdRt[NoMeta] = make(map[string]Router) // For routes not needing metadata (all expcept binding at this time)
+ if len(config.Routes) == 0 {
+ return nil, errors.New(fmt.Sprintf("Router %s must have at least one route", config.Name))
+ }
+ for _,rtv := range(config.Routes) {
+ var idx1 string
+ r,err := newRouter(config, &rtv)
+ if err != nil {
+ return nil, err
+ }
+ if rtv.Type == "binding" {
+ idx1 = rtv.Binding.Field
+ if _,ok := mr.mthdRt[idx1]; ok == false { // /First attempt on this key
+ mr.mthdRt[idx1] = make(map[string]Router)
+ }
+ } else {
+ idx1 = NoMeta
+ }
+ switch len(rtv.Methods) {
+ case 0:
+ return nil, errors.New(fmt.Sprintf("Route for router %s must have at least one method", config.Name))
+ case 1:
+ if rtv.Methods[0] == "*" {
+ return r, nil
+ } else {
+ log.Debugf("Setting router '%s' for single method '%s'",r.Name(),rtv.Methods[0])
+ if _,ok := mr.mthdRt[""][rtv.Methods[0]]; ok == false {
+ mr.mthdRt[idx1][rtv.Methods[0]] = r
+ } else {
+ err := errors.New(fmt.Sprintf("Attempt to define method %s for 2 routes: %s & %s", rtv.Methods[0],
+ r.Name(), mr.mthdRt[idx1][rtv.Methods[0]].Name()))
+ log.Debug(err)
+ return mr, err
+ }
+ }
+ default:
+ for _,m := range(rtv.Methods) {
+ log.Debugf("Setting router '%s' for method '%s'",r.Name(),m)
+ if _,ok := mr.mthdRt[idx1][m]; ok == false {
+ mr.mthdRt[idx1][m] = r
+ } else {
+ err := errors.New(fmt.Sprintf("Attempt to define method %s for 2 routes: %s & %s", rtv.Methods[0],
+ r.Name(), mr.mthdRt[idx1][m].Name()))
+ log.Debug(err)
+ return mr, err
+ }
+ }
+ }
+ }
+
+ return mr, nil
+}
+
+func (mr MethodRouter) Name() string {
+ return mr.name
+}
+
+func (mr MethodRouter) Service() string {
+ return mr.service
+}
+
+func (mr MethodRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string,string,error) {
+ var rtrnK string = NoMeta
+ var rtrnV string = ""
+
+ // Get the metadata from the server stream
+ md, ok := metadata.FromIncomingContext(serverStream.Context())
+ if !ok {
+ return rtrnK, rtrnV, errors.New("Could not get a server stream metadata")
+ }
+
+ // Determine if one of the method routing keys exists in the metadata
+ for k,_ := range(mr.mthdRt) {
+ if _,ok := md[k]; ok == true {
+ rtrnV = md[k][0]
+ rtrnK = k
+ break
+ }
+ }
+ return rtrnK,rtrnV,nil
+
+}
+
+func (mr MethodRouter) ReplyHandler(sel interface{}) error {
+ switch sl := sel.(type) {
+ case *sbFrame:
+ if r,ok := mr.mthdRt[NoMeta][sl.method]; ok == true {
+ return r.ReplyHandler(sel)
+ }
+ // TODO: this case should also be an error
+ default: //TODO: This should really be a big error
+ // A reply handler should only be called on the sbFrame
+ return nil
+ }
+ return nil
+}
+
+func (mr MethodRouter) Route(sel interface{}) *backend {
+ switch sl := sel.(type) {
+ case *nbFrame:
+ if r,ok := mr.mthdRt[sl.metaKey][sl.mthdSlice[REQ_METHOD]]; ok == true {
+ return r.Route(sel)
+ }
+ log.Errorf("Attept to route on non-existent method '%s'", sl.mthdSlice[REQ_METHOD])
+ return nil
+ default:
+ return nil
+ }
+ return nil
+}
+
+func (mr MethodRouter) BackendCluster(mthd string, metaKey string) (*backendCluster,error) {
+ if r,ok := mr.mthdRt[metaKey][mthd]; ok == true {
+ return r.BackendCluster(mthd, metaKey)
+ }
+ err := errors.New(fmt.Sprintf("No backend cluster exists for method '%s' using meta key '%s'", mthd,metaKey))
+ log.Error(err)
+ return nil, err
+}
+
+func (mr MethodRouter) FindBackendCluster(beName string) *backendCluster {
+ for _,meta := range mr.mthdRt {
+ for _,r := range meta {
+ if rtrn := r.FindBackendCluster(beName); rtrn != nil {
+ return rtrn
+ }
+ }
+ }
+ return nil
+}
diff --git a/afrouter/afrouter/round-robin-router.go b/afrouter/afrouter/round-robin-router.go
new file mode 100644
index 0000000..2ab8421
--- /dev/null
+++ b/afrouter/afrouter/round-robin-router.go
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// gRPC affinity router with active/active backends
+
+package afrouter
+
+import (
+ "fmt"
+ "errors"
+ "google.golang.org/grpc"
+ "github.com/opencord/voltha-go/common/log"
+)
+
+type RoundRobinRouter struct {
+ name string
+ routerType int // TODO: Likely not needed.
+ grpcService string
+ bkndClstr *backendCluster
+ curBknd **backend
+}
+
+func NewRoundRobinRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
+ var err error = nil
+ var rtrn_err bool = false
+ // Validate the configuration
+
+ log.Debug("Creating a new round robin router")
+ // A name must exist
+ if config.Name == "" {
+ log.Error("A router 'name' must be specified")
+ rtrn_err = true
+ }
+
+ if rconf.ProtoPackage == "" {
+ log.Error("A 'package' must be specified")
+ rtrn_err = true
+ }
+
+ if rconf.ProtoService == "" {
+ log.Error("A 'service' must be specified")
+ rtrn_err = true
+ }
+
+ var bptr *backend
+ bptr = nil
+ rr := RoundRobinRouter{
+ name:config.Name,
+ grpcService:rconf.ProtoService,
+ curBknd:&bptr,
+ }
+
+ // This has already been validated bfore this function
+ // is called so just use it.
+ for idx := range(rTypeNames) {
+ if config.Type == rTypeNames[idx] {
+ rr.routerType = idx
+ break
+ }
+ }
+
+ // Create the backend cluster or link to an existing one
+ ok := true
+ if rr.bkndClstr, ok = bClusters[config.backendCluster.Name]; ok == false {
+ if rr.bkndClstr, err = NewBackendCluster(config.backendCluster); err != nil {
+ log.Errorf("Could not create a backend for router %s", config.Name)
+ rtrn_err = true
+ }
+ }
+
+ if rtrn_err {
+ return rr,errors.New(fmt.Sprintf("Failed to create a new router '%s'",rr.name))
+ }
+
+ return rr,nil
+}
+
+func (rr RoundRobinRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string,string,error) {
+ return "","",nil
+}
+
+func (rr RoundRobinRouter) BackendCluster(s string, mk string) (*backendCluster,error) {
+ return rr.bkndClstr, nil
+}
+
+func (rr RoundRobinRouter) Name() (string) {
+ return rr.name
+}
+
+func(rr RoundRobinRouter) Route(sel interface{}) (*backend) {
+ var err error
+ switch sl := sel.(type) {
+ case *nbFrame:
+ // Since this is a round robin router just get the next backend
+ if *rr.curBknd, err = rr.bkndClstr.nextBackend(*rr.curBknd,BE_SEQ_RR); err == nil {
+ return *rr.curBknd
+ } else {
+ sl.err = err
+ return nil
+ }
+ default:
+ log.Errorf("Internal: invalid data type in Route call %v", sel);
+ return nil
+ }
+ log.Errorf("Round robin error %v", err);
+ return nil
+}
+
+func (rr RoundRobinRouter) Service() (string) {
+ return rr.grpcService
+}
+
+func (rr RoundRobinRouter) FindBackendCluster(string) (*backendCluster) {
+ return rr.bkndClstr
+}
+
+func (rr RoundRobinRouter) ReplyHandler(sel interface{}) error { // This is a no-op
+ return nil
+}
diff --git a/afrouter/afrouter/router.go b/afrouter/afrouter/router.go
new file mode 100644
index 0000000..950ccc2
--- /dev/null
+++ b/afrouter/afrouter/router.go
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// gRPC affinity router with active/active backends
+
+package afrouter
+
+import (
+ "fmt"
+ "errors"
+ "google.golang.org/grpc"
+)
+
+const (
+ RT_RPC_AFFINITY_MESSAGE = iota+1
+ RT_RPC_AFFINITY_HEADER = iota+1
+ RT_BINDING = iota+1
+ RT_ROUND_ROBIN = iota+1
+)
+
+// String names for display in error messages.
+var rTypeNames = []string{"","rpc_affinity_message","rpc_affinity_header","binding", "round_robin"}
+var rAssnNames = []string{"","round_robin"}
+
+var allRouters map[string]Router = make(map[string]Router)
+
+// The router interface
+type Router interface {
+ Name() (string)
+ Route(interface{}) *backend
+ Service() (string)
+ BackendCluster(string, string) (*backendCluster, error)
+ FindBackendCluster(string) (*backendCluster)
+ ReplyHandler(interface{}) error
+ GetMetaKeyVal(serverStream grpc.ServerStream) (string,string,error)
+}
+
+func NewRouter(config *RouterConfig) (Router, error) {
+ r,err := newMethodRouter(config)
+ if err == nil {
+ allRouters[r.Name()] = r
+ }
+ return r, err
+}
+
+func newRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
+ idx := strIndex(rTypeNames, config.Type)
+ //for idx := range(rTypeNames) {
+ //if config.Type == rTypeNames[idx] {
+ switch idx {
+ case RT_RPC_AFFINITY_MESSAGE:
+ r,err := NewAffinityRouter(rconf, config)
+ if err == nil {
+ allRouters[rconf.Name+config.Name] = r
+ }
+ return r, err
+ case RT_BINDING:
+ r,err := NewBindingRouter(rconf, config)
+ if err == nil {
+ allRouters[rconf.Name+config.Name] = r
+ }
+ return r, err
+ case RT_ROUND_ROBIN:
+ r,err := NewRoundRobinRouter(rconf, config)
+ if err == nil {
+ allRouters[rconf.Name+config.Name] = r
+ }
+ return r, err
+ default:
+ return nil, errors.New(fmt.Sprintf("Internal error, undefined router type: %s", config.Type))
+ }
+
+ //}
+ //}
+ return nil, errors.New(fmt.Sprintf("Unrecognized router type '%s'",config.Type))
+}
diff --git a/afrouter/afrouter/server.go b/afrouter/afrouter/server.go
new file mode 100644
index 0000000..3b729f9
--- /dev/null
+++ b/afrouter/afrouter/server.go
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// gRPC affinity router with active/active backends
+
+package afrouter
+
+import (
+ "fmt"
+ "net"
+ "regexp"
+ "errors"
+ "strconv"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "github.com/opencord/voltha-go/common/log"
+)
+
+var (
+ clientStreamDescForProxying = &grpc.StreamDesc{
+ ServerStreams: true,
+ ClientStreams: true,
+ }
+)
+const (
+ REQ_ALL = 0
+ REQ_PACKAGE = 1
+ REQ_SERVICE = 2
+ REQ_METHOD = 3
+)
+
+type server struct {
+ running bool
+ name string
+ stype nbi
+ proxyListener net.Listener
+ routers map[string]map[string]Router
+ proxyServer *grpc.Server
+}
+
+type nbRequest struct {
+ srv interface{}
+ serverStream grpc.ServerStream
+ r Router
+ mthdSlice []string
+ metaKey string // There should be at most one key specified. More than one is an error.
+ metaVal string // This is the value extracted from the meta key if it exists or "" otherwise
+}
+
+var mthdSlicerExp string = `^/([a-zA-Z][a-zA-Z0-9]+)\.([a-zA-Z][a-zA-Z0-9]+)/([a-zA-Z][a-zA-Z0-9]+)`
+var mthdSlicer *regexp.Regexp // The compiled regex to extract the package/service/method
+
+
+func NewServer(config *ServerConfig) (*server,error) {
+ var err error = nil
+ var rtrn_err bool = false
+ var srvr *server
+ // Change over to the new configuration format
+ // Validate the configuration
+ // There should be a name
+ if config.Name == "" {
+ log.Error("A server has been defined with no name")
+ rtrn_err = true
+ }
+ // Validate that there's a port specified
+ if config.Port == 0 {
+ log.Errorf("Server %s does not have a valid port assigned", config.Name)
+ rtrn_err = true
+ }
+ // Validate the ip address if one is provided
+ if ip := net.ParseIP(config.Addr); config.Addr != "" && ip == nil {
+ log.Errorf("Invalid address '%s' provided for server '%s'", config.Addr, config.Name)
+ rtrn_err = true
+ }
+ if config.Type != "grpc" && config.Type != "streaming_grpc" {
+ if config.Type == "" {
+ log.Errorf("A server 'type' must be defined for server %s",config.Name)
+ } else {
+ log.Errorf("The server type must be either 'grpc' or 'streaming_grpc' "+
+ "but '%s' was found for server '%s'", config.Type, config.Name)
+ }
+ rtrn_err = true
+ }
+ if len(config.Routers) == 0 {
+ log.Errorf("At least one router must be specified for server '%s'", config.Name)
+ rtrn_err = true
+ }
+
+ if rtrn_err == true {
+ return nil, errors.New("Server configuration failed")
+ } else {
+ // The configuration is valid, create a server and configure it.
+ srvr = &server{name:config.Name,routers:make(map[string]map[string]Router)}
+ // The listener
+ if srvr.proxyListener, err =
+ net.Listen("tcp", config.Addr + ":"+
+ strconv.Itoa(int(config.Port))); err != nil {
+ log.Error(err)
+ return nil, err
+ }
+ // Create the routers
+ log.Debugf("Configuring the routers for server %s", srvr.name)
+ for p,r := range(config.routers) {
+ log.Debugf("Processing router %s for package %s", r.Name,p)
+ if dr,err := NewRouter(r); err != nil {
+ log.Error(err)
+ return nil, err
+ } else {
+ log.Debugf("Adding router %s to the server %s for package %s and service %s",
+ dr.Name(), srvr.name, p, dr.Service())
+ if _,ok := srvr.routers[p]; ok {
+ srvr.routers[p][dr.Service()] = dr
+ } else {
+ srvr.routers[p] = make(map[string]Router)
+ srvr.routers[p][dr.Service()] = dr
+ }
+ }
+ }
+ // Configure the grpc handler
+ srvr.proxyServer = grpc.NewServer(
+ grpc.CustomCodec(Codec()),
+ grpc.UnknownServiceHandler(srvr.TransparentHandler()),
+ )
+
+ }
+ // Compile the regular expression to extract the method
+ mthdSlicer = regexp.MustCompile(mthdSlicerExp)
+ return srvr, nil
+}
+
+func (s *server) Name() (string) {
+ return s.name
+}
+
+func (s *server) TransparentHandler() grpc.StreamHandler {
+ return s.handler
+}
+
+
+func (s *server) getRouter(pkg *string, service *string) (Router,bool) {
+ if fn, ok := s.routers[*pkg][*service]; ok { // Both specified
+ return fn, ok
+ } else if fn, ok = s.routers["*"][*service]; ok { // Package wild card
+ return fn, ok
+ } else if fn, ok = s.routers[*pkg]["*"]; ok { // Service wild card
+ return fn, ok
+ } else if fn, ok = s.routers["*"]["*"]; ok { // Both Wildcarded
+ return fn, ok
+ } else {
+ return nil,false
+ }
+}
+
+
+func (s *server) handler(srv interface{}, serverStream grpc.ServerStream) error {
+ // Determine what router is intended to handle this request
+ fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
+ if !ok {
+ return grpc.Errorf(codes.Internal, "lowLevelServerStream doesn't exist in context")
+ }
+ log.Debugf("Processing grpc request %s on server %s",fullMethodName,s.name)
+ // The full method name is structured as follows:
+ // <package name>.<service>/<method>
+ mthdSlice := mthdSlicer.FindStringSubmatch(fullMethodName)
+ if mthdSlice == nil {
+ log.Errorf("Faled to slice full method %s, result: %v", fullMethodName, mthdSlice)
+ } else {
+ log.Debugf("Sliced full method %s: %v", fullMethodName, mthdSlice)
+ }
+ r, ok := s.getRouter(&mthdSlice[REQ_PACKAGE],&mthdSlice[REQ_SERVICE])
+ //fn, ok := s.routers[mthdSlice[REQ_PACKAGE]][mthdSlice[REQ_SERVICE]]
+ if !ok {
+ // TODO: Should this be punted to a default transparent router??
+ // Probably not, if one is defined yes otherwise just crap out.
+
+ err := errors.New(
+ fmt.Sprintf("Unable to dispatch! Service '%s' for package '%s' not found.",
+ mthdSlice[REQ_SERVICE], mthdSlice[REQ_PACKAGE]))
+ log.Error(err)
+ return err
+ }
+ log.Debugf("Selected router %s\n", r.Name())
+
+ mk,mv,err := r.GetMetaKeyVal(serverStream)
+ if err != nil {
+ log.Error(err)
+ return err
+ }
+
+ //nbR := &nbRequest(srv:srv,serverStream:serverStream,r:r,mthdSlice:mthdSlice,metaKey:mk,metaVal:mv)
+
+ // Extract the cluster from the selected router and use it to manage the transfer
+ if bkndClstr,err := r.BackendCluster(mthdSlice[REQ_METHOD], mk); err != nil {
+ return err
+ } else {
+ //return bkndClstr.handler(nbR)
+ return bkndClstr.handler(srv, serverStream, r, mthdSlice, mk, mv)
+ }
+
+ return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
+}
+
diff --git a/afrouter/afrouter/signals.go b/afrouter/afrouter/signals.go
new file mode 100644
index 0000000..c550842
--- /dev/null
+++ b/afrouter/afrouter/signals.go
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// gRPC affinity router with active/active backends
+
+// This file implements an exit handler that tries to shut down all the
+// running servers before finally exiting. There are 2 triggers to this
+// clean exit thread: signals and an exit channel.
+
+package afrouter
+
+import (
+ "os"
+ "syscall"
+ "os/signal"
+ "github.com/opencord/voltha-go/common/log"
+)
+
+var errChan = make(chan error)
+var doneChan = make(chan error)
+var holdChan = make(chan int)
+
+
+func InitExitHandler() error {
+
+ // Start the signal handler
+ go signalHandler()
+ // Start the error handler
+ go errHandler()
+
+ return nil
+}
+
+func signalHandler() {
+ // Make signal channel and register notifiers for Interupt and Terminate
+ sigchan := make(chan os.Signal, 1)
+ signal.Notify(sigchan, os.Interrupt)
+ signal.Notify(sigchan, syscall.SIGTERM)
+
+ // Block until we receive a signal on the channel
+ <-sigchan
+
+ log.Info("shutting down on signal as requested")
+
+ cleanExit(nil)
+}
+
+func errHandler() {
+
+ err := <-errChan
+
+ cleanExit(err)
+}
+
+func cleanExit(err error) {
+ // Log the shutdown
+ if arProxy != nil {
+ for _, srvr := range arProxy.servers {
+ if srvr.running {
+ log.With(log.Fields{"server":srvr.name}).Debug("Closing server")
+ srvr.proxyServer.GracefulStop();
+ srvr.proxyListener.Close();
+ }
+ }
+ }
+ for _,cl := range(bClusters) {
+ for _, bknd := range(cl.backends) {
+ log.Debugf("Closing backend %s", bknd.name)
+ for _,conn := range(bknd.connections) {
+ log.Debugf("Closing connection %s", conn.name)
+ conn.close()
+ }
+ }
+ }
+ doneChan <- err
+ //os.Exit(0)
+}
+
diff --git a/afrouter/arouter.go b/afrouter/arouter.go
new file mode 100644
index 0000000..b25ff42
--- /dev/null
+++ b/afrouter/arouter.go
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// gRPC affinity router with active/active backends
+
+package main
+/* package main // import "github.com/opencord/voltha-go/arouter" */
+/* package main // import "github.com/opencord/voltha-go" */
+
+
+import (
+ "os"
+ "fmt"
+ slog "log"
+ "google.golang.org/grpc/grpclog"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/afrouter/afrouter"
+)
+
+func main() {
+
+
+ conf,err := afrouter.ParseCmd()
+ if(err != nil) {
+ fmt.Printf("Error: %v\n", err)
+ return
+ }
+
+ // Setup logging
+ if _, err := log.SetDefaultLogger(log.JSON, *conf.LogLevel, nil); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
+
+ defer log.CleanUp()
+
+ // Parse the config file
+ err = conf.LoadConfig()
+ if(err != nil) {
+ log.Error(err)
+ return
+ }
+ log.With(log.Fields{"config":*conf}).Debug("Configuration loaded")
+
+ // Enable grpc logging
+ if *conf.GrpcLog {
+ grpclog.SetLogger(slog.New(os.Stderr, "grpc: ", slog.LstdFlags))
+ //grpclog.SetLoggerV2(lgr)
+ }
+
+ // Install the signal and error handlers.
+ afrouter.InitExitHandler()
+
+
+ // Create the affinity router proxy...
+ if ap,err := afrouter.NewArouterProxy(conf); err != nil {
+ log.Errorf("Failed to create the arouter proxy, exiting:%v",err)
+ return
+ // and start it.
+ // This function never returns unless an error
+ // occurs or a signal is caught.
+ } else if err := ap.ListenAndServe(); err != nil {
+ log.Errorf("Exiting on error %v", err)
+ }
+
+}
diff --git a/afrouter/arouter.json b/afrouter/arouter.json
new file mode 100644
index 0000000..50e1d31
--- /dev/null
+++ b/afrouter/arouter.json
@@ -0,0 +1,184 @@
+{
+ "servers": [
+ {
+ "name": "grpc_command",
+ "port": 55555,
+ "address":"",
+ "type": "grpc",
+ "routers": [
+ {
+ "_TODO":"Suport a router list, remove the package and service from the router",
+ "package":"voltha",
+ "service":"VolthaService",
+ "router":"vcore"
+ }
+ ]
+ }
+ ],
+ "routers": [
+ {
+ "name":"vcore",
+ "package": "voltha",
+ "service": "VolthaService",
+ "routes": [
+ {
+ "name":"dev_manager",
+ "proto_descriptor":"voltha.pb",
+ "type":"rpc_affinity_message",
+ "association":"round_robin",
+ "routing_field": "id",
+ "backend_cluster":"vcore",
+ "_COMMENT":"Methods are naturally southbound affinity binding unless otherwise specified below",
+ "methods":[ "CreateDevice",
+ "GetCoreInstance",
+ "EnableLogicalDevicePort",
+ "DisableLogicalDevicePort",
+ "EnableDevice",
+ "DisableDevice",
+ "RebootDevice",
+ "DeleteDevice",
+ "DownloadImage",
+ "CancelImageDownload",
+ "ActivateImageUpdate",
+ "RevertImageUpdate",
+ "UpdateDevicePmConfigs",
+ "CreateAlarmFilter",
+ "UpdateAlarmFilter",
+ "DeleteAlarmFilter",
+ "SelfTest"],
+ "_COMMENT":"If a method is northbound affinity binding then association is used to route",
+ "_COMMENT":"but affinity is not set southbound but only based on the response",
+ "_COMMENT":"Methods here MUST be specified above, this overrides thier default beahvior",
+ "nb_binding_methods":["CreateDevice"],
+ "_TODO":"Overrides not implemented yet, config ignored",
+ "overrides": [
+ {"methods":["abc","def"], "routing_field":"id"},
+ {"methods":["ghi","jkl"]},
+ {"method":"mno", "routing_field":"id"},
+ {"method":"pqr"}
+ ]
+ },
+ {
+ "name":"read_only",
+ "type":"round_robin",
+ "association":"round_robin",
+ "backend_cluster":"vcore",
+ "methods":[ "ListDevicePorts",
+ "ListDevicePmConfigs",
+ "GetImages",
+ "GetImageDownloadStatus",
+ "GetImageDownload",
+ "ListImageDownloads",
+ "ListDeviceFlows",
+ "ListDeviceFlowGroups",
+ "ListLogicalDeviceFlows",
+ "ListLogicalDeviceFlowGroups",
+ "ListDevices",
+ "GetDevice",
+ "GetDeviceType",
+ "GetDeviceGroup",
+ "GetLogicalDevice",
+ "GetAlarmFilter",
+ "ListLogicalDevicePorts",
+ "GetLogicalDevicePort"
+ ]
+ },
+ {
+ "name":"dev_manager_ofagent",
+ "type":"binding",
+ "_association":"round_robin",
+ "binding": {
+ "type":"header",
+ "field":"voltha_backend_name",
+ "method":"Subscribe",
+ "association":"round_robin"
+ },
+ "backend_cluster":"vcore",
+ "methods":["StreamPacketsOut",
+ "Subscribe",
+ "ListLogicalDevices",
+ "ListDeviceFlowGroups",
+ "ListLogicalDeviceFlowGroups",
+ "ListDeviceFlows",
+ "UpdateLogicalDeviceFlowTable",
+ "UpdateLogicalDeviceFlowGroupTable",
+ "ListLogicalDeviceFlows"
+ ],
+ "_TODO":"Overrides not implemented yet, config ignored",
+ "overrides": [
+ {"methods":["abc","def"], "routing_field":"id"},
+ {"methods":["ghi","jkl"]},
+ {"method":"mno", "routing_field":"id"},
+ {"method":"pqr"}
+ ]
+ }
+ ]
+ }
+ ],
+ "backend_clusters": [
+ {
+ "name":"vcore",
+ "backends":[ {
+ "name":"vcore1",
+ "type":"active_active",
+ "association": {
+ "strategy":"serial_number",
+ "location":"header",
+ "_TODO":"The key below needs to be implemented, currently hard coded",
+ "key":"voltha_serial_number"
+ },
+ "connections": [ {
+ "name":"vcore11",
+ "addr":"",
+ "port":""
+ },
+ {
+ "name":"vcore12",
+ "addr":"",
+ "port":""
+ }]
+ },
+ {
+ "name":"vcore2",
+ "type":"active_active",
+ "association": {
+ "strategy":"serial_number",
+ "location":"header"
+ },
+ "connections": [ {
+ "name":"vcore21",
+ "addr":"",
+ "port":""
+ },
+ {
+ "name":"vcore22",
+ "addr":"",
+ "port":""
+ }]
+ },
+ {
+ "name":"vcore3",
+ "type":"active_active",
+ "association": {
+ "strategy":"serial_number",
+ "location":"header"
+ },
+ "connections": [ {
+ "name":"vcore31",
+ "addr":"",
+ "port":""
+ },
+ {
+ "name":"vcore32",
+ "addr":"",
+ "port":""
+ }]
+ }]
+ }
+ ],
+ "api": {
+ "_comment":"If this isn't defined then no api is available for dynamic configuration and queries",
+ "address":"",
+ "port":55554
+ }
+}