General afrouter cleanup.
- Separated backend.go into multiple files.
- Replaced the array indexing hack (e.g. strIndex(rAssnNames, ...) lookups) with a typed enum pattern (e.g. associationType / AssociationUndefined).
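- Renamed various identifiers for better consistency (e.g. bkndClstr -> cluster, curBknd -> currentBackend, nbBindingMthdMap -> nbBindingMethodMap, method receiver r -> ar).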
- Removed a few unused structs.
- Replaced a thread with an atomic operation.
Change-Id: I2239692cac21ddb7f513b6d8c247ffa8789714ac
diff --git a/afrouter/afrouter/affinity-router.go b/afrouter/afrouter/affinity-router.go
index 45ec26a..2390846 100644
--- a/afrouter/afrouter/affinity-router.go
+++ b/afrouter/afrouter/affinity-router.go
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// gRPC affinity router with active/active backends
package afrouter
@@ -35,23 +34,22 @@
)
type AffinityRouter struct {
- name string
- routerType int // TODO: This is probably not needed
- association int
- routingField string
- grpcService string
- protoDescriptor *pb.FileDescriptorSet
- methodMap map[string]byte
- nbBindingMthdMap map[string]byte
- bkndClstr *backendCluster
- affinity map[string]*backend
- curBknd **backend
+ name string
+ association associationType
+ routingField string
+ grpcService string
+ protoDescriptor *pb.FileDescriptorSet
+ methodMap map[string]byte
+ nbBindingMethodMap map[string]byte
+ cluster *cluster
+ affinity map[string]*backend
+ currentBackend **backend
}
func newAffinityRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
var err error = nil
- var rtrn_err bool = false
- var pkg_re *regexp.Regexp = regexp.MustCompile(`^(\.[^.]+\.)(.+)$`)
+ var rtrn_err = false
+ var pkg_re = regexp.MustCompile(`^(\.[^.]+\.)(.+)$`)
// Validate the configuration
// A name must exist
@@ -82,34 +80,20 @@
var bptr *backend
bptr = nil
dr := AffinityRouter{
- name: config.Name,
- grpcService: rconf.ProtoService,
- affinity: make(map[string]*backend),
- methodMap: make(map[string]byte),
- nbBindingMthdMap: make(map[string]byte),
- curBknd: &bptr,
- //serialNo:0,
+ name: config.Name,
+ grpcService: rconf.ProtoService,
+ affinity: make(map[string]*backend),
+ methodMap: make(map[string]byte),
+ nbBindingMethodMap: make(map[string]byte),
+ currentBackend: &bptr,
}
// An association must exist
- dr.association = strIndex(rAssnNames, config.Association)
- if dr.association == 0 {
- if config.Association == "" {
- log.Error("An association must be specified")
- } else {
- log.Errorf("The association '%s' is not valid", config.Association)
- }
+ dr.association = config.Association
+ if dr.association == AssociationUndefined {
+ log.Error("An association must be specified")
rtrn_err = true
}
- // This has already been validated bfore this function
- // is called so just use it.
- for idx := range rTypeNames {
- if config.Type == rTypeNames[idx] {
- dr.routerType = idx
- break
- }
- }
-
// Load the protobuf descriptor file
dr.protoDescriptor = &pb.FileDescriptorSet{}
fb, err := ioutil.ReadFile(config.ProtoFile)
@@ -126,10 +110,10 @@
// Build the routing structure based on the loaded protobuf
// descriptor file and the config information.
type key struct {
- mthd string
- field string
+ method string
+ field string
}
- var msgs map[key]byte = make(map[key]byte)
+ var msgs = make(map[key]byte)
for _, f := range dr.protoDescriptor.File {
// Build a temporary map of message types by name.
for _, m := range f.MessageType {
@@ -153,7 +137,7 @@
// field number and save it for future reference.
log.Debugf("Processing method '%s'", *m.Name)
// Determine if this is a method we're supposed to be processing.
- if needMethod(*m.Name, config) == true {
+ if needMethod(*m.Name, config) {
log.Debugf("Enabling method '%s'", *m.Name)
pkg_methd := pkg_re.FindStringSubmatch(*m.InputType)
if pkg_methd == nil {
@@ -164,19 +148,19 @@
//in := (*m.InputType)[len(rconf.ProtoPackage)+2:]
in := pkg_methd[PKG_MTHD_MTHD]
dr.methodMap[*m.Name], ok = msgs[key{in, config.RouteField}]
- if ok == false {
+ if !ok {
log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
*m.Name, config.RouteField, in)
rtrn_err = true
}
}
// The sb method is always included in the methods so we can check it here too.
- if needSbMethod(*m.Name, config) == true {
+ if needSbMethod(*m.Name, config) {
log.Debugf("Enabling southbound method '%s'", *m.Name)
// The output type has the package name prepended to it. Remove it.
out := (*m.OutputType)[len(rconf.ProtoPackage)+2:]
- dr.nbBindingMthdMap[*m.Name], ok = msgs[key{out, config.RouteField}]
- if ok == false {
+ dr.nbBindingMethodMap[*m.Name], ok = msgs[key{out, config.RouteField}]
+ if !ok {
log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
*m.Name, config.RouteField, out)
rtrn_err = true
@@ -190,8 +174,8 @@
// Create the backend cluster or link to an existing one
ok := true
- if dr.bkndClstr, ok = bClusters[config.backendCluster.Name]; ok == false {
- if dr.bkndClstr, err = newBackendCluster(config.backendCluster); err != nil {
+ if dr.cluster, ok = clusters[config.backendCluster.Name]; !ok {
+ if dr.cluster, err = newBackendCluster(config.backendCluster); err != nil {
log.Errorf("Could not create a backend for router %s", config.Name)
rtrn_err = true
}
@@ -222,37 +206,37 @@
return false
}
-func (r AffinityRouter) Service() string {
- return r.grpcService
+func (ar AffinityRouter) Service() string {
+ return ar.grpcService
}
-func (r AffinityRouter) Name() string {
- return r.name
+func (ar AffinityRouter) Name() string {
+ return ar.name
}
-func (r AffinityRouter) skipField(data *[]byte, idx *int) error {
+func (ar AffinityRouter) skipField(data *[]byte, idx *int) error {
switch (*data)[*idx] & 3 {
case 0: // Varint
- (*idx)++
+ *idx++
for (*data)[*idx] >= 128 {
- (*idx)++
+ *idx++
}
case 1: // 64 bit
- (*idx) += 9
+ *idx += 9
case 2: // Length delimited
- (*idx)++
+ *idx++
b := proto.NewBuffer((*data)[*idx:])
t, _ := b.DecodeVarint()
- (*idx) += int(t) + 1
+ *idx += int(t) + 1
case 3: // Deprecated
case 4: // Deprecated
case 5: // 32 bit
- (*idx) += 5
+ *idx += 5
}
return nil
}
-func (r AffinityRouter) decodeProtoField(payload []byte, fieldId byte) (string, error) {
+func (ar AffinityRouter) decodeProtoField(payload []byte, fieldId byte) (string, error) {
idx := 0
b := proto.NewBuffer([]byte{})
//b.DebugPrint("The Buffer", payload)
@@ -291,44 +275,44 @@
return "", err
}
return selector, nil
- } else if err := r.skipField(&payload, &idx); err != nil {
+ } else if err := ar.skipField(&payload, &idx); err != nil {
log.Errorf("Parsing message failed %v", err)
return "", err
}
}
}
-func (r AffinityRouter) Route(sel interface{}) *backend {
+func (ar AffinityRouter) Route(sel interface{}) *backend {
switch sl := sel.(type) {
case *nbFrame:
- log.Debugf("Route called for nbFrame with method %s", sl.mthdSlice[REQ_METHOD])
+ log.Debugf("Route called for nbFrame with method %s", sl.methodInfo.method)
// Check if this method should be affinity bound from the
// reply rather than the request.
- if _, ok := r.nbBindingMthdMap[sl.mthdSlice[REQ_METHOD]]; ok == true {
+ if _, ok := ar.nbBindingMethodMap[sl.methodInfo.method]; ok {
var err error
- log.Debugf("Method '%s' affinity binds on reply", sl.mthdSlice[REQ_METHOD])
+ log.Debugf("Method '%s' affinity binds on reply", sl.methodInfo.method)
// Just round robin route the southbound request
- if *r.curBknd, err = r.bkndClstr.nextBackend(*r.curBknd, BE_SEQ_RR); err == nil {
- return *r.curBknd
+ if *ar.currentBackend, err = ar.cluster.nextBackend(*ar.currentBackend, BackendSequenceRoundRobin); err == nil {
+ return *ar.currentBackend
} else {
sl.err = err
return nil
}
}
// Not a south affinity binding method, proceed with north affinity binding.
- if selector, err := r.decodeProtoField(sl.payload, r.methodMap[sl.mthdSlice[REQ_METHOD]]); err == nil {
+ if selector, err := ar.decodeProtoField(sl.payload, ar.methodMap[sl.methodInfo.method]); err == nil {
log.Debugf("Establishing affinity for selector: %s", selector)
- if rtrn, ok := r.affinity[selector]; ok {
+ if rtrn, ok := ar.affinity[selector]; ok {
return rtrn
} else {
// The selector isn't in the map, create a new affinity mapping
log.Debugf("MUST CREATE A NEW AFFINITY MAP ENTRY!!")
var err error
- if *r.curBknd, err = r.bkndClstr.nextBackend(*r.curBknd, BE_SEQ_RR); err == nil {
- r.setAffinity(selector, *r.curBknd)
- //r.affinity[selector] = *r.curBknd
- //log.Debugf("New affinity set to backend %s",(*r.curBknd).name)
- return *r.curBknd
+ if *ar.currentBackend, err = ar.cluster.nextBackend(*ar.currentBackend, BackendSequenceRoundRobin); err == nil {
+ ar.setAffinity(selector, *ar.currentBackend)
+ //ar.affinity[selector] = *ar.currentBackend
+ //log.Debugf("New affinity set to backend %s",(*ar.currentBackend).name)
+ return *ar.currentBackend
} else {
sl.err = err
return nil
@@ -339,7 +323,7 @@
log.Errorf("Internal: invalid data type in Route call %v", sel)
return nil
}
- log.Errorf("Bad lookup in affinity map %v", r.affinity)
+ log.Errorf("Bad lookup in affinity map %v", ar.affinity)
return nil
}
@@ -347,30 +331,30 @@
return "", "", nil
}
-func (ar AffinityRouter) BackendCluster(mthd string, metaKey string) (*backendCluster, error) {
- return ar.bkndClstr, nil
+func (ar AffinityRouter) BackendCluster(mthd string, metaKey string) (*cluster, error) {
+ return ar.cluster, nil
}
-func (ar AffinityRouter) FindBackendCluster(beName string) *backendCluster {
- if beName == ar.bkndClstr.name {
- return ar.bkndClstr
+func (ar AffinityRouter) FindBackendCluster(beName string) *cluster {
+ if beName == ar.cluster.name {
+ return ar.cluster
}
return nil
}
-func (r AffinityRouter) ReplyHandler(sel interface{}) error {
+func (ar AffinityRouter) ReplyHandler(sel interface{}) error {
switch sl := sel.(type) {
case *sbFrame:
- sl.lck.Lock()
- defer sl.lck.Unlock()
+ sl.mutex.Lock()
+ defer sl.mutex.Unlock()
log.Debugf("Reply handler called for sbFrame with method %s", sl.method)
// Determine if reply action is required.
- if fld, ok := r.nbBindingMthdMap[sl.method]; ok == true && len(sl.payload) > 0 {
+ if fld, ok := ar.nbBindingMethodMap[sl.method]; ok && len(sl.payload) > 0 {
// Extract the field value from the frame and
// and set affinity accordingly
- if selector, err := r.decodeProtoField(sl.payload, fld); err == nil {
+ if selector, err := ar.decodeProtoField(sl.payload, fld); err == nil {
log.Debug("Settign affinity on reply")
- if r.setAffinity(selector, sl.be) != nil {
+ if ar.setAffinity(selector, sl.backend) != nil {
log.Error("Setting affinity on reply failed")
}
return nil
@@ -389,7 +373,7 @@
}
func (ar AffinityRouter) setAffinity(key string, be *backend) error {
- if be2, ok := ar.affinity[key]; ok == false {
+ if be2, ok := ar.affinity[key]; !ok {
ar.affinity[key] = be
log.Debugf("New affinity set to backend %s for key %s", be.name, key)
} else if be2 != be {