VOL-1577 - General cleanup (gofmt, go vet, go test, dep check)
Change-Id: I536b2746b8bd266f3e75aeccc65bfe7468f1b44a
diff --git a/afrouter/afrouter/affinity-router.go b/afrouter/afrouter/affinity-router.go
index 30d1982..45ec26a 100644
--- a/afrouter/afrouter/affinity-router.go
+++ b/afrouter/afrouter/affinity-router.go
@@ -18,37 +18,37 @@
package afrouter
import (
- "fmt"
"errors"
+ "fmt"
+ "github.com/golang/protobuf/proto"
+ pb "github.com/golang/protobuf/protoc-gen-go/descriptor"
+ "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
+ "io/ioutil"
"regexp"
"strconv"
- "io/ioutil"
- "google.golang.org/grpc"
- "github.com/golang/protobuf/proto"
- "github.com/opencord/voltha-go/common/log"
- pb "github.com/golang/protobuf/protoc-gen-go/descriptor"
)
const (
- PKG_MTHD_PKG int = 1
+ PKG_MTHD_PKG int = 1
PKG_MTHD_MTHD int = 2
)
type AffinityRouter struct {
- name string
- routerType int // TODO: This is probably not needed
- association int
- routingField string
- grpcService string
- protoDescriptor *pb.FileDescriptorSet
- methodMap map[string]byte
+ name string
+ routerType int // TODO: This is probably not needed
+ association int
+ routingField string
+ grpcService string
+ protoDescriptor *pb.FileDescriptorSet
+ methodMap map[string]byte
nbBindingMthdMap map[string]byte
- bkndClstr *backendCluster
- affinity map[string]*backend
- curBknd **backend
+ bkndClstr *backendCluster
+ affinity map[string]*backend
+ curBknd **backend
}
-func newAffinityRouter(rconf *RouterConfig, config *RouteConfig) (Router,error) {
+func newAffinityRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
var err error = nil
var rtrn_err bool = false
var pkg_re *regexp.Regexp = regexp.MustCompile(`^(\.[^.]+\.)(.+)$`)
@@ -80,28 +80,27 @@
// routing_field. This needs to be added so that methods
// can have different routing fields.
var bptr *backend
- bptr = nil
+ bptr = nil
dr := AffinityRouter{
- name:config.Name,
- grpcService:rconf.ProtoService,
- affinity:make(map[string]*backend),
- methodMap:make(map[string]byte),
- nbBindingMthdMap:make(map[string]byte),
- curBknd:&bptr,
+ name: config.Name,
+ grpcService: rconf.ProtoService,
+ affinity: make(map[string]*backend),
+ methodMap: make(map[string]byte),
+ nbBindingMthdMap: make(map[string]byte),
+ curBknd: &bptr,
//serialNo:0,
}
// An association must exist
dr.association = strIndex(rAssnNames, config.Association)
if dr.association == 0 {
if config.Association == "" {
- log.Error("An association must be specified")
+ log.Error("An association must be specified")
} else {
- log.Errorf("The association '%s' is not valid", config.Association)
+ log.Errorf("The association '%s' is not valid", config.Association)
}
rtrn_err = true
}
-
// This has already been validated bfore this function
// is called so just use it.
for idx := range rTypeNames {
@@ -113,9 +112,9 @@
// Load the protobuf descriptor file
dr.protoDescriptor = &pb.FileDescriptorSet{}
- fb, err := ioutil.ReadFile(config.ProtoFile);
+ fb, err := ioutil.ReadFile(config.ProtoFile)
if err != nil {
- log.Errorf("Could not open proto file '%s'",config.ProtoFile)
+ log.Errorf("Could not open proto file '%s'", config.ProtoFile)
rtrn_err = true
}
err = proto.Unmarshal(fb, dr.protoDescriptor)
@@ -124,42 +123,41 @@
rtrn_err = true
}
-
// Build the routing structure based on the loaded protobuf
// descriptor file and the config information.
type key struct {
- mthd string
+ mthd string
field string
}
var msgs map[key]byte = make(map[key]byte)
- for _,f := range dr.protoDescriptor.File {
+ for _, f := range dr.protoDescriptor.File {
// Build a temporary map of message types by name.
- for _,m := range f.MessageType {
- for _,fld := range m.Field {
+ for _, m := range f.MessageType {
+ for _, fld := range m.Field {
log.Debugf("Processing message '%s', field '%s'", *m.Name, *fld.Name)
msgs[key{*m.Name, *fld.Name}] = byte(*fld.Number)
}
}
}
log.Debugf("The map contains: %v", msgs)
- for _,f := range dr.protoDescriptor.File {
+ for _, f := range dr.protoDescriptor.File {
if *f.Package == rconf.ProtoPackage {
- for _, s:= range f.Service {
+ for _, s := range f.Service {
if *s.Name == rconf.ProtoService {
log.Debugf("Loading package data '%s' for service '%s' for router '%s'", *f.Package, *s.Name, dr.name)
// Now create a map keyed by method name with the value being the
// field number of the route selector.
var ok bool
- for _,m := range s.Method {
+ for _, m := range s.Method {
// Find the input type in the messages and extract the
// field number and save it for future reference.
- log.Debugf("Processing method '%s'",*m.Name)
+ log.Debugf("Processing method '%s'", *m.Name)
// Determine if this is a method we're supposed to be processing.
if needMethod(*m.Name, config) == true {
- log.Debugf("Enabling method '%s'",*m.Name)
+ log.Debugf("Enabling method '%s'", *m.Name)
pkg_methd := pkg_re.FindStringSubmatch(*m.InputType)
if pkg_methd == nil {
- log.Errorf("Regular expression didn't match input type '%s'",*m.InputType)
+ log.Errorf("Regular expression didn't match input type '%s'", *m.InputType)
rtrn_err = true
}
// The input type has the package name prepended to it. Remove it.
@@ -168,19 +166,19 @@
dr.methodMap[*m.Name], ok = msgs[key{in, config.RouteField}]
if ok == false {
log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
- *m.Name, config.RouteField, in)
+ *m.Name, config.RouteField, in)
rtrn_err = true
}
}
// The sb method is always included in the methods so we can check it here too.
if needSbMethod(*m.Name, config) == true {
- log.Debugf("Enabling southbound method '%s'",*m.Name)
+ log.Debugf("Enabling southbound method '%s'", *m.Name)
// The output type has the package name prepended to it. Remove it.
out := (*m.OutputType)[len(rconf.ProtoPackage)+2:]
dr.nbBindingMthdMap[*m.Name], ok = msgs[key{out, config.RouteField}]
if ok == false {
log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
- *m.Name, config.RouteField, out)
+ *m.Name, config.RouteField, out)
rtrn_err = true
}
}
@@ -190,8 +188,7 @@
}
}
-
- // Create the backend cluster or link to an existing one
+ // Create the backend cluster or link to an existing one
ok := true
if dr.bkndClstr, ok = bClusters[config.backendCluster.Name]; ok == false {
if dr.bkndClstr, err = newBackendCluster(config.backendCluster); err != nil {
@@ -201,14 +198,14 @@
}
if rtrn_err {
- return dr,errors.New(fmt.Sprintf("Failed to create a new router '%s'",dr.name))
+ return dr, errors.New(fmt.Sprintf("Failed to create a new router '%s'", dr.name))
}
- return dr,nil
+ return dr, nil
}
func needSbMethod(mthd string, conf *RouteConfig) bool {
- for _,m := range conf.NbBindingMethods {
+ for _, m := range conf.NbBindingMethods {
if mthd == m {
return true
}
@@ -217,7 +214,7 @@
}
func needMethod(mthd string, conf *RouteConfig) bool {
- for _,m := range conf.Methods {
+ for _, m := range conf.Methods {
if mthd == m {
return true
}
@@ -225,36 +222,38 @@
return false
}
-func (r AffinityRouter) Service() (string) {
+func (r AffinityRouter) Service() string {
return r.grpcService
}
-func (r AffinityRouter) Name() (string) {
+func (r AffinityRouter) Name() string {
return r.name
}
-func (r AffinityRouter) skipField(data *[]byte, idx *int) (error) {
- switch (*data)[*idx]&3 {
- case 0: // Varint
+func (r AffinityRouter) skipField(data *[]byte, idx *int) error {
+ switch (*data)[*idx] & 3 {
+ case 0: // Varint
(*idx)++
- for (*data)[*idx] >= 128 { (*idx)++}
- case 1: // 64 bit
- (*idx)+= 9
- case 2: // Length delimited
+ for (*data)[*idx] >= 128 {
(*idx)++
- b := proto.NewBuffer((*data)[*idx:])
- t , _ := b.DecodeVarint()
- (*idx) += int(t)+1
- case 3: // Deprecated
- case 4: // Deprecated
- case 5: // 32 bit
- (*idx)+= 5
+ }
+ case 1: // 64 bit
+ (*idx) += 9
+ case 2: // Length delimited
+ (*idx)++
+ b := proto.NewBuffer((*data)[*idx:])
+ t, _ := b.DecodeVarint()
+ (*idx) += int(t) + 1
+ case 3: // Deprecated
+ case 4: // Deprecated
+ case 5: // 32 bit
+ (*idx) += 5
}
return nil
}
func (r AffinityRouter) decodeProtoField(payload []byte, fieldId byte) (string, error) {
- idx :=0
+ idx := 0
b := proto.NewBuffer([]byte{})
//b.DebugPrint("The Buffer", payload)
for { // Find the route selector field
@@ -265,31 +264,31 @@
// TODO: Consider supporting other selector types.... Way, way in the future
// ok, the future is now, support strings as well... ugh.
var selector string
- switch payload[idx]&3 {
- case 0: // Integer
- b.SetBuf(payload[idx+1:])
- v,e := b.DecodeVarint()
- if e == nil {
- log.Debugf("Decoded the ing field: %v", v)
- selector = strconv.Itoa(int(v))
- } else {
- log.Errorf("Failed to decode varint %v", e)
- return "", e
- }
- case 2: // Length delimited AKA string
- b.SetBuf(payload[idx+1:])
- v,e := b.DecodeStringBytes()
- if e == nil {
- log.Debugf("Decoded the string field: %v", v)
- selector = v
- } else {
- log.Errorf("Failed to decode string %v", e)
- return "", e
- }
- default:
- err := errors.New(fmt.Sprintf("Only integer and string route selectors are permitted"))
- log.Error(err)
- return "", err
+ switch payload[idx] & 3 {
+ case 0: // Integer
+ b.SetBuf(payload[idx+1:])
+ v, e := b.DecodeVarint()
+ if e == nil {
+ log.Debugf("Decoded the ing field: %v", v)
+ selector = strconv.Itoa(int(v))
+ } else {
+ log.Errorf("Failed to decode varint %v", e)
+ return "", e
+ }
+ case 2: // Length delimited AKA string
+ b.SetBuf(payload[idx+1:])
+ v, e := b.DecodeStringBytes()
+ if e == nil {
+ log.Debugf("Decoded the string field: %v", v)
+ selector = v
+ } else {
+ log.Errorf("Failed to decode string %v", e)
+ return "", e
+ }
+ default:
+ err := errors.New(fmt.Sprintf("Only integer and string route selectors are permitted"))
+ log.Error(err)
+ return "", err
}
return selector, nil
} else if err := r.skipField(&payload, &idx); err != nil {
@@ -301,54 +300,54 @@
func (r AffinityRouter) Route(sel interface{}) *backend {
switch sl := sel.(type) {
- case *nbFrame:
- log.Debugf("Route called for nbFrame with method %s", sl.mthdSlice[REQ_METHOD]);
- // Check if this method should be affinity bound from the
- // reply rather than the request.
- if _,ok := r.nbBindingMthdMap[sl.mthdSlice[REQ_METHOD]]; ok == true {
+ case *nbFrame:
+ log.Debugf("Route called for nbFrame with method %s", sl.mthdSlice[REQ_METHOD])
+ // Check if this method should be affinity bound from the
+ // reply rather than the request.
+ if _, ok := r.nbBindingMthdMap[sl.mthdSlice[REQ_METHOD]]; ok == true {
+ var err error
+ log.Debugf("Method '%s' affinity binds on reply", sl.mthdSlice[REQ_METHOD])
+ // Just round robin route the southbound request
+ if *r.curBknd, err = r.bkndClstr.nextBackend(*r.curBknd, BE_SEQ_RR); err == nil {
+ return *r.curBknd
+ } else {
+ sl.err = err
+ return nil
+ }
+ }
+ // Not a south affinity binding method, proceed with north affinity binding.
+ if selector, err := r.decodeProtoField(sl.payload, r.methodMap[sl.mthdSlice[REQ_METHOD]]); err == nil {
+ log.Debugf("Establishing affinity for selector: %s", selector)
+ if rtrn, ok := r.affinity[selector]; ok {
+ return rtrn
+ } else {
+ // The selector isn't in the map, create a new affinity mapping
+ log.Debugf("MUST CREATE A NEW AFFINITY MAP ENTRY!!")
var err error
- log.Debugf("Method '%s' affinity binds on reply", sl.mthdSlice[REQ_METHOD])
- // Just round robin route the southbound request
- if *r.curBknd, err = r.bkndClstr.nextBackend(*r.curBknd,BE_SEQ_RR); err == nil {
+ if *r.curBknd, err = r.bkndClstr.nextBackend(*r.curBknd, BE_SEQ_RR); err == nil {
+ r.setAffinity(selector, *r.curBknd)
+ //r.affinity[selector] = *r.curBknd
+ //log.Debugf("New affinity set to backend %s",(*r.curBknd).name)
return *r.curBknd
} else {
sl.err = err
return nil
}
}
- // Not a south affinity binding method, proceed with north affinity binding.
- if selector,err := r.decodeProtoField(sl.payload, r.methodMap[sl.mthdSlice[REQ_METHOD]]); err == nil {
- log.Debugf("Establishing affinity for selector: %s", selector)
- if rtrn,ok := r.affinity[selector]; ok {
- return rtrn
- } else {
- // The selector isn't in the map, create a new affinity mapping
- log.Debugf("MUST CREATE A NEW AFFINITY MAP ENTRY!!")
- var err error
- if *r.curBknd, err = r.bkndClstr.nextBackend(*r.curBknd,BE_SEQ_RR); err == nil {
- r.setAffinity(selector, *r.curBknd)
- //r.affinity[selector] = *r.curBknd
- //log.Debugf("New affinity set to backend %s",(*r.curBknd).name)
- return *r.curBknd
- } else {
- sl.err = err
- return nil
- }
- }
- }
- default:
- log.Errorf("Internal: invalid data type in Route call %v", sel);
- return nil
+ }
+ default:
+ log.Errorf("Internal: invalid data type in Route call %v", sel)
+ return nil
}
- log.Errorf("Bad lookup in affinity map %v",r.affinity);
+ log.Errorf("Bad lookup in affinity map %v", r.affinity)
return nil
}
-func (ar AffinityRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string,string,error) {
- return "","",nil
+func (ar AffinityRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string, string, error) {
+ return "", "", nil
}
-func (ar AffinityRouter) BackendCluster(mthd string, metaKey string) (*backendCluster,error) {
+func (ar AffinityRouter) BackendCluster(mthd string, metaKey string) (*backendCluster, error) {
return ar.bkndClstr, nil
}
@@ -361,41 +360,41 @@
func (r AffinityRouter) ReplyHandler(sel interface{}) error {
switch sl := sel.(type) {
- case *sbFrame:
- sl.lck.Lock()
- defer sl.lck.Unlock()
- log.Debugf("Reply handler called for sbFrame with method %s", sl.method);
- // Determine if reply action is required.
- if fld, ok := r.nbBindingMthdMap[sl.method]; ok == true && len(sl.payload) > 0 {
- // Extract the field value from the frame and
- // and set affinity accordingly
- if selector,err := r.decodeProtoField(sl.payload, fld); err == nil {
- log.Debug("Settign affinity on reply")
- if r.setAffinity(selector, sl.be) != nil {
- log.Error("Setting affinity on reply failed")
- }
- return nil
- } else {
- err := errors.New(fmt.Sprintf("Failed to decode reply field %d for method %s", fld, sl.method))
- log.Error(err)
- return err
+ case *sbFrame:
+ sl.lck.Lock()
+ defer sl.lck.Unlock()
+ log.Debugf("Reply handler called for sbFrame with method %s", sl.method)
+ // Determine if reply action is required.
+ if fld, ok := r.nbBindingMthdMap[sl.method]; ok == true && len(sl.payload) > 0 {
+ // Extract the field value from the frame and
+ // set affinity accordingly
+ if selector, err := r.decodeProtoField(sl.payload, fld); err == nil {
+ log.Debug("Settign affinity on reply")
+ if r.setAffinity(selector, sl.be) != nil {
+ log.Error("Setting affinity on reply failed")
}
+ return nil
+ } else {
+ err := errors.New(fmt.Sprintf("Failed to decode reply field %d for method %s", fld, sl.method))
+ log.Error(err)
+ return err
}
- return nil
- default:
- err := errors.New(fmt.Sprintf("Internal: invalid data type in ReplyHander call %v", sl))
- log.Error(err)
- return err
+ }
+ return nil
+ default:
+ err := errors.New(fmt.Sprintf("Internal: invalid data type in ReplyHander call %v", sl))
+ log.Error(err)
+ return err
}
}
func (ar AffinityRouter) setAffinity(key string, be *backend) error {
- if be2,ok := ar.affinity[key]; ok == false {
+ if be2, ok := ar.affinity[key]; ok == false {
ar.affinity[key] = be
- log.Debugf("New affinity set to backend %s for key %s",be.name, key)
+ log.Debugf("New affinity set to backend %s for key %s", be.name, key)
} else if be2 != be {
err := errors.New(fmt.Sprintf("Attempting multiple sets of affinity for key %s to backend %s from %s on router %s",
- key, be.name, ar.affinity[key].name, ar.name))
+ key, be.name, ar.affinity[key].name, ar.name))
log.Error(err)
return err
}
diff --git a/afrouter/afrouter/api.go b/afrouter/afrouter/api.go
index aec1221..36e79a3 100644
--- a/afrouter/afrouter/api.go
+++ b/afrouter/afrouter/api.go
@@ -18,25 +18,24 @@
package afrouter
import (
- "net"
- "fmt"
"errors"
- "runtime"
- "strconv"
- "google.golang.org/grpc"
- "golang.org/x/net/context"
+ "fmt"
"github.com/opencord/voltha-go/common/log"
pb "github.com/opencord/voltha-protos/go/afrouter"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+ "net"
+ "runtime"
+ "strconv"
)
-
type ArouterApi struct {
- addr string
- port int
+ addr string
+ port int
apiListener net.Listener
- apiServer * grpc.Server
- running bool
- ar *ArouterProxy
+ apiServer *grpc.Server
+ running bool
+ ar *ArouterProxy
}
func newApi(config *ApiConfig, ar *ArouterProxy) (*ArouterApi, error) {
@@ -51,23 +50,23 @@
return nil, errors.New("Errors in API configuration")
} else {
var err error = nil
- aa := &ArouterApi{addr:config.Addr,port:int(config.Port),ar:ar}
+ aa := &ArouterApi{addr: config.Addr, port: int(config.Port), ar: ar}
// Create the listener for the API server
if aa.apiListener, err =
- net.Listen("tcp", config.Addr + ":"+
- strconv.Itoa(int(config.Port))); err != nil {
+ net.Listen("tcp", config.Addr+":"+
+ strconv.Itoa(int(config.Port))); err != nil {
log.Error(err)
return nil, err
}
// Create the API server
aa.apiServer = grpc.NewServer()
pb.RegisterConfigurationServer(aa.apiServer, *aa)
- return aa,err
+ return aa, err
}
}
func (aa *ArouterApi) getServer(srvr string) (*server, error) {
- if s,ok := aa.ar.servers[srvr]; ok == false {
+ if s, ok := aa.ar.servers[srvr]; ok == false {
err := errors.New(fmt.Sprintf("Server '%s' doesn't exist", srvr))
return nil, err
} else {
@@ -76,8 +75,8 @@
}
func (aa *ArouterApi) getRouter(s *server, clstr string) (Router, error) {
- for _,pkg := range s.routers {
- for _,r := range pkg {
+ for _, pkg := range s.routers {
+ for _, r := range pkg {
if c := r.FindBackendCluster(clstr); c != nil {
return r, nil
}
@@ -88,8 +87,8 @@
}
func (aa *ArouterApi) getCluster(s *server, clstr string) (*backendCluster, error) {
- for _,pkg := range s.routers {
- for _,r := range pkg {
+ for _, pkg := range s.routers {
+ for _, r := range pkg {
if c := r.FindBackendCluster(clstr); c != nil {
return c, nil
}
@@ -100,18 +99,18 @@
}
func (aa *ArouterApi) getBackend(c *backendCluster, bknd string) (*backend, error) {
- for _,b := range c.backends {
+ for _, b := range c.backends {
if b.name == bknd {
- return b,nil
+ return b, nil
}
}
err := errors.New(fmt.Sprintf("Backend '%s' doesn't exist in cluster %s",
- bknd, c.name))
+ bknd, c.name))
return nil, err
}
func (aa *ArouterApi) getConnection(b *backend, con string) (*beConnection, error) {
- if c,ok := b.connections[con]; ok == false {
+ if c, ok := b.connections[con]; ok == false {
err := errors.New(fmt.Sprintf("Connection '%s' doesn't exist", con))
return nil, err
} else {
@@ -119,8 +118,8 @@
}
}
-func (aa * ArouterApi) updateConnection(in *pb.Conn, cn *beConnection, b *backend) error {
- sPort := strconv.FormatUint(in.Port,10)
+func (aa *ArouterApi) updateConnection(in *pb.Conn, cn *beConnection, b *backend) error {
+ sPort := strconv.FormatUint(in.Port, 10)
// Check that the ip address and or port are different
if in.Addr == cn.addr && sPort == cn.port {
err := errors.New(fmt.Sprintf("Refusing to change connection '%s' to identical values", in.Connection))
@@ -135,7 +134,7 @@
}
func (aa ArouterApi) SetAffinity(ctx context.Context, in *pb.Affinity) (*pb.Result, error) {
- log.Debugf("SetAffinity called! %v",in);
+ log.Debugf("SetAffinity called! %v", in)
//return &pb.Result{Success:true,Error:""},nil
// Navigate down tot he connection and compare IP addresses and ports if they're
// not the same then close the existing connection. If they are bothe the same
@@ -144,33 +143,33 @@
aap := &aa
- _=aap
+ _ = aap
log.Debugf("Getting router %s and route %s", in.Router, in.Route)
- if r,ok := allRouters[in.Router+in.Route]; ok == true {
+ if r, ok := allRouters[in.Router+in.Route]; ok == true {
switch rr := r.(type) {
- case AffinityRouter:
- log.Debug("Affinity router found")
- b := rr.FindBackendCluster(in.Cluster).getBackend(in.Backend)
- if b != nil {
- rr.setAffinity(in.Id, b)
- } else {
- log.Errorf("Requested backend '%s' not found", in.Backend)
- }
- _ = rr
- case MethodRouter:
- log.Debug("Method router found")
- _ = rr
- default:
- log.Debug("Some other router found")
- _ = rr
+ case AffinityRouter:
+ log.Debug("Affinity router found")
+ b := rr.FindBackendCluster(in.Cluster).getBackend(in.Backend)
+ if b != nil {
+ rr.setAffinity(in.Id, b)
+ } else {
+ log.Errorf("Requested backend '%s' not found", in.Backend)
+ }
+ _ = rr
+ case MethodRouter:
+ log.Debug("Method router found")
+ _ = rr
+ default:
+ log.Debug("Some other router found")
+ _ = rr
}
} else {
log.Debugf("Couldn't get router type")
- return &pb.Result{Success:false,Error:err.Error()}, err
+ return &pb.Result{Success: false, Error: err.Error()}, err
}
- return &pb.Result{Success:true,Error:""},nil
+ return &pb.Result{Success: true, Error: ""}, nil
}
func (aa ArouterApi) SetConnection(ctx context.Context, in *pb.Conn) (*pb.Result, error) {
@@ -179,47 +178,47 @@
// then return an error describing the situation.
var s *server
var c *backendCluster
- var b * backend
- var cn * beConnection
+ var b *backend
+ var cn *beConnection
var err error
- log.Debugf("SetConnection called! %v",in);
+ log.Debugf("SetConnection called! %v", in)
aap := &aa
- if s,err = (aap).getServer(in.Server); err != nil {
+ if s, err = (aap).getServer(in.Server); err != nil {
err := errors.New(fmt.Sprintf("Server '%s' doesn't exist", in.Server))
log.Error(err)
- return &pb.Result{Success:false,Error:err.Error()}, err
+ return &pb.Result{Success: false, Error: err.Error()}, err
}
// The cluster is usually accessed via tha router but since each
// cluster is unique it's good enough to find the router that
// has the cluster we're looking for rather than fully keying
// the path
- if c,err = aap.getCluster(s, in.Cluster); err != nil {
+ if c, err = aap.getCluster(s, in.Cluster); err != nil {
log.Error(err)
- return &pb.Result{Success:false,Error:err.Error()}, err
+ return &pb.Result{Success: false, Error: err.Error()}, err
}
- if b,err = aap.getBackend(c, in.Backend); err != nil {
+ if b, err = aap.getBackend(c, in.Backend); err != nil {
log.Error(err)
- return &pb.Result{Success:false,Error:err.Error()}, err
+ return &pb.Result{Success: false, Error: err.Error()}, err
}
- if cn,err = aap.getConnection(b, in.Connection); err != nil {
+ if cn, err = aap.getConnection(b, in.Connection); err != nil {
log.Error(err)
- return &pb.Result{Success:false,Error:err.Error()}, err
+ return &pb.Result{Success: false, Error: err.Error()}, err
}
if err = aap.updateConnection(in, cn, b); err != nil {
log.Error(err)
- return &pb.Result{Success:false,Error:err.Error()}, err
+ return &pb.Result{Success: false, Error: err.Error()}, err
}
- return &pb.Result{Success:true,Error:""},nil
+ return &pb.Result{Success: true, Error: ""}, nil
}
func (aa ArouterApi) GetGoroutineCount(ctx context.Context, in *pb.Empty) (*pb.Count, error) {
- return &pb.Count{Count:uint32(runtime.NumGoroutine())}, nil
+ return &pb.Count{Count: uint32(runtime.NumGoroutine())}, nil
}
func (aa *ArouterApi) serve() {
@@ -233,4 +232,3 @@
}
}()
}
-
diff --git a/afrouter/afrouter/arproxy.go b/afrouter/afrouter/arproxy.go
index d809fbb..71c7b1f 100644
--- a/afrouter/afrouter/arproxy.go
+++ b/afrouter/afrouter/arproxy.go
@@ -25,41 +25,39 @@
"github.com/opencord/voltha-go/common/log"
)
-
type nbi int
const (
- GRPC_NBI nbi = 1
+ GRPC_NBI nbi = 1
GRPC_STREAMING_NBI nbi = 2
- GRPC_CONTROL_NBI nbi = 3
+ GRPC_CONTROL_NBI nbi = 3
)
// String names for display in error messages.
var arpxyNames = [...]string{"grpc_nbi", "grpc_streaming_nbi", "grpc_control_nbi"}
-var arProxy *ArouterProxy= nil
+var arProxy *ArouterProxy = nil
type ArouterProxy struct {
servers map[string]*server // Defined in handler.go
- api *ArouterApi
+ api *ArouterApi
}
-
// Create the routing proxy
func NewArouterProxy(conf *Configuration) (*ArouterProxy, error) {
- arProxy = &ArouterProxy{servers:make(map[string]*server)}
+ arProxy = &ArouterProxy{servers: make(map[string]*server)}
// Create all the servers listed in the configuration
- for _,s := range conf.Servers {
- if ns, err := newServer(&s); err != nil {
- log.Error("Configuration failed")
- return nil, err
- } else {
+ for _, s := range conf.Servers {
+ if ns, err := newServer(&s); err != nil {
+ log.Error("Configuration failed")
+ return nil, err
+ } else {
arProxy.servers[ns.Name()] = ns
}
}
// TODO: The API is not mandatory, check if it's even in the config before
// trying to create it. If it isn't then don't bother but log a warning.
- if api,err := newApi(&conf.Api, arProxy); err != nil {
+ if api, err := newApi(&conf.Api, arProxy); err != nil {
return nil, err
} else {
arProxy.api = api
diff --git a/afrouter/afrouter/backend.go b/afrouter/afrouter/backend.go
index 3f17af1..863652f 100644
--- a/afrouter/afrouter/backend.go
+++ b/afrouter/afrouter/backend.go
@@ -20,102 +20,99 @@
// Backend manager handles redundant connections per backend
import (
- "io"
- "fmt"
- "net"
- "sync"
- "time"
- "sort"
"errors"
- "strconv"
- "strings"
+ "fmt"
+ "github.com/opencord/voltha-go/common/log"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
- "google.golang.org/grpc/metadata"
"google.golang.org/grpc/connectivity"
- "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc/metadata"
+ "io"
+ "net"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
)
-
-
const (
BE_ACTIVE_ACTIVE = 1 // Backend type active/active
- BE_SERVER = 2 // Backend type single server
- BE_SEQ_RR = 0 // Backend sequence round robin
- AS_NONE = 0 // Association strategy: none
- AS_SERIAL_NO = 1 // Association strategy: serial number
- AL_NONE = 0 // Association location: none
- AL_HEADER = 1 // Association location: header
- AL_PROTOBUF = 2 // Association location: protobuf
+ BE_SERVER = 2 // Backend type single server
+ BE_SEQ_RR = 0 // Backend sequence round robin
+ AS_NONE = 0 // Association strategy: none
+ AS_SERIAL_NO = 1 // Association strategy: serial number
+ AL_NONE = 0 // Association location: none
+ AL_HEADER = 1 // Association location: header
+ AL_PROTOBUF = 2 // Association location: protobuf
)
-
-var beTypeNames = []string{"","active_active","server"}
-var asTypeNames = []string{"","serial_number"}
-var alTypeNames = []string{"","header","protobuf"}
+var beTypeNames = []string{"", "active_active", "server"}
+var asTypeNames = []string{"", "serial_number"}
+var alTypeNames = []string{"", "header", "protobuf"}
var bClusters map[string]*backendCluster = make(map[string]*backendCluster)
type backendCluster struct {
name string
//backends map[string]*backend
- backends []*backend
- beRvMap map[*backend]int
+ backends []*backend
+ beRvMap map[*backend]int
serialNoSource chan uint64
}
type backend struct {
- lck sync.Mutex
- name string
- beType int
- activeAssoc assoc
- connFailCallback func(string, *backend)bool
- connections map[string]*beConnection
- srtdConns []*beConnection
- opnConns int
+ lck sync.Mutex
+ name string
+ beType int
+ activeAssoc assoc
+ connFailCallback func(string, *backend) bool
+ connections map[string]*beConnection
+ srtdConns []*beConnection
+ opnConns int
}
type assoc struct {
strategy int
location int
- field string // Used only if location is protobuf
- key string
+ field string // Used only if location is protobuf
+ key string
}
type beConnection struct {
- lck sync.Mutex
- cncl context.CancelFunc
- name string
- addr string
- port string
+ lck sync.Mutex
+ cncl context.CancelFunc
+ name string
+ addr string
+ port string
gConn *gConnection
- bknd *backend
+ bknd *backend
}
// This structure should never be referred to
// by any routine outside of *beConnection
// routines.
type gConnection struct {
- lck sync.Mutex
+ lck sync.Mutex
state connectivity.State
- conn *grpc.ClientConn
- cncl context.CancelFunc
+ conn *grpc.ClientConn
+ cncl context.CancelFunc
}
type beClStrm struct {
- strm grpc.ClientStream
- ctxt context.Context
- cncl context.CancelFunc
+ strm grpc.ClientStream
+ ctxt context.Context
+ cncl context.CancelFunc
ok2Close chan struct{}
- c2sRtrn chan error
- s2cRtrn error
+ c2sRtrn chan error
+ s2cRtrn error
}
type beClStrms struct {
- lck sync.Mutex
- actvStrm *beClStrm
- strms map[string]*beClStrm
+ lck sync.Mutex
+ actvStrm *beClStrm
+ strms map[string]*beClStrm
srtdStrms []*beClStrm
}
@@ -137,7 +134,7 @@
rtrn_err = true
}
//bc := &backendCluster{name:conf.Name,backends:make(map[string]*backend)}
- bc := &backendCluster{name:conf.Name, beRvMap:make(map[*backend]int)}
+ bc := &backendCluster{name: conf.Name, beRvMap: make(map[*backend]int)}
bClusters[bc.name] = bc
bc.startSerialNumberSource() // Serial numberere for active/active backends
idx := 0
@@ -146,7 +143,7 @@
log.Errorf("A backend must have a name in cluster %s\n", conf.Name)
rtrn_err = true
}
- if be,err = newBackend(&bec, conf.Name); err != nil {
+ if be, err = newBackend(&bec, conf.Name); err != nil {
log.Errorf("Error creating backend %s", bec.Name)
rtrn_err = true
}
@@ -160,8 +157,8 @@
return bc, nil
}
-func (bc * backendCluster) getBackend(name string) *backend {
- for _,v := range bc.backends {
+func (bc *backendCluster) getBackend(name string) *backend {
+ for _, v := range bc.backends {
if v.name == name {
return v
}
@@ -181,48 +178,48 @@
}()
}
-func (bc *backendCluster) nextBackend(be *backend, seq int) (*backend,error) {
+func (bc *backendCluster) nextBackend(be *backend, seq int) (*backend, error) {
switch seq {
- case BE_SEQ_RR: // Round robin
- in := be
- // If no backend is found having a connection
- // then return nil.
- if be == nil {
- log.Debug("Previous backend is nil")
- be = bc.backends[0]
- in = be
- if be.opnConns != 0 {
- return be,nil
- }
+ case BE_SEQ_RR: // Round robin
+ in := be
+ // If no backend is found having a connection
+ // then return nil.
+ if be == nil {
+ log.Debug("Previous backend is nil")
+ be = bc.backends[0]
+ in = be
+ if be.opnConns != 0 {
+ return be, nil
}
- for {
- log.Debugf("Requesting a new backend starting from %s", be.name)
- cur := bc.beRvMap[be]
- cur++
- if cur >= len(bc.backends) {
- cur = 0
- }
- log.Debugf("Next backend is %d:%s", cur, bc.backends[cur].name)
- if bc.backends[cur].opnConns > 0 {
- return bc.backends[cur], nil
- }
- if bc.backends[cur] == in {
- err := fmt.Errorf("No backend with open connections found")
- log.Debug(err);
- return nil,err
- }
- be = bc.backends[cur]
- log.Debugf("Backend '%s' has no open connections, trying next", bc.backends[cur].name)
+ }
+ for {
+ log.Debugf("Requesting a new backend starting from %s", be.name)
+ cur := bc.beRvMap[be]
+ cur++
+ if cur >= len(bc.backends) {
+ cur = 0
}
- default: // Invalid, defalt to routnd robin
- log.Errorf("Invalid backend sequence %d. Defaulting to round robin", seq)
- return bc.nextBackend(be, BE_SEQ_RR)
+ log.Debugf("Next backend is %d:%s", cur, bc.backends[cur].name)
+ if bc.backends[cur].opnConns > 0 {
+ return bc.backends[cur], nil
+ }
+ if bc.backends[cur] == in {
+ err := fmt.Errorf("No backend with open connections found")
+ log.Debug(err)
+ return nil, err
+ }
+ be = bc.backends[cur]
+ log.Debugf("Backend '%s' has no open connections, trying next", bc.backends[cur].name)
+ }
+ default: // Invalid, default to round robin
+ log.Errorf("Invalid backend sequence %d. Defaulting to round robin", seq)
+ return bc.nextBackend(be, BE_SEQ_RR)
}
}
func (bec *backendCluster) handler(srv interface{}, serverStream grpc.ServerStream, r Router, mthdSlice []string,
- mk string, mv string) error {
-//func (bec *backendCluster) handler(nbR * nbRequest) error {
+ mk string, mv string) error {
+ //func (bec *backendCluster) handler(nbR * nbRequest) error {
// The final backend cluster needs to be determined here. With non-affinity routed backends it could
// just be determined here and for affinity routed backends the first message must be received
@@ -231,25 +228,25 @@
// Get the backend to use.
// Allocate the nbFrame here since it holds the "context" of this communication
- nf := &nbFrame{router:r, mthdSlice:mthdSlice, serNo:bec.serialNoSource, metaKey:mk, metaVal:mv}
+ nf := &nbFrame{router: r, mthdSlice: mthdSlice, serNo: bec.serialNoSource, metaKey: mk, metaVal: mv}
log.Debugf("Nb frame allocate with method %s", nf.mthdSlice[REQ_METHOD])
- if be,err := bec.assignBackend(serverStream, nf); err != nil {
+ if be, err := bec.assignBackend(serverStream, nf); err != nil {
// At this point, no backend streams have been initiated
// so just return the error.
return err
} else {
log.Debugf("Backend '%s' selected", be.name)
// Allocate a sbFrame here because it might be needed for return value intercept
- sf := &sbFrame{router:r, be:be, method:nf.mthdSlice[REQ_METHOD], metaKey:mk, metaVal:mv}
- log.Debugf("Sb frame allocated with router %s",r.Name())
+ sf := &sbFrame{router: r, be: be, method: nf.mthdSlice[REQ_METHOD], metaKey: mk, metaVal: mv}
+ log.Debugf("Sb frame allocated with router %s", r.Name())
return be.handler(srv, serverStream, nf, sf)
}
}
-func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, f * nbFrame) (*beClStrms, error) {
+func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, f *nbFrame) (*beClStrms, error) {
- rtrn := &beClStrms{strms:make(map[string]*beClStrm),actvStrm:nil}
+ rtrn := &beClStrms{strms: make(map[string]*beClStrm), actvStrm: nil}
log.Debugf("Opening southbound streams for method '%s'", f.mthdSlice[REQ_METHOD])
// Get the metadata from the incoming message on the server
@@ -268,7 +265,7 @@
var atLeastOne bool = false
var errStr strings.Builder
log.Debugf("There are %d connections to open", len(be.connections))
- for _,cn := range be.srtdConns {
+ for _, cn := range be.srtdConns {
// TODO: THIS IS A HACK to suspend redundancy for binding routers for all calls
// and its very specific to a use case. There should really be a per method
// mechanism to select non-redundant calls for all router types. This needs
@@ -281,7 +278,7 @@
continue
}
// Copy in the metadata
- if cn.getState() == connectivity.Ready && cn.getConn() != nil {
+ if cn.getState() == connectivity.Ready && cn.getConn() != nil {
log.Debugf("Opening southbound stream for connection '%s'", cn.name)
// Create an outgoing context that includes the incoming metadata
// and that will cancel if the server's context is canceled
@@ -289,18 +286,18 @@
clientCtx = metadata.NewOutgoingContext(clientCtx, md.Copy())
//TODO: Same check here, only add the serial number if necessary
clientCtx = metadata.AppendToOutgoingContext(clientCtx, "voltha_serial_number",
- strconv.FormatUint(serialNo,10))
+ strconv.FormatUint(serialNo, 10))
// Create the client stream
if clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying,
- cn.getConn(), f.mthdSlice[REQ_ALL]); err !=nil {
- log.Debugf("Failed to create a client stream '%s', %v",cn.name,err)
+ cn.getConn(), f.mthdSlice[REQ_ALL]); err != nil {
+ log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
fmt.Fprintf(&errStr, "{{Failed to create a client stream '%s', %v}} ", cn.name, err)
rtrn.strms[cn.name] = nil
} else {
- rtrn.strms[cn.name] = &beClStrm{strm:clientStream, ctxt:clientCtx,
- cncl:clientCancel, s2cRtrn:nil,
- ok2Close:make(chan struct{}),
- c2sRtrn:make(chan error, 1)}
+ rtrn.strms[cn.name] = &beClStrm{strm: clientStream, ctxt: clientCtx,
+ cncl: clientCancel, s2cRtrn: nil,
+ ok2Close: make(chan struct{}),
+ c2sRtrn: make(chan error, 1)}
atLeastOne = true
}
} else if cn.getConn() == nil {
@@ -315,24 +312,24 @@
}
if atLeastOne == true {
rtrn.sortStreams()
- return rtrn,nil
+ return rtrn, nil
}
- fmt.Fprintf(&errStr, "{{No streams available for backend '%s' unable to send}} ",be.name)
+ fmt.Fprintf(&errStr, "{{No streams available for backend '%s' unable to send}} ", be.name)
log.Error(errStr.String())
return nil, errors.New(errStr.String())
}
-func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf * nbFrame, sf * sbFrame) error {
+func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf *nbFrame, sf *sbFrame) error {
- // Set up and launch each individual southbound stream
+ // Set up and launch each individual southbound stream
var beStrms *beClStrms
var rtrn error = nil
var s2cOk bool = false
var c2sOk bool = false
- beStrms, err := be.openSouthboundStreams(srv,serverStream,nf)
+ beStrms, err := be.openSouthboundStreams(srv, serverStream, nf)
if err != nil {
- log.Errorf("openStreams failed: %v",err)
+ log.Errorf("openStreams failed: %v", err)
return err
}
// If we get here, there has to be AT LEAST ONE open stream
@@ -362,7 +359,7 @@
return rtrn
}
} else {
- log.Debugf("s2cErr reporting %v",s2cErr)
+ log.Debugf("s2cErr reporting %v", s2cErr)
// however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
// exit with an error to the stack
@@ -382,7 +379,7 @@
// the southbound streams are closed. Should this happen one of the
// backends may not get the request.
if c2sErr != io.EOF {
- rtrn = c2sErr
+ rtrn = c2sErr
}
log.Debug("c2sErr reporting EOF")
if s2cOk == true {
@@ -394,7 +391,7 @@
}
func (strms *beClStrms) clientCancel() {
- for _,strm := range strms.strms {
+ for _, strm := range strms.strms {
if strm != nil {
strm.cncl()
}
@@ -402,7 +399,7 @@
}
func (strms *beClStrms) closeSend() {
- for _,strm := range strms.strms {
+ for _, strm := range strms.strms {
if strm != nil {
<-strm.ok2Close
log.Debug("Closing southbound stream")
@@ -436,13 +433,13 @@
return f.be, nil
}
-func (strms * beClStrms) getActive() *beClStrm {
+func (strms *beClStrms) getActive() *beClStrm {
strms.lck.Lock()
defer strms.lck.Unlock()
return strms.actvStrm
}
-func (strms *beClStrms) setThenGetActive(strm *beClStrm) (*beClStrm) {
+func (strms *beClStrms) setThenGetActive(strm *beClStrm) *beClStrm {
strms.lck.Lock()
defer strms.lck.Unlock()
if strms.actvStrm == nil {
@@ -505,17 +502,17 @@
ret := make(chan error, 1)
agg := make(chan *beClStrm)
atLeastOne := false
- for _,strm := range src.strms {
+ for _, strm := range src.strms {
if strm != nil {
go fc2s(strm)
go func(s *beClStrm) { // Wait on result and aggregate
r := <-s.c2sRtrn // got the return code
if r == nil {
- return // We're the redundat stream, just die
+					return // We're the redundant stream, just die
}
s.c2sRtrn <- r // put it back to pass it along
- agg <- s // send the stream to the aggregator
- } (strm)
+ agg <- s // send the stream to the aggregator
+ }(strm)
atLeastOne = true
}
}
@@ -536,7 +533,7 @@
var rtrn error
atLeastOne := false
- for _,strm := range strms.srtdStrms {
+ for _, strm := range strms.srtdStrms {
if strm != nil {
if err := strm.strm.SendMsg(f); err != nil {
log.Debugf("Error on SendMsg: %s", err.Error())
@@ -550,7 +547,7 @@
// If one of the streams succeeded, declare success
// if none did pick an error and return it.
if atLeastOne == true {
- for _,strm := range strms.srtdStrms {
+ for _, strm := range strms.srtdStrms {
if strm != nil {
rtrn = strm.s2cRtrn
if rtrn == nil {
@@ -563,14 +560,14 @@
rtrn = errors.New("There are no open streams, this should never happen")
log.Error(rtrn)
}
- return rtrn;
+ return rtrn
}
func (dst *beClStrms) forwardServerToClient(src grpc.ServerStream, f *nbFrame) chan error {
ret := make(chan error, 1)
go func() {
// The frame buffer already has the results of a first
- // RecvMsg in it so the first thing to do is to
+ // RecvMsg in it so the first thing to do is to
// send it to the list of client streams and only
// then read some more.
for i := 0; ; i++ {
@@ -590,24 +587,24 @@
return ret
}
-func (st * beClStrms) sortStreams() {
+func (st *beClStrms) sortStreams() {
var tmpKeys []string
- for k,_ := range st.strms {
+ for k, _ := range st.strms {
tmpKeys = append(tmpKeys, k)
}
sort.Strings(tmpKeys)
- for _,v := range tmpKeys {
+ for _, v := range tmpKeys {
st.srtdStrms = append(st.srtdStrms, st.strms[v])
}
}
-func (be * backend) sortConns() {
+func (be *backend) sortConns() {
var tmpKeys []string
- for k,_ := range be.connections {
+ for k, _ := range be.connections {
tmpKeys = append(tmpKeys, k)
}
sort.Strings(tmpKeys)
- for _,v := range tmpKeys {
+ for _, v := range tmpKeys {
be.srtdConns = append(be.srtdConns, be.connections[v])
}
}
@@ -617,8 +614,8 @@
log.Debugf("Configuring the backend with %v", *conf)
// Validate the conifg and configure the backend
- be:=&backend{name:conf.Name,connections:make(map[string]*beConnection),opnConns:0}
- idx := strIndex([]string(beTypeNames),conf.Type)
+ be := &backend{name: conf.Name, connections: make(map[string]*beConnection), opnConns: 0}
+ idx := strIndex([]string(beTypeNames), conf.Type)
if idx == 0 {
log.Error("Invalid type specified for backend %s in cluster %s", conf.Name, clusterName)
rtrn_err = true
@@ -628,7 +625,7 @@
idx = strIndex(asTypeNames, conf.Association.Strategy)
if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
log.Errorf("An association strategy must be provided if the backend "+
- "type is active/active for backend %s in cluster %s", conf.Name, clusterName)
+ "type is active/active for backend %s in cluster %s", conf.Name, clusterName)
rtrn_err = true
}
be.activeAssoc.strategy = idx
@@ -636,23 +633,23 @@
idx = strIndex(alTypeNames, conf.Association.Location)
if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
log.Errorf("An association location must be provided if the backend "+
- "type is active/active for backend %s in cluster %s", conf.Name, clusterName)
+ "type is active/active for backend %s in cluster %s", conf.Name, clusterName)
rtrn_err = true
}
be.activeAssoc.location = idx
if conf.Association.Field == "" && be.activeAssoc.location == AL_PROTOBUF {
log.Errorf("An association field must be provided if the backend "+
- "type is active/active and the location is set to protobuf "+
- "for backend %s in cluster %s", conf.Name, clusterName)
+ "type is active/active and the location is set to protobuf "+
+ "for backend %s in cluster %s", conf.Name, clusterName)
rtrn_err = true
}
be.activeAssoc.field = conf.Association.Field
if conf.Association.Key == "" && be.activeAssoc.location == AL_HEADER {
log.Errorf("An association key must be provided if the backend "+
- "type is active/active and the location is set to header "+
- "for backend %s in cluster %s", conf.Name, clusterName)
+ "type is active/active and the location is set to header "+
+ "for backend %s in cluster %s", conf.Name, clusterName)
rtrn_err = true
}
be.activeAssoc.key = conf.Association.Key
@@ -664,34 +661,34 @@
// at a later time.
// TODO: validate that there is one connection for all but active/active backends
if len(conf.Connections) > 1 && be.activeAssoc.strategy != BE_ACTIVE_ACTIVE {
- log.Errorf("Only one connection must be specified if the association "+
- "strategy is not set to 'active_active'")
+ log.Errorf("Only one connection must be specified if the association " +
+ "strategy is not set to 'active_active'")
rtrn_err = true
}
if len(conf.Connections) == 0 {
log.Errorf("At least one connection must be specified")
rtrn_err = true
}
- for _,cnConf := range conf.Connections {
+ for _, cnConf := range conf.Connections {
if cnConf.Name == "" {
log.Errorf("A connection must have a name for backend %s in cluster %s",
- conf.Name, clusterName)
+ conf.Name, clusterName)
} else {
- gc:=&gConnection{conn:nil,cncl:nil,state:connectivity.Idle}
- be.connections[cnConf.Name] = &beConnection{name:cnConf.Name,addr:cnConf.Addr,port:cnConf.Port,bknd:be,gConn:gc}
+ gc := &gConnection{conn: nil, cncl: nil, state: connectivity.Idle}
+ be.connections[cnConf.Name] = &beConnection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, bknd: be, gConn: gc}
if cnConf.Addr != "" { // This connection will be specified later.
if ip := net.ParseIP(cnConf.Addr); ip == nil {
log.Errorf("The IP address for connection %s in backend %s in cluster %s is invalid",
- cnConf.Name, conf.Name, clusterName)
+ cnConf.Name, conf.Name, clusterName)
rtrn_err = true
}
// Validate the port number. This just validtes that it's a non 0 integer
- if n,err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
+ if n, err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
cnConf.Port, cnConf.Name, conf.Name, clusterName)
rtrn_err = true
} else {
- if n <=0 && n > 65535 {
+ if n <= 0 && n > 65535 {
log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
cnConf.Port, cnConf.Name, conf.Name, clusterName)
rtrn_err = true
@@ -738,20 +735,20 @@
// on a first attempt to connect. Individual connections should be
// handled after that.
func (be *backend) connectAll() {
- for _,cn := range be.connections {
+ for _, cn := range be.connections {
cn.connect()
}
}
func (cn *beConnection) connect() {
if cn.addr != "" && cn.getConn() == nil {
- log.Infof("Connecting to connection %s with addr: %s and port %s", cn.name,cn.addr,cn.port)
+ log.Infof("Connecting to connection %s with addr: %s and port %s", cn.name, cn.addr, cn.port)
// Dial doesn't block, it just returns and continues connecting in the background.
// Check back later to confirm and increase the connection count.
ctx, cnclFnc := context.WithCancel(context.Background()) // Context for canceling the connection
cn.setCncl(cnclFnc)
if conn, err := grpc.Dial(cn.addr+":"+cn.port, grpc.WithCodec(Codec()), grpc.WithInsecure()); err != nil {
- log.Errorf("Dialng connection %v:%v",cn,err)
+ log.Errorf("Dialng connection %v:%v", cn, err)
cn.waitAndTryAgain(ctx)
} else {
cn.setConn(conn)
@@ -767,19 +764,19 @@
func (cn *beConnection) waitAndTryAgain(ctx context.Context) {
go func(ctx context.Context) {
- ctxTm,cnclTm := context.WithTimeout(context.Background(), 10 * time.Second)
- select {
- case <-ctxTm.Done():
- cnclTm()
- log.Debugf("Trying to connect '%s'",cn.name)
- // Connect creates a new context so cancel this one.
- cn.cancel()
- cn.connect()
- return
- case <-ctx.Done():
- cnclTm()
- return
- }
+ ctxTm, cnclTm := context.WithTimeout(context.Background(), 10*time.Second)
+ select {
+ case <-ctxTm.Done():
+ cnclTm()
+ log.Debugf("Trying to connect '%s'", cn.name)
+ // Connect creates a new context so cancel this one.
+ cn.cancel()
+ cn.connect()
+ return
+ case <-ctx.Done():
+ cnclTm()
+ return
+ }
}(ctx)
}
@@ -787,7 +784,7 @@
cn.lck.Lock()
defer cn.lck.Unlock()
log.Debugf("Canceling connection %s", cn.name)
- if cn.gConn != nil{
+ if cn.gConn != nil {
if cn.gConn.cncl != nil {
cn.cncl()
} else {
@@ -844,7 +841,7 @@
// Now replace the gConn object with a new one as this one just
// fades away as references to it are released after the close
// finishes in the background.
- cn.gConn = &gConnection{conn:nil,cncl:nil,state:connectivity.TransientFailure}
+ cn.gConn = &gConnection{conn: nil, cncl: nil, state: connectivity.TransientFailure}
} else {
log.Errorf("Internal error, attempt to close a nil connection object for '%s'", cn.name)
}
@@ -861,7 +858,7 @@
}
}
-func (cn *beConnection) getState() (connectivity.State) {
+func (cn *beConnection) getState() connectivity.State {
cn.lck.Lock()
defer cn.lck.Unlock()
if cn.gConn != nil {
@@ -877,7 +874,6 @@
return connectivity.TransientFailure
}
-
func (cn *beConnection) monitor(ctx context.Context) {
bp := cn.bknd
log.Debugf("Setting up monitoring for backend %s", bp.name)
@@ -885,58 +881,58 @@
var delay time.Duration = 100 //ms
for {
//log.Debugf("****** Monitoring connection '%s' on backend '%s', %v", cn.name, bp.name, cn.conn)
- if cn.getState() == connectivity.Ready {
+ if cn.getState() == connectivity.Ready {
log.Debugf("connection '%s' on backend '%s' becomes ready", cn.name, bp.name)
cn.setState(connectivity.Ready)
bp.incConn()
if cn.getConn() != nil && cn.getConn().WaitForStateChange(ctx, connectivity.Ready) == false {
// The context was canceled. This is done by the close function
// so just exit the routine
- log.Debugf("Contxt canceled for connection '%s' on backend '%s'",cn.name, bp.name)
+ log.Debugf("Contxt canceled for connection '%s' on backend '%s'", cn.name, bp.name)
return
}
if cs := cn.getConn(); cs != nil {
switch cs := cn.getState(); cs {
- case connectivity.TransientFailure:
- cn.setState(cs)
- bp.decConn()
- log.Infof("Transient failure for connection '%s' on backend '%s'",cn.name, bp.name)
- delay = 100
- case connectivity.Shutdown:
- //The connection was closed. The assumption here is that the closer
- // will manage the connection count and setting the conn to nil.
- // Exit the routine
- log.Infof("Shutdown for connection '%s' on backend '%s'",cn.name, bp.name)
- return
- case connectivity.Idle:
- // This can only happen if the server sends a GoAway. This can
- // only happen if the server has modified MaxConnectionIdle from
- // its default of infinity. The only solution here is to close the
- // connection and keepTrying()?
- //TODO: Read the grpc source code to see if there's a different approach
- log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'",cn.name, bp.name)
- cn.close()
- cn.connect()
- return
+ case connectivity.TransientFailure:
+ cn.setState(cs)
+ bp.decConn()
+ log.Infof("Transient failure for connection '%s' on backend '%s'", cn.name, bp.name)
+ delay = 100
+ case connectivity.Shutdown:
+ //The connection was closed. The assumption here is that the closer
+ // will manage the connection count and setting the conn to nil.
+ // Exit the routine
+ log.Infof("Shutdown for connection '%s' on backend '%s'", cn.name, bp.name)
+ return
+ case connectivity.Idle:
+ // This can only happen if the server sends a GoAway. This can
+ // only happen if the server has modified MaxConnectionIdle from
+ // its default of infinity. The only solution here is to close the
+ // connection and keepTrying()?
+ //TODO: Read the grpc source code to see if there's a different approach
+ log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'", cn.name, bp.name)
+ cn.close()
+ cn.connect()
+ return
}
} else { // A nil means something went horribly wrong, error and exit.
- log.Errorf("Somthing horrible happned, the connection is nil and shouldn't be for connection %s",cn.name)
+ log.Errorf("Somthing horrible happned, the connection is nil and shouldn't be for connection %s", cn.name)
return
}
} else {
log.Debugf("Waiting for connection '%s' on backend '%s' to become ready", cn.name, bp.name)
- ctxTm, cnclTm := context.WithTimeout(context.Background(), delay * time.Millisecond)
+ ctxTm, cnclTm := context.WithTimeout(context.Background(), delay*time.Millisecond)
if delay < 30000 {
- delay += delay
+ delay += delay
}
select {
- case <-ctxTm.Done():
- cnclTm() // Doubt this is required but it's harmless.
- // Do nothing but let the loop continue
- case <-ctx.Done():
- // Context was closed, close and exit routine
- //cn.close() NO! let the close be managed externally!
- return
+ case <-ctxTm.Done():
+ cnclTm() // Doubt this is required but it's harmless.
+ // Do nothing but let the loop continue
+ case <-ctx.Done():
+ // Context was closed, close and exit routine
+ //cn.close() NO! let the close be managed externally!
+ return
}
}
}
@@ -945,7 +941,6 @@
// Set a callback for connection failure notification
// This is currently not used.
-func (bp * backend) setConnFailCallback(cb func(string, *backend)bool) {
+func (bp *backend) setConnFailCallback(cb func(string, *backend) bool) {
bp.connFailCallback = cb
}
-
diff --git a/afrouter/afrouter/binding-router.go b/afrouter/afrouter/binding-router.go
index 11e852d..7de3ea7 100644
--- a/afrouter/afrouter/binding-router.go
+++ b/afrouter/afrouter/binding-router.go
@@ -18,59 +18,59 @@
package afrouter
import (
- "fmt"
"errors"
+ "fmt"
+ "github.com/opencord/voltha-go/common/log"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
- "github.com/opencord/voltha-go/common/log"
)
type BindingRouter struct {
- name string
- routerType int // TODO: This is probably not needed
+ name string
+ routerType int // TODO: This is probably not needed
association int
//routingField string
grpcService string
//protoDescriptor *pb.FileDescriptorSet
//methodMap map[string]byte
- bkndClstr *backendCluster
- bindings map[string]*backend
- bindingType string
- bindingField string
+ bkndClstr *backendCluster
+ bindings map[string]*backend
+ bindingType string
+ bindingField string
bindingMethod string
- curBknd **backend
+ curBknd **backend
}
-func (br BindingRouter) BackendCluster(s string, metaKey string) (*backendCluster,error) {
+func (br BindingRouter) BackendCluster(s string, metaKey string) (*backendCluster, error) {
return br.bkndClstr, nil
//return nil,errors.New("Not implemented yet")
}
-func (br BindingRouter) Name() (string) {
+func (br BindingRouter) Name() string {
return br.name
}
-func (br BindingRouter) Service() (string) {
+func (br BindingRouter) Service() string {
return br.grpcService
}
-func (br BindingRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string,string,error) {
+func (br BindingRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string, string, error) {
var rtrnK string = ""
var rtrnV string = ""
// Get the metadata from the server stream
- md, ok := metadata.FromIncomingContext(serverStream.Context())
+ md, ok := metadata.FromIncomingContext(serverStream.Context())
if !ok {
- return rtrnK, rtrnV, errors.New("Could not get a server stream metadata")
- }
+ return rtrnK, rtrnV, errors.New("Could not get a server stream metadata")
+ }
// Determine if one of the method routing keys exists in the metadata
- if _,ok := md[br.bindingField]; ok == true {
+ if _, ok := md[br.bindingField]; ok == true {
rtrnV = md[br.bindingField][0]
rtrnK = br.bindingField
}
- return rtrnK,rtrnV,nil
+ return rtrnK, rtrnV, nil
}
-func (br BindingRouter) FindBackendCluster(becName string) (*backendCluster) {
- if becName == br.bkndClstr.name {
+func (br BindingRouter) FindBackendCluster(becName string) *backendCluster {
+ if becName == br.bkndClstr.name {
return br.bkndClstr
}
return nil
@@ -78,56 +78,55 @@
func (br BindingRouter) ReplyHandler(v interface{}) error {
return nil
}
-func (br BindingRouter) Route(sel interface{}) (*backend) {
+func (br BindingRouter) Route(sel interface{}) *backend {
var err error
switch sl := sel.(type) {
- case *nbFrame:
- if b, ok := br.bindings[sl.metaVal]; ok == true { // binding exists, just return it
- return b
- } else { // establish a new binding or error.
- if sl.metaVal != "" {
- err = errors.New(fmt.Sprintf("Attempt to route on non-existent metadata value '%s' in key '%s'",
- sl.metaVal, sl.metaKey))
- log.Error(err)
- sl.err = err
- return nil
- }
- if sl.mthdSlice[REQ_METHOD] != br.bindingMethod {
- err = errors.New(fmt.Sprintf("Binding must occur with method %s but attempted with method %s",
- br.bindingMethod, sl.mthdSlice[REQ_METHOD]))
- log.Error(err)
- sl.err = err
- return nil
- }
- log.Debugf("MUST CREATE A NEW BINDING MAP ENTRY!!")
- if len(br.bindings) < len(br.bkndClstr.backends) {
- if *br.curBknd, err = br.bkndClstr.nextBackend(*br.curBknd,BE_SEQ_RR); err == nil {
- // Use the name of the backend as the metaVal for this new binding
- br.bindings[(*br.curBknd).name] = *br.curBknd
- return *br.curBknd
- } else {
- log.Error(err)
- sl.err = err
- return nil
- }
- } else {
- err = errors.New(fmt.Sprintf("Backends exhausted in attempt to bind for metakey '%s' with value '%s'",
- sl.metaKey, sl.metaVal))
- log.Error(err)
- sl.err = err
- }
+ case *nbFrame:
+ if b, ok := br.bindings[sl.metaVal]; ok == true { // binding exists, just return it
+ return b
+ } else { // establish a new binding or error.
+ if sl.metaVal != "" {
+ err = errors.New(fmt.Sprintf("Attempt to route on non-existent metadata value '%s' in key '%s'",
+ sl.metaVal, sl.metaKey))
+ log.Error(err)
+ sl.err = err
+ return nil
}
- return nil
- default:
- return nil
+ if sl.mthdSlice[REQ_METHOD] != br.bindingMethod {
+ err = errors.New(fmt.Sprintf("Binding must occur with method %s but attempted with method %s",
+ br.bindingMethod, sl.mthdSlice[REQ_METHOD]))
+ log.Error(err)
+ sl.err = err
+ return nil
+ }
+ log.Debugf("MUST CREATE A NEW BINDING MAP ENTRY!!")
+ if len(br.bindings) < len(br.bkndClstr.backends) {
+ if *br.curBknd, err = br.bkndClstr.nextBackend(*br.curBknd, BE_SEQ_RR); err == nil {
+ // Use the name of the backend as the metaVal for this new binding
+ br.bindings[(*br.curBknd).name] = *br.curBknd
+ return *br.curBknd
+ } else {
+ log.Error(err)
+ sl.err = err
+ return nil
+ }
+ } else {
+ err = errors.New(fmt.Sprintf("Backends exhausted in attempt to bind for metakey '%s' with value '%s'",
+ sl.metaKey, sl.metaVal))
+ log.Error(err)
+ sl.err = err
+ }
+ }
+ return nil
+ default:
+ return nil
}
- return nil
}
func newBindingRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
var rtrn_err bool = false
var err error = nil
- log.Debugf("Creating binding router %s",config.Name)
+ log.Debugf("Creating binding router %s", config.Name)
// A name must exist
if config.Name == "" {
log.Error("A router 'name' must be specified")
@@ -162,11 +161,11 @@
var bptr *backend
bptr = nil
br := BindingRouter{
- name:config.Name,
- grpcService:rconf.ProtoService,
- bindings:make(map[string]*backend),
+ name: config.Name,
+ grpcService: rconf.ProtoService,
+ bindings: make(map[string]*backend),
//methodMap:make(map[string]byte),
- curBknd:&bptr,
+ curBknd: &bptr,
//serialNo:0,
}
@@ -174,9 +173,9 @@
br.association = strIndex(rAssnNames, config.Binding.Association)
if br.association == 0 {
if config.Binding.Association == "" {
- log.Error("An binding association must be specified")
+ log.Error("An binding association must be specified")
} else {
- log.Errorf("The binding association '%s' is not valid", config.Binding.Association)
+ log.Errorf("The binding association '%s' is not valid", config.Binding.Association)
}
rtrn_err = true
}
@@ -203,7 +202,6 @@
br.bindingField = config.Binding.Field
}
-
// This has already been validated bfore this function
// is called so just use it.
for idx := range rTypeNames {
@@ -213,7 +211,7 @@
}
}
- // Create the backend cluster or link to an existing one
+ // Create the backend cluster or link to an existing one
ok := true
if br.bkndClstr, ok = bClusters[config.backendCluster.Name]; ok == false {
if br.bkndClstr, err = newBackendCluster(config.backendCluster); err != nil {
@@ -225,9 +223,8 @@
// HERE HERE HERE
if rtrn_err {
- return br,errors.New(fmt.Sprintf("Failed to create a new router '%s'",br.name))
+ return br, errors.New(fmt.Sprintf("Failed to create a new router '%s'", br.name))
}
-
- return br,nil
+ return br, nil
}
diff --git a/afrouter/afrouter/codec.go b/afrouter/afrouter/codec.go
index 6090bdc..7147916 100644
--- a/afrouter/afrouter/codec.go
+++ b/afrouter/afrouter/codec.go
@@ -19,10 +19,10 @@
import (
"fmt"
- "sync"
- "google.golang.org/grpc"
"github.com/golang/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
+ "sync"
)
func Codec() grpc.Codec {
@@ -39,53 +39,53 @@
type sbFrame struct {
payload []byte
- router Router
- method string
- be *backend
- lck sync.Mutex
+ router Router
+ method string
+ be *backend
+ lck sync.Mutex
metaKey string
metaVal string
}
type nbFrame struct {
- payload []byte
- router Router
- be *backend
- err error
+ payload []byte
+ router Router
+ be *backend
+ err error
mthdSlice []string
- serNo chan uint64
- metaKey string
- metaVal string
+ serNo chan uint64
+ metaKey string
+ metaVal string
}
func (cdc *transparentRoutingCodec) Marshal(v interface{}) ([]byte, error) {
switch t := v.(type) {
- case *sbFrame:
- return t.payload, nil
- case *nbFrame:
- return t.payload, nil
- default:
- return cdc.parentCodec.Marshal(v)
+ case *sbFrame:
+ return t.payload, nil
+ case *nbFrame:
+ return t.payload, nil
+ default:
+ return cdc.parentCodec.Marshal(v)
}
}
func (cdc *transparentRoutingCodec) Unmarshal(data []byte, v interface{}) error {
switch t := v.(type) {
- case *sbFrame:
- t.payload = data
- // This is where the affinity is established on a northbound response
- t.router.ReplyHandler(v)
- return nil
- case *nbFrame:
- t.payload = data
- // This is were the afinity value is pulled from the payload
- // and the backend selected.
- t.be = t.router.Route(v)
- log.Debugf("Routing returned %v for method %s", t.be, t.mthdSlice[REQ_METHOD])
- return nil
- default:
- return cdc.parentCodec.Unmarshal(data,v)
+ case *sbFrame:
+ t.payload = data
+ // This is where the affinity is established on a northbound response
+ t.router.ReplyHandler(v)
+ return nil
+ case *nbFrame:
+ t.payload = data
+		// This is where the affinity value is pulled from the payload
+ // and the backend selected.
+ t.be = t.router.Route(v)
+ log.Debugf("Routing returned %v for method %s", t.be, t.mthdSlice[REQ_METHOD])
+ return nil
+ default:
+ return cdc.parentCodec.Unmarshal(data, v)
}
}
diff --git a/afrouter/afrouter/config.go b/afrouter/afrouter/config.go
index a9a01eb..2cc2976 100644
--- a/afrouter/afrouter/config.go
+++ b/afrouter/afrouter/config.go
@@ -19,27 +19,27 @@
// Command line parameters and parsing
import (
- "os"
- "fmt"
- "flag"
- "path"
- "errors"
- "io/ioutil"
"encoding/json"
+ "errors"
+ "flag"
+ "fmt"
"github.com/opencord/voltha-go/common/log"
+ "io/ioutil"
+ "os"
+ "path"
)
func ParseCmd() (*Configuration, error) {
config := &Configuration{}
- cmdParse := flag.NewFlagSet(path.Base(os.Args[0]), flag.ContinueOnError);
+ cmdParse := flag.NewFlagSet(path.Base(os.Args[0]), flag.ContinueOnError)
config.ConfigFile = cmdParse.String("config", "arouter.json", "The configuration file for the affinity router")
config.LogLevel = cmdParse.Int("logLevel", 0, "The log level for the affinity router")
config.GrpcLog = cmdParse.Bool("grpclog", false, "Enable GRPC logging")
- err := cmdParse.Parse(os.Args[1:]);
+ err := cmdParse.Parse(os.Args[1:])
if err != nil {
//return err
- return nil, errors.New("Error parsing the command line");
+ return nil, errors.New("Error parsing the command line")
}
//if(!cmdParse.Parsed()) {
//}
@@ -48,72 +48,71 @@
// Configuration file loading and parsing
type Configuration struct {
- ConfigFile * string
- LogLevel * int
- GrpcLog * bool
- Servers []ServerConfig `json:"servers"`
- Ports PortConfig `json:"ports"`
- ServerCertificates ServerCertConfig `json:"serverCertificates"`
- ClientCertificates ClientCertConfig `json:"clientCertificates"`
- BackendClusters []BackendClusterConfig `json:"backend_clusters"`
- Routers []RouterConfig `json:"routers"`
- Api ApiConfig
+ ConfigFile *string
+ LogLevel *int
+ GrpcLog *bool
+ Servers []ServerConfig `json:"servers"`
+ Ports PortConfig `json:"ports"`
+ ServerCertificates ServerCertConfig `json:"serverCertificates"`
+ ClientCertificates ClientCertConfig `json:"clientCertificates"`
+ BackendClusters []BackendClusterConfig `json:"backend_clusters"`
+ Routers []RouterConfig `json:"routers"`
+ Api ApiConfig
}
type RouterConfig struct {
- Name string `json:"name"`
- ProtoService string `json:"service"`
- ProtoPackage string `json:"package"`
- Routes []RouteConfig `json:"routes"`
+ Name string `json:"name"`
+ ProtoService string `json:"service"`
+ ProtoPackage string `json:"package"`
+ Routes []RouteConfig `json:"routes"`
}
type RouteConfig struct {
- Name string `json:"name"`
- Type string `json:"type"`
- ProtoFile string `json:"proto_descriptor"`
- Association string `json:"association"`
- RouteField string `json:"routing_field"`
- Methods []string `json:"methods"` // The GRPC methods to route using the route field
- NbBindingMethods []string `json:"nb_binding_methods"`
- BackendCluster string `json:"backend_cluster"`
- Binding BindingConfig `json:"binding"`
- Overrides []OverrideConfig `json:"overrides"`
- backendCluster *BackendClusterConfig
+ Name string `json:"name"`
+ Type string `json:"type"`
+ ProtoFile string `json:"proto_descriptor"`
+ Association string `json:"association"`
+ RouteField string `json:"routing_field"`
+ Methods []string `json:"methods"` // The GRPC methods to route using the route field
+ NbBindingMethods []string `json:"nb_binding_methods"`
+ BackendCluster string `json:"backend_cluster"`
+ Binding BindingConfig `json:"binding"`
+ Overrides []OverrideConfig `json:"overrides"`
+ backendCluster *BackendClusterConfig
}
type BindingConfig struct {
- Type string `json:"type"`
- Field string `json:"field"`
- Method string `json:"method"`
+ Type string `json:"type"`
+ Field string `json:"field"`
+ Method string `json:"method"`
Association string `json:"association"`
-
}
type OverrideConfig struct {
- Methods []string `json:"methods"`
- Method string `json:"method"`
- RouteField string `json:"routing_field"`
+ Methods []string `json:"methods"`
+ Method string `json:"method"`
+ RouteField string `json:"routing_field"`
}
// Backend configuration
type BackendClusterConfig struct {
- Name string `json:"name"`
+ Name string `json:"name"`
Backends []BackendConfig `json:"backends"`
}
type BackendConfig struct {
- Name string `json:"name"`
- Type string `json:"type"`
- Association AssociationConfig `json:"association"`
+ Name string `json:"name"`
+ Type string `json:"type"`
+ Association AssociationConfig `json:"association"`
Connections []ConnectionConfig `json:"connections"`
}
type AssociationConfig struct {
Strategy string `json:"strategy"`
Location string `json:"location"`
- Field string `json:"field"`
- Key string `json:"key"`
+ Field string `json:"field"`
+ Key string `json:"key"`
}
type ConnectionConfig struct {
@@ -125,51 +124,51 @@
// Server configuration
type ServerConfig struct {
- Name string `json:"name"`
- Port uint `json:"port"`
- Addr string `json:"address"`
- Type string `json:"type"`
+ Name string `json:"name"`
+ Port uint `json:"port"`
+ Addr string `json:"address"`
+ Type string `json:"type"`
Routers []RouterPackage `json:"routers"`
routers map[string]*RouterConfig
}
type RouterPackage struct {
- Router string `json:"router"`
+ Router string `json:"router"`
Package string `json:"package"`
}
// Port configuration
type PortConfig struct {
- GrpcPort uint `json:"grpcPort"`
- StreamingGrpcPort uint `json:"streamingGrpcPort"`
- TlsGrpcPort uint `json:"tlsGrpcPort"`
+ GrpcPort uint `json:"grpcPort"`
+ StreamingGrpcPort uint `json:"streamingGrpcPort"`
+ TlsGrpcPort uint `json:"tlsGrpcPort"`
TlsStreamingGrpcPort uint `json:"tlsStreamingGrpcPort"`
- ControlPort uint `json:"controlPort"`
+ ControlPort uint `json:"controlPort"`
}
// Server Certificate configuration
type ServerCertConfig struct {
- GrpcCert string `json:"grpcCertificate"` // File path to the certificate file
- GrpcKey string `json:"grpcKey"` // File path to the key file
- GrpcCsr string `json:"grpcCsr"` // File path to the CSR file
+ GrpcCert string `json:"grpcCertificate"` // File path to the certificate file
+ GrpcKey string `json:"grpcKey"` // File path to the key file
+ GrpcCsr string `json:"grpcCsr"` // File path to the CSR file
}
// Client Certificate configuration
type ClientCertConfig struct {
- GrpcCert string `json:"grpcCertificate"` // File path to the certificate file
- GrpcKey string `json:"grpcKey"` // File path to the key file
- GrpcCsr string `json:"grpcCsr"` // File path to the CSR file
+ GrpcCert string `json:"grpcCertificate"` // File path to the certificate file
+ GrpcKey string `json:"grpcKey"` // File path to the key file
+ GrpcCsr string `json:"grpcCsr"` // File path to the CSR file
}
// Api configuration
type ApiConfig struct {
Addr string `json:"address"`
- Port uint `json:"port"`
+ Port uint `json:"port"`
}
-func (conf * Configuration) LoadConfig() error {
+func (conf *Configuration) LoadConfig() error {
- configF, err := os.Open(*conf.ConfigFile);
+ configF, err := os.Open(*conf.ConfigFile)
log.Info("Loading configuration from: ", *conf.ConfigFile)
if err != nil {
log.Error(err)
@@ -195,25 +194,25 @@
// references to backend_cluster in the routers.
// Resolve router references for the servers
- log.Debug("Resolving references in the config file");
- for k,_ := range conf.Servers {
+ log.Debug("Resolving references in the config file")
+ for k, _ := range conf.Servers {
//s.routers =make(map[string]*RouterConfig)
conf.Servers[k].routers = make(map[string]*RouterConfig)
- for _,rPkg := range conf.Servers[k].Routers {
+ for _, rPkg := range conf.Servers[k].Routers {
var found bool = false
// Locate the router "r" in the top lever Routers array
- log.Debugf("Resolving router reference to router '%s' from server '%s'",rPkg.Router, conf.Servers[k].Name)
+ log.Debugf("Resolving router reference to router '%s' from server '%s'", rPkg.Router, conf.Servers[k].Name)
for rk, _ := range conf.Routers {
if conf.Routers[rk].Name == rPkg.Router && !found {
log.Debugf("Reference to router '%s' found for package '%s'", rPkg.Router, rPkg.Package)
conf.Servers[k].routers[rPkg.Package] = &conf.Routers[rk]
found = true
} else if conf.Routers[rk].Name == rPkg.Router && found {
- if _,ok := conf.Servers[k].routers[rPkg.Package]; !ok {
+ if _, ok := conf.Servers[k].routers[rPkg.Package]; !ok {
log.Debugf("Reference to router '%s' found for package '%s'", rPkg.Router, rPkg.Package)
conf.Servers[k].routers[rPkg.Package] = &conf.Routers[rk]
} else {
- err := errors.New(fmt.Sprintf("Duplicate router '%s' defined for package '%s'",rPkg.Router, rPkg.Package))
+ err := errors.New(fmt.Sprintf("Duplicate router '%s' defined for package '%s'", rPkg.Router, rPkg.Package))
log.Error(err)
return err
}
@@ -228,30 +227,29 @@
}
// Resolve backend references for the routers
- for rk,rv := range conf.Routers {
- for rtk,rtv := range rv.Routes {
+ for rk, rv := range conf.Routers {
+ for rtk, rtv := range rv.Routes {
var found bool = false
- log.Debugf("Resolving backend reference to %s from router %s",rtv.BackendCluster, rv.Name)
- for bek,bev := range conf.BackendClusters {
+ log.Debugf("Resolving backend reference to %s from router %s", rtv.BackendCluster, rv.Name)
+ for bek, bev := range conf.BackendClusters {
log.Debugf("Checking cluster %s", conf.BackendClusters[bek].Name)
if rtv.BackendCluster == bev.Name && !found {
conf.Routers[rk].Routes[rtk].backendCluster = &conf.BackendClusters[bek]
found = true
} else if rtv.BackendCluster == bev.Name && found {
- err := errors.New(fmt.Sprintf("Duplicate backend defined, %s",conf.BackendClusters[bek].Name))
+ err := errors.New(fmt.Sprintf("Duplicate backend defined, %s", conf.BackendClusters[bek].Name))
log.Error(err)
return err
}
}
- if !found {
+ if !found {
err := errors.New(fmt.Sprintf("Backend %s for router %s not found in config",
- rtv.BackendCluster, rv.Name))
+ rtv.BackendCluster, rv.Name))
log.Error(err)
return err
}
}
}
-
return nil
}
diff --git a/afrouter/afrouter/helpers.go b/afrouter/afrouter/helpers.go
index 4d7362b..441b4b9 100644
--- a/afrouter/afrouter/helpers.go
+++ b/afrouter/afrouter/helpers.go
@@ -20,7 +20,7 @@
//import "github.com/opencord/voltha-go/common/log"
func strIndex(ar []string, match string) int {
- for idx,v := range ar {
+ for idx, v := range ar {
if v == match {
return idx
}
diff --git a/afrouter/afrouter/method-router.go b/afrouter/afrouter/method-router.go
index 379bf11..5e11fa6 100644
--- a/afrouter/afrouter/method-router.go
+++ b/afrouter/afrouter/method-router.go
@@ -18,38 +18,38 @@
package afrouter
import (
- "fmt"
"errors"
+ "fmt"
+ "github.com/opencord/voltha-go/common/log"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
- "github.com/opencord/voltha-go/common/log"
)
const NoMeta = "nometa"
type MethodRouter struct {
- name string
+ name string
service string
- mthdRt map[string]map[string]Router // map of [metadata][method]
+ mthdRt map[string]map[string]Router // map of [metadata][method]
}
func newMethodRouter(config *RouterConfig) (Router, error) {
- mr := MethodRouter{name:config.Name,service:config.ProtoService,mthdRt:make(map[string]map[string]Router)}
+ mr := MethodRouter{name: config.Name, service: config.ProtoService, mthdRt: make(map[string]map[string]Router)}
mr.mthdRt[NoMeta] = make(map[string]Router) // For routes not needing metadata (all expcept binding at this time)
log.Debugf("Processing MethodRouter config %v", *config)
if len(config.Routes) == 0 {
return nil, errors.New(fmt.Sprintf("Router %s must have at least one route", config.Name))
}
- for _,rtv := range config.Routes {
+ for _, rtv := range config.Routes {
//log.Debugf("Processing route: %v",rtv)
var idx1 string
- r,err := newSubRouter(config, &rtv)
+ r, err := newSubRouter(config, &rtv)
if err != nil {
return nil, err
}
if rtv.Type == "binding" {
idx1 = rtv.Binding.Field
- if _,ok := mr.mthdRt[idx1]; ok == false { // /First attempt on this key
+ if _, ok := mr.mthdRt[idx1]; ok == false { // /First attempt on this key
mr.mthdRt[idx1] = make(map[string]Router)
}
} else {
@@ -62,8 +62,8 @@
if rtv.Methods[0] == "*" {
return r, nil
} else {
- log.Debugf("Setting router '%s' for single method '%s'",r.Name(),rtv.Methods[0])
- if _,ok := mr.mthdRt[idx1][rtv.Methods[0]]; ok == false {
+ log.Debugf("Setting router '%s' for single method '%s'", r.Name(), rtv.Methods[0])
+ if _, ok := mr.mthdRt[idx1][rtv.Methods[0]]; ok == false {
mr.mthdRt[idx1][rtv.Methods[0]] = r
} else {
err := errors.New(fmt.Sprintf("Attempt to define method %s for 2 routes: %s & %s", rtv.Methods[0],
@@ -73,10 +73,10 @@
}
}
default:
- for _,m := range rtv.Methods {
+ for _, m := range rtv.Methods {
log.Debugf("Processing Method %s", m)
- if _,ok := mr.mthdRt[idx1][m]; ok == false {
- log.Debugf("Setting router '%s' for method '%s'",r.Name(),m)
+ if _, ok := mr.mthdRt[idx1][m]; ok == false {
+ log.Debugf("Setting router '%s' for method '%s'", r.Name(), m)
mr.mthdRt[idx1][m] = r
} else {
err := errors.New(fmt.Sprintf("Attempt to define method %s for 2 routes: %s & %s", m, r.Name(), mr.mthdRt[idx1][m].Name()))
@@ -98,69 +98,68 @@
return mr.service
}
-func (mr MethodRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string,string,error) {
+func (mr MethodRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string, string, error) {
var rtrnK string = NoMeta
var rtrnV string = ""
// Get the metadata from the server stream
- md, ok := metadata.FromIncomingContext(serverStream.Context())
+ md, ok := metadata.FromIncomingContext(serverStream.Context())
if !ok {
- return rtrnK, rtrnV, errors.New("Could not get a server stream metadata")
- }
+ return rtrnK, rtrnV, errors.New("Could not get a server stream metadata")
+ }
// Determine if one of the method routing keys exists in the metadata
- for k,_ := range mr.mthdRt {
- if _,ok := md[k]; ok == true {
+ for k, _ := range mr.mthdRt {
+ if _, ok := md[k]; ok == true {
rtrnV = md[k][0]
rtrnK = k
break
}
}
- return rtrnK,rtrnV,nil
+ return rtrnK, rtrnV, nil
}
func (mr MethodRouter) ReplyHandler(sel interface{}) error {
switch sl := sel.(type) {
- case *sbFrame:
- if r,ok := mr.mthdRt[NoMeta][sl.method]; ok == true {
- return r.ReplyHandler(sel)
- }
- // TODO: this case should also be an error
- default: //TODO: This should really be a big error
- // A reply handler should only be called on the sbFrame
- return nil
+ case *sbFrame:
+ if r, ok := mr.mthdRt[NoMeta][sl.method]; ok == true {
+ return r.ReplyHandler(sel)
+ }
+ // TODO: this case should also be an error
+ default: //TODO: This should really be a big error
+ // A reply handler should only be called on the sbFrame
+ return nil
}
return nil
}
func (mr MethodRouter) Route(sel interface{}) *backend {
switch sl := sel.(type) {
- case *nbFrame:
- if r,ok := mr.mthdRt[sl.metaKey][sl.mthdSlice[REQ_METHOD]]; ok == true {
- return r.Route(sel)
- }
- log.Errorf("Attept to route on non-existent method '%s'", sl.mthdSlice[REQ_METHOD])
- return nil
- default:
- return nil
+ case *nbFrame:
+ if r, ok := mr.mthdRt[sl.metaKey][sl.mthdSlice[REQ_METHOD]]; ok == true {
+ return r.Route(sel)
+ }
+ log.Errorf("Attept to route on non-existent method '%s'", sl.mthdSlice[REQ_METHOD])
+ return nil
+ default:
+ return nil
}
- return nil
}
-func (mr MethodRouter) BackendCluster(mthd string, metaKey string) (*backendCluster,error) {
- if r,ok := mr.mthdRt[metaKey][mthd]; ok == true {
+func (mr MethodRouter) BackendCluster(mthd string, metaKey string) (*backendCluster, error) {
+ if r, ok := mr.mthdRt[metaKey][mthd]; ok == true {
return r.BackendCluster(mthd, metaKey)
}
- err := errors.New(fmt.Sprintf("No backend cluster exists for method '%s' using meta key '%s'", mthd,metaKey))
+ err := errors.New(fmt.Sprintf("No backend cluster exists for method '%s' using meta key '%s'", mthd, metaKey))
log.Error(err)
return nil, err
}
func (mr MethodRouter) FindBackendCluster(beName string) *backendCluster {
- for _,meta := range mr.mthdRt {
- for _,r := range meta {
- if rtrn := r.FindBackendCluster(beName); rtrn != nil {
+ for _, meta := range mr.mthdRt {
+ for _, r := range meta {
+ if rtrn := r.FindBackendCluster(beName); rtrn != nil {
return rtrn
}
}
diff --git a/afrouter/afrouter/round-robin-router.go b/afrouter/afrouter/round-robin-router.go
index 8201541..65d883a 100644
--- a/afrouter/afrouter/round-robin-router.go
+++ b/afrouter/afrouter/round-robin-router.go
@@ -18,18 +18,18 @@
package afrouter
import (
- "fmt"
"errors"
- "google.golang.org/grpc"
+ "fmt"
"github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
)
type RoundRobinRouter struct {
- name string
- routerType int // TODO: Likely not needed.
+ name string
+ routerType int // TODO: Likely not needed.
grpcService string
- bkndClstr *backendCluster
- curBknd **backend
+ bkndClstr *backendCluster
+ curBknd **backend
}
func newRoundRobinRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
@@ -57,9 +57,9 @@
var bptr *backend
bptr = nil
rr := RoundRobinRouter{
- name:config.Name,
- grpcService:rconf.ProtoService,
- curBknd:&bptr,
+ name: config.Name,
+ grpcService: rconf.ProtoService,
+ curBknd: &bptr,
}
// This has already been validated bfore this function
@@ -71,7 +71,7 @@
}
}
- // Create the backend cluster or link to an existing one
+ // Create the backend cluster or link to an existing one
ok := true
if rr.bkndClstr, ok = bClusters[config.backendCluster.Name]; ok == false {
if rr.bkndClstr, err = newBackendCluster(config.backendCluster); err != nil {
@@ -81,49 +81,47 @@
}
if rtrn_err {
- return rr,errors.New(fmt.Sprintf("Failed to create a new router '%s'",rr.name))
+ return rr, errors.New(fmt.Sprintf("Failed to create a new router '%s'", rr.name))
}
- return rr,nil
+ return rr, nil
}
-func (rr RoundRobinRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string,string,error) {
- return "","",nil
+func (rr RoundRobinRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string, string, error) {
+ return "", "", nil
}
-func (rr RoundRobinRouter) BackendCluster(s string, mk string) (*backendCluster,error) {
+func (rr RoundRobinRouter) BackendCluster(s string, mk string) (*backendCluster, error) {
return rr.bkndClstr, nil
}
-func (rr RoundRobinRouter) Name() (string) {
+func (rr RoundRobinRouter) Name() string {
return rr.name
}
-func(rr RoundRobinRouter) Route(sel interface{}) (*backend) {
+func (rr RoundRobinRouter) Route(sel interface{}) *backend {
var err error
switch sl := sel.(type) {
- case *nbFrame:
- // Since this is a round robin router just get the next backend
- if *rr.curBknd, err = rr.bkndClstr.nextBackend(*rr.curBknd,BE_SEQ_RR); err == nil {
- return *rr.curBknd
- } else {
- sl.err = err
- return nil
- }
- default:
- log.Errorf("Internal: invalid data type in Route call %v", sel);
+ case *nbFrame:
+ // Since this is a round robin router just get the next backend
+ if *rr.curBknd, err = rr.bkndClstr.nextBackend(*rr.curBknd, BE_SEQ_RR); err == nil {
+ return *rr.curBknd
+ } else {
+ sl.err = err
return nil
+ }
+ default:
+ log.Errorf("Internal: invalid data type in Route call %v", sel)
+ return nil
}
- log.Errorf("Round robin error %v", err);
- return nil
}
-func (rr RoundRobinRouter) Service() (string) {
+func (rr RoundRobinRouter) Service() string {
return rr.grpcService
}
-func (rr RoundRobinRouter) FindBackendCluster(becName string) (*backendCluster) {
- if becName == rr.bkndClstr.name {
+func (rr RoundRobinRouter) FindBackendCluster(becName string) *backendCluster {
+ if becName == rr.bkndClstr.name {
return rr.bkndClstr
}
return nil
diff --git a/afrouter/afrouter/router.go b/afrouter/afrouter/router.go
index f1e729f..f71bd30 100644
--- a/afrouter/afrouter/router.go
+++ b/afrouter/afrouter/router.go
@@ -18,38 +18,38 @@
package afrouter
import (
- "fmt"
"errors"
+ "fmt"
"google.golang.org/grpc"
)
const (
- RT_RPC_AFFINITY_MESSAGE = iota+1
- RT_RPC_AFFINITY_HEADER = iota+1
- RT_BINDING = iota+1
- RT_ROUND_ROBIN = iota+1
+ RT_RPC_AFFINITY_MESSAGE = iota + 1
+ RT_RPC_AFFINITY_HEADER = iota + 1
+ RT_BINDING = iota + 1
+ RT_ROUND_ROBIN = iota + 1
)
// String names for display in error messages.
-var rTypeNames = []string{"","rpc_affinity_message","rpc_affinity_header","binding", "round_robin"}
-var rAssnNames = []string{"","round_robin"}
+var rTypeNames = []string{"", "rpc_affinity_message", "rpc_affinity_header", "binding", "round_robin"}
+var rAssnNames = []string{"", "round_robin"}
var allRouters map[string]Router = make(map[string]Router)
// The router interface
type Router interface {
- Name() (string)
+ Name() string
Route(interface{}) *backend
- Service() (string)
+ Service() string
BackendCluster(string, string) (*backendCluster, error)
- FindBackendCluster(string) (*backendCluster)
+ FindBackendCluster(string) *backendCluster
ReplyHandler(interface{}) error
- GetMetaKeyVal(serverStream grpc.ServerStream) (string,string,error)
+ GetMetaKeyVal(serverStream grpc.ServerStream) (string, string, error)
}
func newRouter(config *RouterConfig) (Router, error) {
- r,err := newMethodRouter(config)
- if err == nil {
+ r, err := newMethodRouter(config)
+ if err == nil {
allRouters[r.Name()] = r
}
return r, err
@@ -59,19 +59,19 @@
idx := strIndex(rTypeNames, config.Type)
switch idx {
case RT_RPC_AFFINITY_MESSAGE:
- r,err := newAffinityRouter(rconf, config)
+ r, err := newAffinityRouter(rconf, config)
if err == nil {
allRouters[rconf.Name+config.Name] = r
}
return r, err
case RT_BINDING:
- r,err := newBindingRouter(rconf, config)
+ r, err := newBindingRouter(rconf, config)
if err == nil {
allRouters[rconf.Name+config.Name] = r
}
return r, err
case RT_ROUND_ROBIN:
- r,err := newRoundRobinRouter(rconf, config)
+ r, err := newRoundRobinRouter(rconf, config)
if err == nil {
allRouters[rconf.Name+config.Name] = r
}
@@ -79,6 +79,4 @@
default:
return nil, errors.New(fmt.Sprintf("Internal error, undefined router type: %s", config.Type))
}
-
- return nil, errors.New(fmt.Sprintf("Unrecognized router type '%s'",config.Type))
}
diff --git a/afrouter/afrouter/server.go b/afrouter/afrouter/server.go
index 17c3b4f..0691da1 100644
--- a/afrouter/afrouter/server.go
+++ b/afrouter/afrouter/server.go
@@ -18,14 +18,14 @@
package afrouter
import (
- "fmt"
- "net"
- "regexp"
"errors"
- "strconv"
+ "fmt"
+ "github.com/opencord/voltha-go/common/log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
- "github.com/opencord/voltha-go/common/log"
+ "net"
+ "regexp"
+ "strconv"
)
var (
@@ -34,36 +34,36 @@
ClientStreams: true,
}
)
+
const (
- REQ_ALL = 0
+ REQ_ALL = 0
REQ_PACKAGE = 1
REQ_SERVICE = 2
- REQ_METHOD = 3
+ REQ_METHOD = 3
)
type server struct {
- running bool
- name string
- stype nbi
+ running bool
+ name string
+ stype nbi
proxyListener net.Listener
- routers map[string]map[string]Router
- proxyServer *grpc.Server
+ routers map[string]map[string]Router
+ proxyServer *grpc.Server
}
type nbRequest struct {
- srv interface{}
+ srv interface{}
serverStream grpc.ServerStream
- r Router
- mthdSlice []string
- metaKey string // There should be at most one key specified. More than one is an error.
- metaVal string // This is the value extracted from the meta key if it exists or "" otherwise
+ r Router
+ mthdSlice []string
+ metaKey string // There should be at most one key specified. More than one is an error.
+ metaVal string // This is the value extracted from the meta key if it exists or "" otherwise
}
var mthdSlicerExp string = `^/([a-zA-Z][a-zA-Z0-9]+)\.([a-zA-Z][a-zA-Z0-9]+)/([a-zA-Z][a-zA-Z0-9]+)`
var mthdSlicer *regexp.Regexp // The compiled regex to extract the package/service/method
-
-func newServer(config *ServerConfig) (*server,error) {
+func newServer(config *ServerConfig) (*server, error) {
var err error = nil
var rtrn_err bool = false
var srvr *server
@@ -71,7 +71,7 @@
// Validate the configuration
// There should be a name
if config.Name == "" {
- log.Error("A server has been defined with no name")
+ log.Error("A server has been defined with no name")
rtrn_err = true
}
// Validate that there's a port specified
@@ -86,15 +86,15 @@
}
if config.Type != "grpc" && config.Type != "streaming_grpc" {
if config.Type == "" {
- log.Errorf("A server 'type' must be defined for server %s",config.Name)
+ log.Errorf("A server 'type' must be defined for server %s", config.Name)
} else {
- log.Errorf("The server type must be either 'grpc' or 'streaming_grpc' "+
- "but '%s' was found for server '%s'", config.Type, config.Name)
+ log.Errorf("The server type must be either 'grpc' or 'streaming_grpc' "+
+ "but '%s' was found for server '%s'", config.Type, config.Name)
}
rtrn_err = true
}
if len(config.Routers) == 0 {
- log.Errorf("At least one router must be specified for server '%s'", config.Name)
+ log.Errorf("At least one router must be specified for server '%s'", config.Name)
rtrn_err = true
}
@@ -102,25 +102,25 @@
return nil, errors.New("Server configuration failed")
} else {
// The configuration is valid, create a server and configure it.
- srvr = &server{name:config.Name,routers:make(map[string]map[string]Router)}
+ srvr = &server{name: config.Name, routers: make(map[string]map[string]Router)}
// The listener
if srvr.proxyListener, err =
- net.Listen("tcp", config.Addr + ":"+
- strconv.Itoa(int(config.Port))); err != nil {
+ net.Listen("tcp", config.Addr+":"+
+ strconv.Itoa(int(config.Port))); err != nil {
log.Error(err)
return nil, err
}
// Create the routers
log.Debugf("Configuring the routers for server %s", srvr.name)
- for p,r := range config.routers {
- log.Debugf("Processing router %s for package %s", r.Name,p)
- if dr,err := newRouter(r); err != nil {
- log.Error(err)
- return nil, err
- } else {
+ for p, r := range config.routers {
+ log.Debugf("Processing router %s for package %s", r.Name, p)
+ if dr, err := newRouter(r); err != nil {
+ log.Error(err)
+ return nil, err
+ } else {
log.Debugf("Adding router %s to the server %s for package %s and service %s",
- dr.Name(), srvr.name, p, dr.Service())
- if _,ok := srvr.routers[p]; ok {
+ dr.Name(), srvr.name, p, dr.Service())
+ if _, ok := srvr.routers[p]; ok {
srvr.routers[p][dr.Service()] = dr
} else {
srvr.routers[p] = make(map[string]Router)
@@ -140,7 +140,7 @@
return srvr, nil
}
-func (s *server) Name() (string) {
+func (s *server) Name() string {
return s.name
}
@@ -148,8 +148,7 @@
return s.handler
}
-
-func (s *server) getRouter(pkg *string, service *string) (Router,bool) {
+func (s *server) getRouter(pkg *string, service *string) (Router, bool) {
if fn, ok := s.routers[*pkg][*service]; ok { // Both specified
return fn, ok
} else if fn, ok = s.routers["*"][*service]; ok { // Package wild card
@@ -159,18 +158,17 @@
} else if fn, ok = s.routers["*"]["*"]; ok { // Both Wildcarded
return fn, ok
} else {
- return nil,false
+ return nil, false
}
}
-
func (s *server) handler(srv interface{}, serverStream grpc.ServerStream) error {
// Determine what router is intended to handle this request
fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
if !ok {
return grpc.Errorf(codes.Internal, "lowLevelServerStream doesn't exist in context")
}
- log.Debugf("Processing grpc request %s on server %s",fullMethodName,s.name)
+ log.Debugf("Processing grpc request %s on server %s", fullMethodName, s.name)
// The full method name is structured as follows:
// <package name>.<service>/<method>
mthdSlice := mthdSlicer.FindStringSubmatch(fullMethodName)
@@ -179,7 +177,7 @@
} else {
log.Debugf("Sliced full method %s: %v", fullMethodName, mthdSlice)
}
- r, ok := s.getRouter(&mthdSlice[REQ_PACKAGE],&mthdSlice[REQ_SERVICE])
+ r, ok := s.getRouter(&mthdSlice[REQ_PACKAGE], &mthdSlice[REQ_SERVICE])
//fn, ok := s.routers[mthdSlice[REQ_PACKAGE]][mthdSlice[REQ_SERVICE]]
if !ok {
// TODO: Should this be punted to a default transparent router??
@@ -193,7 +191,7 @@
}
log.Debugf("Selected router %s\n", r.Name())
- mk,mv,err := r.GetMetaKeyVal(serverStream)
+ mk, mv, err := r.GetMetaKeyVal(serverStream)
if err != nil {
log.Error(err)
return err
@@ -202,13 +200,10 @@
//nbR := &nbRequest(srv:srv,serverStream:serverStream,r:r,mthdSlice:mthdSlice,metaKey:mk,metaVal:mv)
// Extract the cluster from the selected router and use it to manage the transfer
- if bkndClstr,err := r.BackendCluster(mthdSlice[REQ_METHOD], mk); err != nil {
+ if bkndClstr, err := r.BackendCluster(mthdSlice[REQ_METHOD], mk); err != nil {
return err
} else {
//return bkndClstr.handler(nbR)
return bkndClstr.handler(srv, serverStream, r, mthdSlice, mk, mv)
}
-
- return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
}
-
diff --git a/afrouter/afrouter/signals.go b/afrouter/afrouter/signals.go
index 6cc32ad..5416fda 100644
--- a/afrouter/afrouter/signals.go
+++ b/afrouter/afrouter/signals.go
@@ -22,17 +22,16 @@
package afrouter
import (
- "os"
- "syscall"
- "os/signal"
"github.com/opencord/voltha-go/common/log"
+ "os"
+ "os/signal"
+ "syscall"
)
var errChan = make(chan error)
var doneChan = make(chan error)
var holdChan = make(chan int)
-
func InitExitHandler() error {
// Start the signal handler
@@ -70,16 +69,16 @@
if arProxy != nil {
for _, srvr := range arProxy.servers {
if srvr.running {
- log.With(log.Fields{"server":srvr.name}).Debug("Closing server")
- srvr.proxyServer.GracefulStop();
- srvr.proxyListener.Close();
+ log.With(log.Fields{"server": srvr.name}).Debug("Closing server")
+ srvr.proxyServer.GracefulStop()
+ srvr.proxyListener.Close()
}
}
}
- for _,cl := range bClusters {
+ for _, cl := range bClusters {
for _, bknd := range cl.backends {
log.Debugf("Closing backend %s", bknd.name)
- for _,conn := range bknd.connections {
+ for _, conn := range bknd.connections {
log.Debugf("Closing connection %s", conn.name)
conn.close()
}
@@ -88,4 +87,3 @@
doneChan <- err
//os.Exit(0)
}
-
diff --git a/afrouter/arouter.go b/afrouter/arouter.go
index dfa9fc8..65bb7df 100644
--- a/afrouter/arouter.go
+++ b/afrouter/arouter.go
@@ -16,23 +16,22 @@
// gRPC affinity router with active/active backends
package main
+
/* package main // import "github.com/opencord/voltha-go/arouter" */
/* package main // import "github.com/opencord/voltha-go" */
-
import (
- "os"
"fmt"
- slog "log"
- "google.golang.org/grpc/grpclog"
- "github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/afrouter/afrouter"
+ "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc/grpclog"
+ slog "log"
+ "os"
)
func main() {
-
- conf,err := afrouter.ParseCmd()
+ conf, err := afrouter.ParseCmd()
if err != nil {
fmt.Printf("Error: %v\n", err)
return
@@ -51,7 +50,7 @@
log.Error(err)
return
}
- log.With(log.Fields{"config":*conf}).Debug("Configuration loaded")
+ log.With(log.Fields{"config": *conf}).Debug("Configuration loaded")
// Enable grpc logging
if *conf.GrpcLog {
@@ -62,14 +61,13 @@
// Install the signal and error handlers.
afrouter.InitExitHandler()
-
// Create the affinity router proxy...
- if ap,err := afrouter.NewArouterProxy(conf); err != nil {
- log.Errorf("Failed to create the arouter proxy, exiting:%v",err)
- return
- // and start it.
- // This function never returns unless an error
- // occurs or a signal is caught.
+ if ap, err := afrouter.NewArouterProxy(conf); err != nil {
+ log.Errorf("Failed to create the arouter proxy, exiting:%v", err)
+ return
+ // and start it.
+ // This function never returns unless an error
+ // occurs or a signal is caught.
} else if err := ap.ListenAndServe(); err != nil {
log.Errorf("Exiting on error %v", err)
}