General afrouter cleanup.
- Separated backend.go into multiple files.
- Replaced array indexing hack with enum pattern.
- Various renaming for better consistency.
- Removed a few unused structs.
- Replaced a thread with an atomic operation.
Change-Id: I2239692cac21ddb7f513b6d8c247ffa8789714ac
diff --git a/afrouter/afrouter/affinity-router.go b/afrouter/afrouter/affinity-router.go
index 45ec26a..2390846 100644
--- a/afrouter/afrouter/affinity-router.go
+++ b/afrouter/afrouter/affinity-router.go
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// gRPC affinity router with active/active backends
package afrouter
@@ -35,23 +34,22 @@
)
type AffinityRouter struct {
- name string
- routerType int // TODO: This is probably not needed
- association int
- routingField string
- grpcService string
- protoDescriptor *pb.FileDescriptorSet
- methodMap map[string]byte
- nbBindingMthdMap map[string]byte
- bkndClstr *backendCluster
- affinity map[string]*backend
- curBknd **backend
+ name string
+ association associationType
+ routingField string
+ grpcService string
+ protoDescriptor *pb.FileDescriptorSet
+ methodMap map[string]byte
+ nbBindingMethodMap map[string]byte
+ cluster *cluster
+ affinity map[string]*backend
+ currentBackend **backend
}
func newAffinityRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
var err error = nil
- var rtrn_err bool = false
- var pkg_re *regexp.Regexp = regexp.MustCompile(`^(\.[^.]+\.)(.+)$`)
+ var rtrn_err = false
+ var pkg_re = regexp.MustCompile(`^(\.[^.]+\.)(.+)$`)
// Validate the configuration
// A name must exist
@@ -82,34 +80,20 @@
var bptr *backend
bptr = nil
dr := AffinityRouter{
- name: config.Name,
- grpcService: rconf.ProtoService,
- affinity: make(map[string]*backend),
- methodMap: make(map[string]byte),
- nbBindingMthdMap: make(map[string]byte),
- curBknd: &bptr,
- //serialNo:0,
+ name: config.Name,
+ grpcService: rconf.ProtoService,
+ affinity: make(map[string]*backend),
+ methodMap: make(map[string]byte),
+ nbBindingMethodMap: make(map[string]byte),
+ currentBackend: &bptr,
}
// An association must exist
- dr.association = strIndex(rAssnNames, config.Association)
- if dr.association == 0 {
- if config.Association == "" {
- log.Error("An association must be specified")
- } else {
- log.Errorf("The association '%s' is not valid", config.Association)
- }
+ dr.association = config.Association
+ if dr.association == AssociationUndefined {
+ log.Error("An association must be specified")
rtrn_err = true
}
- // This has already been validated bfore this function
- // is called so just use it.
- for idx := range rTypeNames {
- if config.Type == rTypeNames[idx] {
- dr.routerType = idx
- break
- }
- }
-
// Load the protobuf descriptor file
dr.protoDescriptor = &pb.FileDescriptorSet{}
fb, err := ioutil.ReadFile(config.ProtoFile)
@@ -126,10 +110,10 @@
// Build the routing structure based on the loaded protobuf
// descriptor file and the config information.
type key struct {
- mthd string
- field string
+ method string
+ field string
}
- var msgs map[key]byte = make(map[key]byte)
+ var msgs = make(map[key]byte)
for _, f := range dr.protoDescriptor.File {
// Build a temporary map of message types by name.
for _, m := range f.MessageType {
@@ -153,7 +137,7 @@
// field number and save it for future reference.
log.Debugf("Processing method '%s'", *m.Name)
// Determine if this is a method we're supposed to be processing.
- if needMethod(*m.Name, config) == true {
+ if needMethod(*m.Name, config) {
log.Debugf("Enabling method '%s'", *m.Name)
pkg_methd := pkg_re.FindStringSubmatch(*m.InputType)
if pkg_methd == nil {
@@ -164,19 +148,19 @@
//in := (*m.InputType)[len(rconf.ProtoPackage)+2:]
in := pkg_methd[PKG_MTHD_MTHD]
dr.methodMap[*m.Name], ok = msgs[key{in, config.RouteField}]
- if ok == false {
+ if !ok {
log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
*m.Name, config.RouteField, in)
rtrn_err = true
}
}
// The sb method is always included in the methods so we can check it here too.
- if needSbMethod(*m.Name, config) == true {
+ if needSbMethod(*m.Name, config) {
log.Debugf("Enabling southbound method '%s'", *m.Name)
// The output type has the package name prepended to it. Remove it.
out := (*m.OutputType)[len(rconf.ProtoPackage)+2:]
- dr.nbBindingMthdMap[*m.Name], ok = msgs[key{out, config.RouteField}]
- if ok == false {
+ dr.nbBindingMethodMap[*m.Name], ok = msgs[key{out, config.RouteField}]
+ if !ok {
log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
*m.Name, config.RouteField, out)
rtrn_err = true
@@ -190,8 +174,8 @@
// Create the backend cluster or link to an existing one
ok := true
- if dr.bkndClstr, ok = bClusters[config.backendCluster.Name]; ok == false {
- if dr.bkndClstr, err = newBackendCluster(config.backendCluster); err != nil {
+ if dr.cluster, ok = clusters[config.backendCluster.Name]; !ok {
+ if dr.cluster, err = newBackendCluster(config.backendCluster); err != nil {
log.Errorf("Could not create a backend for router %s", config.Name)
rtrn_err = true
}
@@ -222,37 +206,37 @@
return false
}
-func (r AffinityRouter) Service() string {
- return r.grpcService
+func (ar AffinityRouter) Service() string {
+ return ar.grpcService
}
-func (r AffinityRouter) Name() string {
- return r.name
+func (ar AffinityRouter) Name() string {
+ return ar.name
}
-func (r AffinityRouter) skipField(data *[]byte, idx *int) error {
+func (ar AffinityRouter) skipField(data *[]byte, idx *int) error {
switch (*data)[*idx] & 3 {
case 0: // Varint
- (*idx)++
+ *idx++
for (*data)[*idx] >= 128 {
- (*idx)++
+ *idx++
}
case 1: // 64 bit
- (*idx) += 9
+ *idx += 9
case 2: // Length delimited
- (*idx)++
+ *idx++
b := proto.NewBuffer((*data)[*idx:])
t, _ := b.DecodeVarint()
- (*idx) += int(t) + 1
+ *idx += int(t) + 1
case 3: // Deprecated
case 4: // Deprecated
case 5: // 32 bit
- (*idx) += 5
+ *idx += 5
}
return nil
}
-func (r AffinityRouter) decodeProtoField(payload []byte, fieldId byte) (string, error) {
+func (ar AffinityRouter) decodeProtoField(payload []byte, fieldId byte) (string, error) {
idx := 0
b := proto.NewBuffer([]byte{})
//b.DebugPrint("The Buffer", payload)
@@ -291,44 +275,44 @@
return "", err
}
return selector, nil
- } else if err := r.skipField(&payload, &idx); err != nil {
+ } else if err := ar.skipField(&payload, &idx); err != nil {
log.Errorf("Parsing message failed %v", err)
return "", err
}
}
}
-func (r AffinityRouter) Route(sel interface{}) *backend {
+func (ar AffinityRouter) Route(sel interface{}) *backend {
switch sl := sel.(type) {
case *nbFrame:
- log.Debugf("Route called for nbFrame with method %s", sl.mthdSlice[REQ_METHOD])
+ log.Debugf("Route called for nbFrame with method %s", sl.methodInfo.method)
// Check if this method should be affinity bound from the
// reply rather than the request.
- if _, ok := r.nbBindingMthdMap[sl.mthdSlice[REQ_METHOD]]; ok == true {
+ if _, ok := ar.nbBindingMethodMap[sl.methodInfo.method]; ok {
var err error
- log.Debugf("Method '%s' affinity binds on reply", sl.mthdSlice[REQ_METHOD])
+ log.Debugf("Method '%s' affinity binds on reply", sl.methodInfo.method)
// Just round robin route the southbound request
- if *r.curBknd, err = r.bkndClstr.nextBackend(*r.curBknd, BE_SEQ_RR); err == nil {
- return *r.curBknd
+ if *ar.currentBackend, err = ar.cluster.nextBackend(*ar.currentBackend, BackendSequenceRoundRobin); err == nil {
+ return *ar.currentBackend
} else {
sl.err = err
return nil
}
}
// Not a south affinity binding method, proceed with north affinity binding.
- if selector, err := r.decodeProtoField(sl.payload, r.methodMap[sl.mthdSlice[REQ_METHOD]]); err == nil {
+ if selector, err := ar.decodeProtoField(sl.payload, ar.methodMap[sl.methodInfo.method]); err == nil {
log.Debugf("Establishing affinity for selector: %s", selector)
- if rtrn, ok := r.affinity[selector]; ok {
+ if rtrn, ok := ar.affinity[selector]; ok {
return rtrn
} else {
// The selector isn't in the map, create a new affinity mapping
log.Debugf("MUST CREATE A NEW AFFINITY MAP ENTRY!!")
var err error
- if *r.curBknd, err = r.bkndClstr.nextBackend(*r.curBknd, BE_SEQ_RR); err == nil {
- r.setAffinity(selector, *r.curBknd)
- //r.affinity[selector] = *r.curBknd
- //log.Debugf("New affinity set to backend %s",(*r.curBknd).name)
- return *r.curBknd
+ if *ar.currentBackend, err = ar.cluster.nextBackend(*ar.currentBackend, BackendSequenceRoundRobin); err == nil {
+ ar.setAffinity(selector, *ar.currentBackend)
+ //ar.affinity[selector] = *ar.currentBackend
+ //log.Debugf("New affinity set to backend %s",(*ar.currentBackend).name)
+ return *ar.currentBackend
} else {
sl.err = err
return nil
@@ -339,7 +323,7 @@
log.Errorf("Internal: invalid data type in Route call %v", sel)
return nil
}
- log.Errorf("Bad lookup in affinity map %v", r.affinity)
+ log.Errorf("Bad lookup in affinity map %v", ar.affinity)
return nil
}
@@ -347,30 +331,30 @@
return "", "", nil
}
-func (ar AffinityRouter) BackendCluster(mthd string, metaKey string) (*backendCluster, error) {
- return ar.bkndClstr, nil
+func (ar AffinityRouter) BackendCluster(mthd string, metaKey string) (*cluster, error) {
+ return ar.cluster, nil
}
-func (ar AffinityRouter) FindBackendCluster(beName string) *backendCluster {
- if beName == ar.bkndClstr.name {
- return ar.bkndClstr
+func (ar AffinityRouter) FindBackendCluster(beName string) *cluster {
+ if beName == ar.cluster.name {
+ return ar.cluster
}
return nil
}
-func (r AffinityRouter) ReplyHandler(sel interface{}) error {
+func (ar AffinityRouter) ReplyHandler(sel interface{}) error {
switch sl := sel.(type) {
case *sbFrame:
- sl.lck.Lock()
- defer sl.lck.Unlock()
+ sl.mutex.Lock()
+ defer sl.mutex.Unlock()
log.Debugf("Reply handler called for sbFrame with method %s", sl.method)
// Determine if reply action is required.
- if fld, ok := r.nbBindingMthdMap[sl.method]; ok == true && len(sl.payload) > 0 {
+ if fld, ok := ar.nbBindingMethodMap[sl.method]; ok && len(sl.payload) > 0 {
// Extract the field value from the frame and
// and set affinity accordingly
- if selector, err := r.decodeProtoField(sl.payload, fld); err == nil {
+ if selector, err := ar.decodeProtoField(sl.payload, fld); err == nil {
log.Debug("Settign affinity on reply")
- if r.setAffinity(selector, sl.be) != nil {
+ if ar.setAffinity(selector, sl.backend) != nil {
log.Error("Setting affinity on reply failed")
}
return nil
@@ -389,7 +373,7 @@
}
func (ar AffinityRouter) setAffinity(key string, be *backend) error {
- if be2, ok := ar.affinity[key]; ok == false {
+ if be2, ok := ar.affinity[key]; !ok {
ar.affinity[key] = be
log.Debugf("New affinity set to backend %s for key %s", be.name, key)
} else if be2 != be {
diff --git a/afrouter/afrouter/api.go b/afrouter/afrouter/api.go
index 36e79a3..87f5573 100644
--- a/afrouter/afrouter/api.go
+++ b/afrouter/afrouter/api.go
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// gRPC affinity router with active/active backends
package afrouter
@@ -46,7 +45,7 @@
log.Errorf("Invalid address '%s' provided for API server", config.Addr)
rtrn_err = true
}
- if rtrn_err == true {
+ if rtrn_err {
return nil, errors.New("Errors in API configuration")
} else {
var err error = nil
@@ -66,7 +65,7 @@
}
func (aa *ArouterApi) getServer(srvr string) (*server, error) {
- if s, ok := aa.ar.servers[srvr]; ok == false {
+ if s, ok := aa.ar.servers[srvr]; !ok {
err := errors.New(fmt.Sprintf("Server '%s' doesn't exist", srvr))
return nil, err
} else {
@@ -86,7 +85,7 @@
return nil, err
}
-func (aa *ArouterApi) getCluster(s *server, clstr string) (*backendCluster, error) {
+func (aa *ArouterApi) getCluster(s *server, clstr string) (*cluster, error) {
for _, pkg := range s.routers {
for _, r := range pkg {
if c := r.FindBackendCluster(clstr); c != nil {
@@ -98,7 +97,7 @@
return nil, err
}
-func (aa *ArouterApi) getBackend(c *backendCluster, bknd string) (*backend, error) {
+func (aa *ArouterApi) getBackend(c *cluster, bknd string) (*backend, error) {
for _, b := range c.backends {
if b.name == bknd {
return b, nil
@@ -109,8 +108,8 @@
return nil, err
}
-func (aa *ArouterApi) getConnection(b *backend, con string) (*beConnection, error) {
- if c, ok := b.connections[con]; ok == false {
+func (aa *ArouterApi) getConnection(b *backend, con string) (*connection, error) {
+ if c, ok := b.connections[con]; !ok {
err := errors.New(fmt.Sprintf("Connection '%s' doesn't exist", con))
return nil, err
} else {
@@ -118,14 +117,13 @@
}
}
-func (aa *ArouterApi) updateConnection(in *pb.Conn, cn *beConnection, b *backend) error {
+func (aa *ArouterApi) updateConnection(in *pb.Conn, cn *connection, b *backend) error {
sPort := strconv.FormatUint(in.Port, 10)
// Check that the ip address and or port are different
if in.Addr == cn.addr && sPort == cn.port {
err := errors.New(fmt.Sprintf("Refusing to change connection '%s' to identical values", in.Connection))
return err
}
- //log.Debugf("BEFORE: Be1: %v Be2 %v", cn.bknd, b)
cn.close()
cn.addr = in.Addr
cn.port = sPort
@@ -146,7 +144,7 @@
_ = aap
log.Debugf("Getting router %s and route %s", in.Router, in.Route)
- if r, ok := allRouters[in.Router+in.Route]; ok == true {
+ if r, ok := allRouters[in.Router+in.Route]; ok {
switch rr := r.(type) {
case AffinityRouter:
log.Debug("Affinity router found")
@@ -177,9 +175,9 @@
// not the same then close the existing connection. If they are bothe the same
// then return an error describing the situation.
var s *server
- var c *backendCluster
+ var c *cluster
var b *backend
- var cn *beConnection
+ var cn *connection
var err error
log.Debugf("SetConnection called! %v", in)
diff --git a/afrouter/afrouter/arproxy.go b/afrouter/afrouter/arproxy.go
index 71c7b1f..72641de 100644
--- a/afrouter/afrouter/arproxy.go
+++ b/afrouter/afrouter/arproxy.go
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// gRPC affinity router with active/active backends
package afrouter
@@ -25,16 +24,7 @@
"github.com/opencord/voltha-go/common/log"
)
-type nbi int
-
-const (
- GRPC_NBI nbi = 1
- GRPC_STREAMING_NBI nbi = 2
- GRPC_CONTROL_NBI nbi = 3
-)
-
// String names for display in error messages.
-var arpxyNames = [...]string{"grpc_nbi", "grpc_streaming_nbi", "grpc_control_nbi"}
var arProxy *ArouterProxy = nil
type ArouterProxy struct {
diff --git a/afrouter/afrouter/backend.go b/afrouter/afrouter/backend.go
index 19133a3..f616f6b 100644
--- a/afrouter/afrouter/backend.go
+++ b/afrouter/afrouter/backend.go
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// gRPC affinity router with active/active backends
package afrouter
@@ -30,224 +29,34 @@
"google.golang.org/grpc/metadata"
"io"
"net"
- "sort"
"strconv"
"strings"
"sync"
- "time"
)
-const (
- BE_ACTIVE_ACTIVE = 1 // Backend type active/active
- BE_SERVER = 2 // Backend type single server
- BE_SEQ_RR = 0 // Backend sequence round robin
- AS_NONE = 0 // Association strategy: none
- AS_SERIAL_NO = 1 // Association strategy: serial number
- AL_NONE = 0 // Association location: none
- AL_HEADER = 1 // Association location: header
- AL_PROTOBUF = 2 // Association location: protobuf
-)
-
-var beTypeNames = []string{"", "active_active", "server"}
-var asTypeNames = []string{"", "serial_number"}
-var alTypeNames = []string{"", "header", "protobuf"}
-
-var bClusters map[string]*backendCluster = make(map[string]*backendCluster)
-
-type backendCluster struct {
- name string
- //backends map[string]*backend
- backends []*backend
- beRvMap map[*backend]int
- serialNoSource chan uint64
-}
-
+// backend represents a collection of backends in a HA configuration
type backend struct {
- lck sync.Mutex
- name string
- beType int
- activeAssoc assoc
- connFailCallback func(string, *backend) bool
- connections map[string]*beConnection
- opnConns int
+ mutex sync.Mutex
+ name string
+ beType backendType
+ activeAssociation association
+ connFailCallback func(string, *backend) bool
+ connections map[string]*connection
+ openConns int
}
-type assoc struct {
- strategy int
- location int
+type association struct {
+ strategy associationStrategy
+ location associationLocation
field string // Used only if location is protobuf
key string
}
-type beConnection struct {
- lck sync.Mutex
- cncl context.CancelFunc
- name string
- addr string
- port string
- gConn *gConnection
- bknd *backend
-}
+func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, f *nbFrame) (*streams, error) {
-// This structure should never be referred to
-// by any routine outside of *beConnection
-// routines.
-type gConnection struct {
- lck sync.Mutex
- state connectivity.State
- conn *grpc.ClientConn
- cncl context.CancelFunc
-}
+ rtrn := &streams{streams: make(map[string]*stream), activeStream: nil}
-type beClStrm struct {
- strm grpc.ClientStream
- ctxt context.Context
- cncl context.CancelFunc
- ok2Close chan struct{}
- c2sRtrn chan error
- s2cRtrn error
-}
-
-type beClStrms struct {
- lck sync.Mutex
- actvStrm *beClStrm
- strms map[string]*beClStrm
- srtdStrms []*beClStrm
-}
-
-//***************************************************************//
-//****************** BackendCluster Functions *******************//
-//***************************************************************//
-
-//TODO: Move the backend type (active/active etc) to the cluster
-// level. All backends should really be of the same type.
-// Create a new backend cluster
-func newBackendCluster(conf *BackendClusterConfig) (*backendCluster, error) {
- var err error = nil
- var rtrn_err bool = false
- var be *backend
- log.Debugf("Creating a backend cluster with %v", conf)
- // Validate the configuration
- if conf.Name == "" {
- log.Error("A backend cluster must have a name")
- rtrn_err = true
- }
- //bc := &backendCluster{name:conf.Name,backends:make(map[string]*backend)}
- bc := &backendCluster{name: conf.Name, beRvMap: make(map[*backend]int)}
- bClusters[bc.name] = bc
- bc.startSerialNumberSource() // Serial numberere for active/active backends
- idx := 0
- for _, bec := range conf.Backends {
- if bec.Name == "" {
- log.Errorf("A backend must have a name in cluster %s\n", conf.Name)
- rtrn_err = true
- }
- if be, err = newBackend(&bec, conf.Name); err != nil {
- log.Errorf("Error creating backend %s", bec.Name)
- rtrn_err = true
- }
- bc.backends = append(bc.backends, be)
- bc.beRvMap[bc.backends[idx]] = idx
- idx++
- }
- if rtrn_err {
- return nil, errors.New("Error creating backend(s)")
- }
- return bc, nil
-}
-
-func (bc *backendCluster) getBackend(name string) *backend {
- for _, v := range bc.backends {
- if v.name == name {
- return v
- }
- }
- return nil
-}
-
-func (bc *backendCluster) startSerialNumberSource() {
- bc.serialNoSource = make(chan uint64)
- var counter uint64 = 0
- // This go routine only dies on exit, it is not a leak
- go func() {
- for {
- bc.serialNoSource <- counter
- counter++
- }
- }()
-}
-
-func (bc *backendCluster) nextBackend(be *backend, seq int) (*backend, error) {
- switch seq {
- case BE_SEQ_RR: // Round robin
- in := be
- // If no backend is found having a connection
- // then return nil.
- if be == nil {
- log.Debug("Previous backend is nil")
- be = bc.backends[0]
- in = be
- if be.opnConns != 0 {
- return be, nil
- }
- }
- for {
- log.Debugf("Requesting a new backend starting from %s", be.name)
- cur := bc.beRvMap[be]
- cur++
- if cur >= len(bc.backends) {
- cur = 0
- }
- log.Debugf("Next backend is %d:%s", cur, bc.backends[cur].name)
- if bc.backends[cur].opnConns > 0 {
- return bc.backends[cur], nil
- }
- if bc.backends[cur] == in {
- err := fmt.Errorf("No backend with open connections found")
- log.Debug(err)
- return nil, err
- }
- be = bc.backends[cur]
- log.Debugf("Backend '%s' has no open connections, trying next", bc.backends[cur].name)
- }
- default: // Invalid, defalt to routnd robin
- log.Errorf("Invalid backend sequence %d. Defaulting to round robin", seq)
- return bc.nextBackend(be, BE_SEQ_RR)
- }
-}
-
-func (bec *backendCluster) handler(srv interface{}, serverStream grpc.ServerStream, r Router, mthdSlice []string,
- mk string, mv string) error {
- //func (bec *backendCluster) handler(nbR * nbRequest) error {
-
- // The final backend cluster needs to be determined here. With non-affinity routed backends it could
- // just be determined here and for affinity routed backends the first message must be received
- // before the backend is determined. In order to keep things simple, the same approach is taken for
- // now.
-
- // Get the backend to use.
- // Allocate the nbFrame here since it holds the "context" of this communication
- nf := &nbFrame{router: r, mthdSlice: mthdSlice, serNo: bec.serialNoSource, metaKey: mk, metaVal: mv}
- log.Debugf("Nb frame allocate with method %s", nf.mthdSlice[REQ_METHOD])
-
- if be, err := bec.assignBackend(serverStream, nf); err != nil {
- // At this point, no backend streams have been initiated
- // so just return the error.
- return err
- } else {
- log.Debugf("Backend '%s' selected", be.name)
- // Allocate a sbFrame here because it might be needed for return value intercept
- sf := &sbFrame{router: r, be: be, method: nf.mthdSlice[REQ_METHOD], metaKey: mk, metaVal: mv}
- log.Debugf("Sb frame allocated with router %s", r.Name())
- return be.handler(srv, serverStream, nf, sf)
- }
-}
-
-func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, f *nbFrame) (*beClStrms, error) {
-
- rtrn := &beClStrms{strms: make(map[string]*beClStrm), actvStrm: nil}
-
- log.Debugf("Opening southbound streams for method '%s'", f.mthdSlice[REQ_METHOD])
+ log.Debugf("Opening southbound streams for method '%s'", f.methodInfo.method)
// Get the metadata from the incoming message on the server
md, ok := metadata.FromIncomingContext(serverStream.Context())
if !ok {
@@ -256,12 +65,11 @@
// TODO: Need to check if this is an active/active backend cluster
// with a serial number in the header.
- serialNo := <-f.serNo
- log.Debugf("Serial number for transaction allocated: %d", serialNo)
+ log.Debugf("Serial number for transaction allocated: %d", f.serialNo)
// If even one stream can be created then proceed. If none can be
// created then report an error becase both the primary and redundant
// connections are non-existant.
- var atLeastOne bool = false
+ var atLeastOne = false
var errStr strings.Builder
log.Debugf("There are %d connections to open", len(be.connections))
for _, cn := range be.connections {
@@ -274,18 +82,18 @@
clientCtx = metadata.NewOutgoingContext(clientCtx, md.Copy())
//TODO: Same check here, only add the serial number if necessary
clientCtx = metadata.AppendToOutgoingContext(clientCtx, "voltha_serial_number",
- strconv.FormatUint(serialNo, 10))
+ strconv.FormatUint(f.serialNo, 10))
// Create the client stream
if clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying,
- cn.getConn(), f.mthdSlice[REQ_ALL]); err != nil {
+ cn.getConn(), f.methodInfo.all); err != nil {
log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
fmt.Fprintf(&errStr, "{{Failed to create a client stream '%s', %v}} ", cn.name, err)
- rtrn.strms[cn.name] = nil
+ rtrn.streams[cn.name] = nil
} else {
- rtrn.strms[cn.name] = &beClStrm{strm: clientStream, ctxt: clientCtx,
- cncl: clientCancel, s2cRtrn: nil,
- ok2Close: make(chan struct{}),
- c2sRtrn: make(chan error, 1)}
+ rtrn.streams[cn.name] = &stream{stream: clientStream, ctx: clientCtx,
+ cancel: clientCancel, s2cReturn: nil,
+ ok2Close: make(chan struct{}),
+ c2sReturn: make(chan error, 1)}
atLeastOne = true
}
} else if cn.getConn() == nil {
@@ -298,7 +106,7 @@
log.Debug(err)
}
}
- if atLeastOne == true {
+ if atLeastOne {
rtrn.sortStreams()
return rtrn, nil
}
@@ -310,10 +118,10 @@
func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf *nbFrame, sf *sbFrame) error {
// Set up and launch each individual southbound stream
- var beStrms *beClStrms
+ var beStrms *streams
var rtrn error = nil
- var s2cOk bool = false
- var c2sOk bool = false
+ var s2cOk = false
+ var c2sOk = false
beStrms, err := be.openSouthboundStreams(srv, serverStream, nf)
if err != nil {
@@ -343,7 +151,7 @@
// this is the successful case where the sender has encountered io.EOF, and won't be sending anymore./
// the clientStream>serverStream may continue sending though.
defer beStrms.closeSend()
- if c2sOk == true {
+ if c2sOk {
return rtrn
}
} else {
@@ -370,7 +178,7 @@
rtrn = c2sErr
}
log.Debug("c2sErr reporting EOF")
- if s2cOk == true {
+ if s2cOk {
return rtrn
}
}
@@ -378,258 +186,47 @@
return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
}
-func (strms *beClStrms) clientCancel() {
- for _, strm := range strms.strms {
- if strm != nil {
- strm.cncl()
- }
- }
-}
-
-func (strms *beClStrms) closeSend() {
- for _, strm := range strms.strms {
- if strm != nil {
- <-strm.ok2Close
- log.Debug("Closing southbound stream")
- strm.strm.CloseSend()
- }
- }
-}
-
-func (strms *beClStrms) trailer() metadata.MD {
- return strms.actvStrm.strm.Trailer()
-}
-
-func (bec *backendCluster) assignBackend(src grpc.ServerStream, f *nbFrame) (*backend, error) {
- // Receive the first message from the server. This calls the assigned codec in which
- // Unmarshal gets executed. That will use the assigned router to select a backend
- // and add it to the frame
- if err := src.RecvMsg(f); err != nil {
- return nil, err
- }
- // Check that the backend was routable and actually has connections open.
- // If it doesn't then return a nil backend to indicate this
- if f.be == nil {
- err := fmt.Errorf("Unable to route method '%s'", f.mthdSlice[REQ_METHOD])
- log.Error(err)
- return nil, err
- } else if f.be.opnConns == 0 {
- err := fmt.Errorf("No open connections on backend '%s'", f.be.name)
- log.Error(err)
- return f.be, err
- }
- return f.be, nil
-}
-
-func (strms *beClStrms) getActive() *beClStrm {
- strms.lck.Lock()
- defer strms.lck.Unlock()
- return strms.actvStrm
-}
-
-func (strms *beClStrms) setThenGetActive(strm *beClStrm) *beClStrm {
- strms.lck.Lock()
- defer strms.lck.Unlock()
- if strms.actvStrm == nil {
- strms.actvStrm = strm
- }
- return strms.actvStrm
-}
-
-func (src *beClStrms) forwardClientToServer(dst grpc.ServerStream, f *sbFrame) chan error {
- fc2s := func(srcS *beClStrm) {
- for i := 0; ; i++ {
- if err := srcS.strm.RecvMsg(f); err != nil {
- if src.setThenGetActive(srcS) == srcS {
- srcS.c2sRtrn <- err // this can be io.EOF which is the success case
- } else {
- srcS.c2sRtrn <- nil // Inactive responder
- }
- close(srcS.ok2Close)
- break
- }
- if src.setThenGetActive(srcS) != srcS {
- srcS.c2sRtrn <- nil
- continue
- }
- if i == 0 {
- // This is a bit of a hack, but client to server headers are only readable after first client msg is
- // received but must be written to server stream before the first msg is flushed.
- // This is the only place to do it nicely.
- md, err := srcS.strm.Header()
- if err != nil {
- srcS.c2sRtrn <- err
- break
- }
- // Update the metadata for the response.
- if f.metaKey != NoMeta {
- if f.metaVal == "" {
- // We could also alsways just do this
- md.Set(f.metaKey, f.be.name)
- } else {
- md.Set(f.metaKey, f.metaVal)
- }
- }
- if err := dst.SendHeader(md); err != nil {
- srcS.c2sRtrn <- err
- break
- }
- }
- log.Debugf("Northbound frame %v", f.payload)
- if err := dst.SendMsg(f); err != nil {
- srcS.c2sRtrn <- err
- break
- }
- }
- }
-
- // There should be AT LEAST one open stream at this point
- // if there isn't its a grave error in the code and it will
- // cause this thread to block here so check for it and
- // don't let the lock up happen but report the error
- ret := make(chan error, 1)
- agg := make(chan *beClStrm)
- atLeastOne := false
- for _, strm := range src.strms {
- if strm != nil {
- go fc2s(strm)
- go func(s *beClStrm) { // Wait on result and aggregate
- r := <-s.c2sRtrn // got the return code
- if r == nil {
- return // We're the redundat stream, just die
- }
- s.c2sRtrn <- r // put it back to pass it along
- agg <- s // send the stream to the aggregator
- }(strm)
- atLeastOne = true
- }
- }
- if atLeastOne == true {
- go func() { // Wait on aggregated result
- s := <-agg
- ret <- <-s.c2sRtrn
- }()
- } else {
- err := errors.New("There are no open streams. Unable to forward message.")
- log.Error(err)
- ret <- err
- }
- return ret
-}
-
-func (strms *beClStrms) sendAll(f *nbFrame) error {
- var rtrn error
-
- atLeastOne := false
- for _, strm := range strms.srtdStrms {
- if strm != nil {
- if err := strm.strm.SendMsg(f); err != nil {
- log.Debugf("Error on SendMsg: %s", err.Error())
- strm.s2cRtrn = err
- }
- atLeastOne = true
- } else {
- log.Debugf("Nil stream")
- }
- }
- // If one of the streams succeeded, declare success
- // if none did pick an error and return it.
- if atLeastOne == true {
- for _, strm := range strms.srtdStrms {
- if strm != nil {
- rtrn = strm.s2cRtrn
- if rtrn == nil {
- return rtrn
- }
- }
- }
- return rtrn
- } else {
- rtrn = errors.New("There are no open streams, this should never happen")
- log.Error(rtrn)
- }
- return rtrn
-}
-
-func (dst *beClStrms) forwardServerToClient(src grpc.ServerStream, f *nbFrame) chan error {
- ret := make(chan error, 1)
- go func() {
- // The frame buffer already has the results of a first
- // RecvMsg in it so the first thing to do is to
- // send it to the list of client streams and only
- // then read some more.
- for i := 0; ; i++ {
- // Send the message to each of the backend streams
- if err := dst.sendAll(f); err != nil {
- ret <- err
- log.Debugf("SendAll failed %s", err.Error())
- break
- }
- log.Debugf("Southbound frame %v", f.payload)
- if err := src.RecvMsg(f); err != nil {
- ret <- err // this can be io.EOF which is happy case
- break
- }
- }
- }()
- return ret
-}
-
-func (st *beClStrms) sortStreams() {
- var tmpKeys []string
- for k, _ := range st.strms {
- tmpKeys = append(tmpKeys, k)
- }
- sort.Strings(tmpKeys)
- for _, v := range tmpKeys {
- st.srtdStrms = append(st.srtdStrms, st.strms[v])
- }
-}
-
func newBackend(conf *BackendConfig, clusterName string) (*backend, error) {
var rtrn_err bool = false
log.Debugf("Configuring the backend with %v", *conf)
// Validate the conifg and configure the backend
- be := &backend{name: conf.Name, connections: make(map[string]*beConnection), opnConns: 0}
- idx := strIndex([]string(beTypeNames), conf.Type)
- if idx == 0 {
+ be := &backend{name: conf.Name, connections: make(map[string]*connection), openConns: 0}
+ if conf.Type == BackendUndefined {
log.Error("Invalid type specified for backend %s in cluster %s", conf.Name, clusterName)
rtrn_err = true
}
- be.beType = idx
+ be.beType = conf.Type
- idx = strIndex(asTypeNames, conf.Association.Strategy)
- if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
+ if conf.Association.Strategy == AssociationStrategyUndefined && be.beType == BackendActiveActive {
log.Errorf("An association strategy must be provided if the backend "+
"type is active/active for backend %s in cluster %s", conf.Name, clusterName)
rtrn_err = true
}
- be.activeAssoc.strategy = idx
+ be.activeAssociation.strategy = conf.Association.Strategy
- idx = strIndex(alTypeNames, conf.Association.Location)
- if idx == 0 && be.beType == BE_ACTIVE_ACTIVE {
+ if conf.Association.Location == AssociationLocationUndefined && be.beType == BackendActiveActive {
log.Errorf("An association location must be provided if the backend "+
"type is active/active for backend %s in cluster %s", conf.Name, clusterName)
rtrn_err = true
}
- be.activeAssoc.location = idx
+ be.activeAssociation.location = conf.Association.Location
- if conf.Association.Field == "" && be.activeAssoc.location == AL_PROTOBUF {
+ if conf.Association.Field == "" && be.activeAssociation.location == AssociationLocationProtobuf {
log.Errorf("An association field must be provided if the backend "+
"type is active/active and the location is set to protobuf "+
"for backend %s in cluster %s", conf.Name, clusterName)
rtrn_err = true
}
- be.activeAssoc.field = conf.Association.Field
+ be.activeAssociation.field = conf.Association.Field
- if conf.Association.Key == "" && be.activeAssoc.location == AL_HEADER {
+ if conf.Association.Key == "" && be.activeAssociation.location == AssociationLocationHeader {
log.Errorf("An association key must be provided if the backend "+
"type is active/active and the location is set to header "+
"for backend %s in cluster %s", conf.Name, clusterName)
rtrn_err = true
}
- be.activeAssoc.key = conf.Association.Key
+ be.activeAssociation.key = conf.Association.Key
if rtrn_err {
return nil, errors.New("Backend configuration failed")
}
@@ -637,7 +234,7 @@
// Connections can consist of just a name. This allows for dynamic configuration
// at a later time.
// TODO: validate that there is one connection for all but active/active backends
- if len(conf.Connections) > 1 && be.activeAssoc.strategy != BE_ACTIVE_ACTIVE {
+ if len(conf.Connections) > 1 && be.beType != BackendActiveActive {
log.Errorf("Only one connection must be specified if the association " +
"strategy is not set to 'active_active'")
rtrn_err = true
@@ -651,8 +248,8 @@
log.Errorf("A connection must have a name for backend %s in cluster %s",
conf.Name, clusterName)
} else {
- gc := &gConnection{conn: nil, cncl: nil, state: connectivity.Idle}
- be.connections[cnConf.Name] = &beConnection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, bknd: be, gConn: gc}
+ gc := &gConnection{conn: nil, cancel: nil, state: connectivity.Idle}
+ be.connections[cnConf.Name] = &connection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, backend: be, gConn: gc}
if cnConf.Addr != "" { // This connection will be specified later.
if ip := net.ParseIP(cnConf.Addr); ip == nil {
log.Errorf("The IP address for connection %s in backend %s in cluster %s is invalid",
@@ -684,23 +281,19 @@
return be, nil
}
-//***************************************************************//
-//********************* Backend Functions ***********************//
-//***************************************************************//
-
func (be *backend) incConn() {
- be.lck.Lock()
- defer be.lck.Unlock()
- be.opnConns++
+ be.mutex.Lock()
+ defer be.mutex.Unlock()
+ be.openConns++
}
func (be *backend) decConn() {
- be.lck.Lock()
- defer be.lck.Unlock()
- be.opnConns--
- if be.opnConns < 0 {
+ be.mutex.Lock()
+ defer be.mutex.Unlock()
+ be.openConns--
+ if be.openConns < 0 {
log.Error("Internal error, number of open connections less than 0")
- be.opnConns = 0
+ be.openConns = 0
}
}
@@ -714,207 +307,8 @@
}
}
-func (cn *beConnection) connect() {
- if cn.addr != "" && cn.getConn() == nil {
- log.Infof("Connecting to connection %s with addr: %s and port %s", cn.name, cn.addr, cn.port)
- // Dial doesn't block, it just returns and continues connecting in the background.
- // Check back later to confirm and increase the connection count.
- ctx, cnclFnc := context.WithCancel(context.Background()) // Context for canceling the connection
- cn.setCncl(cnclFnc)
- if conn, err := grpc.Dial(cn.addr+":"+cn.port, grpc.WithCodec(Codec()), grpc.WithInsecure()); err != nil {
- log.Errorf("Dialng connection %v:%v", cn, err)
- cn.waitAndTryAgain(ctx)
- } else {
- cn.setConn(conn)
- log.Debugf("Starting the connection monitor for '%s'", cn.name)
- cn.monitor(ctx)
- }
- } else if cn.addr == "" {
- log.Infof("No address supplied for connection '%s', not connecting for now", cn.name)
- } else {
- log.Debugf("Connection '%s' is already connected, ignoring", cn.name)
- }
-}
-
-func (cn *beConnection) waitAndTryAgain(ctx context.Context) {
- go func(ctx context.Context) {
- ctxTm, cnclTm := context.WithTimeout(context.Background(), 10*time.Second)
- select {
- case <-ctxTm.Done():
- cnclTm()
- log.Debugf("Trying to connect '%s'", cn.name)
- // Connect creates a new context so cancel this one.
- cn.cancel()
- cn.connect()
- return
- case <-ctx.Done():
- cnclTm()
- return
- }
- }(ctx)
-}
-
-func (cn *beConnection) cancel() {
- cn.lck.Lock()
- defer cn.lck.Unlock()
- log.Debugf("Canceling connection %s", cn.name)
- if cn.gConn != nil {
- if cn.gConn.cncl != nil {
- cn.cncl()
- } else {
- log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
- }
- } else {
- log.Errorf("Internal error, attempting to cancel on a nil connection object: '%s'", cn.name)
- }
-}
-
-func (cn *beConnection) setCncl(cncl context.CancelFunc) {
- cn.lck.Lock()
- defer cn.lck.Unlock()
- if cn.gConn != nil {
- cn.gConn.cncl = cncl
- } else {
- log.Errorf("Internal error, attempting to set a cancel function on a nil connection object: '%s'", cn.name)
- }
-}
-
-func (cn *beConnection) setConn(conn *grpc.ClientConn) {
- cn.lck.Lock()
- defer cn.lck.Unlock()
- if cn.gConn != nil {
- cn.gConn.conn = conn
- } else {
- log.Errorf("Internal error, attempting to set a connection on a nil connection object: '%s'", cn.name)
- }
-}
-
-func (cn *beConnection) getConn() *grpc.ClientConn {
- cn.lck.Lock()
- defer cn.lck.Unlock()
- if cn.gConn != nil {
- return cn.gConn.conn
- }
- return nil
-}
-
-func (cn *beConnection) close() {
- cn.lck.Lock()
- defer cn.lck.Unlock()
- log.Debugf("Closing connection %s", cn.name)
- if cn.gConn != nil && cn.gConn.conn != nil {
- if cn.gConn.conn.GetState() == connectivity.Ready {
- cn.bknd.decConn() // Decrease the connection reference
- }
- if cn.gConn.cncl != nil {
- cn.gConn.cncl() // Cancel the context first to force monitor functions to exit
- } else {
- log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
- }
- cn.gConn.conn.Close() // Close the connection
- // Now replace the gConn object with a new one as this one just
- // fades away as references to it are released after the close
- // finishes in the background.
- cn.gConn = &gConnection{conn: nil, cncl: nil, state: connectivity.TransientFailure}
- } else {
- log.Errorf("Internal error, attempt to close a nil connection object for '%s'", cn.name)
- }
-
-}
-
-func (cn *beConnection) setState(st connectivity.State) {
- cn.lck.Lock()
- defer cn.lck.Unlock()
- if cn.gConn != nil {
- cn.gConn.state = st
- } else {
- log.Errorf("Internal error, attempting to set connection state on a nil connection object: '%s'", cn.name)
- }
-}
-
-func (cn *beConnection) getState() connectivity.State {
- cn.lck.Lock()
- defer cn.lck.Unlock()
- if cn.gConn != nil {
- if cn.gConn.conn != nil {
- return cn.gConn.conn.GetState()
- } else {
- log.Errorf("Internal error, attempting to get connection state on a nil connection: '%s'", cn.name)
- }
- } else {
- log.Errorf("Internal error, attempting to get connection state on a nil connection object: '%s'", cn.name)
- }
- // For lack of a better state to use. The logs will help determine what happened here.
- return connectivity.TransientFailure
-}
-
-func (cn *beConnection) monitor(ctx context.Context) {
- bp := cn.bknd
- log.Debugf("Setting up monitoring for backend %s", bp.name)
- go func(ctx context.Context) {
- var delay time.Duration = 100 //ms
- for {
- //log.Debugf("****** Monitoring connection '%s' on backend '%s', %v", cn.name, bp.name, cn.conn)
- if cn.getState() == connectivity.Ready {
- log.Debugf("connection '%s' on backend '%s' becomes ready", cn.name, bp.name)
- cn.setState(connectivity.Ready)
- bp.incConn()
- if cn.getConn() != nil && cn.getConn().WaitForStateChange(ctx, connectivity.Ready) == false {
- // The context was canceled. This is done by the close function
- // so just exit the routine
- log.Debugf("Contxt canceled for connection '%s' on backend '%s'", cn.name, bp.name)
- return
- }
- if cs := cn.getConn(); cs != nil {
- switch cs := cn.getState(); cs {
- case connectivity.TransientFailure:
- cn.setState(cs)
- bp.decConn()
- log.Infof("Transient failure for connection '%s' on backend '%s'", cn.name, bp.name)
- delay = 100
- case connectivity.Shutdown:
- //The connection was closed. The assumption here is that the closer
- // will manage the connection count and setting the conn to nil.
- // Exit the routine
- log.Infof("Shutdown for connection '%s' on backend '%s'", cn.name, bp.name)
- return
- case connectivity.Idle:
- // This can only happen if the server sends a GoAway. This can
- // only happen if the server has modified MaxConnectionIdle from
- // its default of infinity. The only solution here is to close the
- // connection and keepTrying()?
- //TODO: Read the grpc source code to see if there's a different approach
- log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'", cn.name, bp.name)
- cn.close()
- cn.connect()
- return
- }
- } else { // A nil means something went horribly wrong, error and exit.
- log.Errorf("Somthing horrible happned, the connection is nil and shouldn't be for connection %s", cn.name)
- return
- }
- } else {
- log.Debugf("Waiting for connection '%s' on backend '%s' to become ready", cn.name, bp.name)
- ctxTm, cnclTm := context.WithTimeout(context.Background(), delay*time.Millisecond)
- if delay < 30000 {
- delay += delay
- }
- select {
- case <-ctxTm.Done():
- cnclTm() // Doubt this is required but it's harmless.
- // Do nothing but let the loop continue
- case <-ctx.Done():
- // Context was closed, close and exit routine
- //cn.close() NO! let the close be managed externally!
- return
- }
- }
- }
- }(ctx)
-}
-
// Set a callback for connection failure notification
// This is currently not used.
-func (bp *backend) setConnFailCallback(cb func(string, *backend) bool) {
- bp.connFailCallback = cb
+func (be *backend) setConnFailCallback(cb func(string, *backend) bool) {
+ be.connFailCallback = cb
}
diff --git a/afrouter/afrouter/binding-router.go b/afrouter/afrouter/binding-router.go
index 7de3ea7..67a7a7e 100644
--- a/afrouter/afrouter/binding-router.go
+++ b/afrouter/afrouter/binding-router.go
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// gRPC affinity router with active/active backends
package afrouter
@@ -27,22 +26,21 @@
type BindingRouter struct {
name string
- routerType int // TODO: This is probably not needed
- association int
+ association associationType
//routingField string
grpcService string
//protoDescriptor *pb.FileDescriptorSet
//methodMap map[string]byte
- bkndClstr *backendCluster
- bindings map[string]*backend
- bindingType string
- bindingField string
- bindingMethod string
- curBknd **backend
+ beCluster *cluster
+ bindings map[string]*backend
+ bindingType string
+ bindingField string
+ bindingMethod string
+ currentBackend **backend
}
-func (br BindingRouter) BackendCluster(s string, metaKey string) (*backendCluster, error) {
- return br.bkndClstr, nil
+func (br BindingRouter) BackendCluster(s string, metaKey string) (*cluster, error) {
+ return br.beCluster, nil
//return nil,errors.New("Not implemented yet")
}
func (br BindingRouter) Name() string {
@@ -52,8 +50,8 @@
return br.grpcService
}
func (br BindingRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string, string, error) {
- var rtrnK string = ""
- var rtrnV string = ""
+ var rtrnK = ""
+ var rtrnV = ""
// Get the metadata from the server stream
md, ok := metadata.FromIncomingContext(serverStream.Context())
@@ -69,9 +67,9 @@
return rtrnK, rtrnV, nil
}
-func (br BindingRouter) FindBackendCluster(becName string) *backendCluster {
- if becName == br.bkndClstr.name {
- return br.bkndClstr
+func (br BindingRouter) FindBackendCluster(becName string) *cluster {
+ if becName == br.beCluster.name {
+ return br.beCluster
}
return nil
}
@@ -92,19 +90,19 @@
sl.err = err
return nil
}
- if sl.mthdSlice[REQ_METHOD] != br.bindingMethod {
+ if sl.methodInfo.method != br.bindingMethod {
err = errors.New(fmt.Sprintf("Binding must occur with method %s but attempted with method %s",
- br.bindingMethod, sl.mthdSlice[REQ_METHOD]))
+ br.bindingMethod, sl.methodInfo.method))
log.Error(err)
sl.err = err
return nil
}
log.Debugf("MUST CREATE A NEW BINDING MAP ENTRY!!")
- if len(br.bindings) < len(br.bkndClstr.backends) {
- if *br.curBknd, err = br.bkndClstr.nextBackend(*br.curBknd, BE_SEQ_RR); err == nil {
+ if len(br.bindings) < len(br.beCluster.backends) {
+ if *br.currentBackend, err = br.beCluster.nextBackend(*br.currentBackend, BackendSequenceRoundRobin); err == nil {
// Use the name of the backend as the metaVal for this new binding
- br.bindings[(*br.curBknd).name] = *br.curBknd
- return *br.curBknd
+ br.bindings[(*br.currentBackend).name] = *br.currentBackend
+ return *br.currentBackend
} else {
log.Error(err)
sl.err = err
@@ -124,7 +122,7 @@
}
func newBindingRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
- var rtrn_err bool = false
+ var rtrn_err = false
var err error = nil
log.Debugf("Creating binding router %s", config.Name)
// A name must exist
@@ -165,18 +163,14 @@
grpcService: rconf.ProtoService,
bindings: make(map[string]*backend),
//methodMap:make(map[string]byte),
- curBknd: &bptr,
+ currentBackend: &bptr,
//serialNo:0,
}
// A binding association must exist
- br.association = strIndex(rAssnNames, config.Binding.Association)
- if br.association == 0 {
- if config.Binding.Association == "" {
- log.Error("An binding association must be specified")
- } else {
- log.Errorf("The binding association '%s' is not valid", config.Binding.Association)
- }
+ br.association = config.Binding.Association
+ if br.association == AssociationUndefined {
+ log.Error("An binding association must be specified")
rtrn_err = true
}
// A binding type must exist
@@ -202,19 +196,10 @@
br.bindingField = config.Binding.Field
}
- // This has already been validated bfore this function
- // is called so just use it.
- for idx := range rTypeNames {
- if config.Type == rTypeNames[idx] {
- br.routerType = idx
- break
- }
- }
-
// Create the backend cluster or link to an existing one
ok := true
- if br.bkndClstr, ok = bClusters[config.backendCluster.Name]; ok == false {
- if br.bkndClstr, err = newBackendCluster(config.backendCluster); err != nil {
+ if br.beCluster, ok = clusters[config.backendCluster.Name]; ok == false {
+ if br.beCluster, err = newBackendCluster(config.backendCluster); err != nil {
log.Errorf("Could not create a backend for router %s", config.Name)
rtrn_err = true
}
diff --git a/afrouter/afrouter/cluster.go b/afrouter/afrouter/cluster.go
new file mode 100644
index 0000000..879246b
--- /dev/null
+++ b/afrouter/afrouter/cluster.go
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "errors"
+ "fmt"
+ "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
+ "sync/atomic"
+)
+
+var clusters = make(map[string]*cluster)
+
+// cluster a collection of HA backends
+type cluster struct {
+ name string
+ //backends map[string]*backend
+ backends []*backend
+ backendIDMap map[*backend]int
+ serialNoCounter uint64
+}
+
+//TODO: Move the backend type (active/active etc) to the cluster
+// level. All backends should really be of the same type.
+// Create a new backend cluster
+func newBackendCluster(conf *BackendClusterConfig) (*cluster, error) {
+ var err error = nil
+ var rtrn_err = false
+ var be *backend
+ log.Debugf("Creating a backend cluster with %v", conf)
+ // Validate the configuration
+ if conf.Name == "" {
+ log.Error("A backend cluster must have a name")
+ rtrn_err = true
+ }
+ //bc := &cluster{name:conf.Name,backends:make(map[string]*backend)}
+ bc := &cluster{name: conf.Name, backendIDMap: make(map[*backend]int)}
+ clusters[bc.name] = bc
+ idx := 0
+ for _, bec := range conf.Backends {
+ if bec.Name == "" {
+ log.Errorf("A backend must have a name in cluster %s\n", conf.Name)
+ rtrn_err = true
+ }
+ if be, err = newBackend(&bec, conf.Name); err != nil {
+ log.Errorf("Error creating backend %s", bec.Name)
+ rtrn_err = true
+ }
+ bc.backends = append(bc.backends, be)
+ bc.backendIDMap[bc.backends[idx]] = idx
+ idx++
+ }
+ if rtrn_err {
+ return nil, errors.New("Error creating backend(s)")
+ }
+ return bc, nil
+}
+
+func (c *cluster) getBackend(name string) *backend {
+ for _, v := range c.backends {
+ if v.name == name {
+ return v
+ }
+ }
+ return nil
+}
+
+func (c *cluster) allocateSerialNumber() uint64 {
+ return atomic.AddUint64(&c.serialNoCounter, 1) - 1
+}
+
+func (c *cluster) nextBackend(be *backend, seq backendSequence) (*backend, error) {
+ switch seq {
+ case BackendSequenceRoundRobin: // Round robin
+ in := be
+ // If no backend is found having a connection
+ // then return nil.
+ if be == nil {
+ log.Debug("Previous backend is nil")
+ be = c.backends[0]
+ in = be
+ if be.openConns != 0 {
+ return be, nil
+ }
+ }
+ for {
+ log.Debugf("Requesting a new backend starting from %s", be.name)
+ cur := c.backendIDMap[be]
+ cur++
+ if cur >= len(c.backends) {
+ cur = 0
+ }
+ log.Debugf("Next backend is %d:%s", cur, c.backends[cur].name)
+ if c.backends[cur].openConns > 0 {
+ return c.backends[cur], nil
+ }
+ if c.backends[cur] == in {
+ err := fmt.Errorf("No backend with open connections found")
+ log.Debug(err)
+ return nil, err
+ }
+ be = c.backends[cur]
+ log.Debugf("Backend '%s' has no open connections, trying next", c.backends[cur].name)
+ }
+ default: // Invalid, default to round robin
+ log.Errorf("Invalid backend sequence %d. Defaulting to round robin", seq)
+ return c.nextBackend(be, BackendSequenceRoundRobin)
+ }
+}
+
+func (c *cluster) handler(srv interface{}, serverStream grpc.ServerStream, r Router, methodInfo methodDetails,
+ mk string, mv string) error {
+ //func (c *cluster) handler(nbR * nbRequest) error {
+
+ // The final backend cluster needs to be determined here. With non-affinity routed backends it could
+ // just be determined here and for affinity routed backends the first message must be received
+ // before the backend is determined. In order to keep things simple, the same approach is taken for
+ // now.
+
+ // Get the backend to use.
+ // Allocate the nbFrame here since it holds the "context" of this communication
+ nf := &nbFrame{router: r, methodInfo: methodInfo, serialNo: c.allocateSerialNumber(), metaKey: mk, metaVal: mv}
+ log.Debugf("Nb frame allocate with method %s", nf.methodInfo.method)
+
+ if be, err := c.assignBackend(serverStream, nf); err != nil {
+ // At this point, no backend streams have been initiated
+ // so just return the error.
+ return err
+ } else {
+ log.Debugf("Backend '%s' selected", be.name)
+ // Allocate a sbFrame here because it might be needed for return value intercept
+ sf := &sbFrame{router: r, backend: be, method: nf.methodInfo.method, metaKey: mk, metaVal: mv}
+ log.Debugf("Sb frame allocated with router %s", r.Name())
+ return be.handler(srv, serverStream, nf, sf)
+ }
+}
+
+func (c *cluster) assignBackend(src grpc.ServerStream, f *nbFrame) (*backend, error) {
+ // Receive the first message from the server. This calls the assigned codec in which
+ // Unmarshal gets executed. That will use the assigned router to select a backend
+ // and add it to the frame
+ if err := src.RecvMsg(f); err != nil {
+ return nil, err
+ }
+ // Check that the backend was routable and actually has connections open.
+ // If it doesn't then return a nil backend to indicate this
+ if f.backend == nil {
+ err := fmt.Errorf("Unable to route method '%s'", f.methodInfo.method)
+ log.Error(err)
+ return nil, err
+ } else if f.backend.openConns == 0 {
+ err := fmt.Errorf("No open connections on backend '%s'", f.backend.name)
+ log.Error(err)
+ return f.backend, err
+ }
+ return f.backend, nil
+}
diff --git a/afrouter/afrouter/codec.go b/afrouter/afrouter/codec.go
index 7147916..675320a 100644
--- a/afrouter/afrouter/codec.go
+++ b/afrouter/afrouter/codec.go
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// gRPC affinity router with active/active backends
package afrouter
@@ -37,25 +36,27 @@
parentCodec grpc.Codec
}
+// sbFrame is a frame being "returned" to whomever established the connection
type sbFrame struct {
payload []byte
router Router
method string
- be *backend
- lck sync.Mutex
+ backend *backend
+ mutex sync.Mutex
metaKey string
metaVal string
}
+// nbFrame is a frame coming in from whomever established the connection
type nbFrame struct {
- payload []byte
- router Router
- be *backend
- err error
- mthdSlice []string
- serNo chan uint64
- metaKey string
- metaVal string
+ payload []byte
+ router Router
+ backend *backend
+ err error
+ methodInfo methodDetails
+ serialNo uint64
+ metaKey string
+ metaVal string
}
func (cdc *transparentRoutingCodec) Marshal(v interface{}) ([]byte, error) {
@@ -67,7 +68,6 @@
default:
return cdc.parentCodec.Marshal(v)
}
-
}
func (cdc *transparentRoutingCodec) Unmarshal(data []byte, v interface{}) error {
@@ -81,8 +81,8 @@
t.payload = data
// This is were the afinity value is pulled from the payload
// and the backend selected.
- t.be = t.router.Route(v)
- log.Debugf("Routing returned %v for method %s", t.be, t.mthdSlice[REQ_METHOD])
+ t.backend = t.router.Route(v)
+ log.Debugf("Routing returned %v for method %s", t.backend, t.methodInfo.method)
return nil
default:
return cdc.parentCodec.Unmarshal(data, v)
diff --git a/afrouter/afrouter/config.go b/afrouter/afrouter/config.go
index 2cc2976..044fdd7 100644
--- a/afrouter/afrouter/config.go
+++ b/afrouter/afrouter/config.go
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// gRPC affinity router with active/active backends
package afrouter
@@ -69,9 +68,9 @@
type RouteConfig struct {
Name string `json:"name"`
- Type string `json:"type"`
+ Type routeType `json:"type"`
ProtoFile string `json:"proto_descriptor"`
- Association string `json:"association"`
+ Association associationType `json:"association"`
RouteField string `json:"routing_field"`
Methods []string `json:"methods"` // The GRPC methods to route using the route field
NbBindingMethods []string `json:"nb_binding_methods"`
@@ -82,10 +81,10 @@
}
type BindingConfig struct {
- Type string `json:"type"`
- Field string `json:"field"`
- Method string `json:"method"`
- Association string `json:"association"`
+ Type string `json:"type"`
+ Field string `json:"field"`
+ Method string `json:"method"`
+ Association associationType `json:"association"`
}
type OverrideConfig struct {
@@ -103,16 +102,16 @@
type BackendConfig struct {
Name string `json:"name"`
- Type string `json:"type"`
+ Type backendType `json:"type"`
Association AssociationConfig `json:"association"`
Connections []ConnectionConfig `json:"connections"`
}
type AssociationConfig struct {
- Strategy string `json:"strategy"`
- Location string `json:"location"`
- Field string `json:"field"`
- Key string `json:"key"`
+ Strategy associationStrategy `json:"strategy"`
+ Location associationLocation `json:"location"`
+ Field string `json:"field"`
+ Key string `json:"key"`
}
type ConnectionConfig struct {
@@ -195,14 +194,14 @@
// Resolve router references for the servers
log.Debug("Resolving references in the config file")
- for k, _ := range conf.Servers {
+ for k := range conf.Servers {
//s.routers =make(map[string]*RouterConfig)
conf.Servers[k].routers = make(map[string]*RouterConfig)
for _, rPkg := range conf.Servers[k].Routers {
- var found bool = false
+ var found = false
// Locate the router "r" in the top lever Routers array
log.Debugf("Resolving router reference to router '%s' from server '%s'", rPkg.Router, conf.Servers[k].Name)
- for rk, _ := range conf.Routers {
+ for rk := range conf.Routers {
if conf.Routers[rk].Name == rPkg.Router && !found {
log.Debugf("Reference to router '%s' found for package '%s'", rPkg.Router, rPkg.Package)
conf.Servers[k].routers[rPkg.Package] = &conf.Routers[rk]
@@ -229,7 +228,7 @@
// Resolve backend references for the routers
for rk, rv := range conf.Routers {
for rtk, rtv := range rv.Routes {
- var found bool = false
+ var found = false
log.Debugf("Resolving backend reference to %s from router %s", rtv.BackendCluster, rv.Name)
for bek, bev := range conf.BackendClusters {
log.Debugf("Checking cluster %s", conf.BackendClusters[bek].Name)
diff --git a/afrouter/afrouter/connection.go b/afrouter/afrouter/connection.go
new file mode 100644
index 0000000..fab1052
--- /dev/null
+++ b/afrouter/afrouter/connection.go
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "context"
+ "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/connectivity"
+ "sync"
+ "time"
+)
+
+// connection represents a connection to a single backend
+type connection struct {
+ mutex sync.Mutex
+ name string
+ addr string
+ port string
+ gConn *gConnection
+ backend *backend
+}
+
+// This structure should never be referred to
+// by any routine outside of *connection
+// routines.
+type gConnection struct {
+ mutex sync.Mutex
+ state connectivity.State
+ conn *grpc.ClientConn
+ cancel context.CancelFunc
+}
+
+func (cn *connection) connect() {
+ if cn.addr != "" && cn.getConn() == nil {
+ log.Infof("Connecting to connection %s with addr: %s and port %s", cn.name, cn.addr, cn.port)
+ // Dial doesn't block, it just returns and continues connecting in the background.
+ // Check back later to confirm and increase the connection count.
+ ctx, cnclFnc := context.WithCancel(context.Background()) // Context for canceling the connection
+ cn.setCancel(cnclFnc)
+ if conn, err := grpc.Dial(cn.addr+":"+cn.port, grpc.WithCodec(Codec()), grpc.WithInsecure()); err != nil {
+ log.Errorf("Dialng connection %v:%v", cn, err)
+ cn.waitAndTryAgain(ctx)
+ } else {
+ cn.setConn(conn)
+ log.Debugf("Starting the connection monitor for '%s'", cn.name)
+ cn.monitor(ctx)
+ }
+ } else if cn.addr == "" {
+ log.Infof("No address supplied for connection '%s', not connecting for now", cn.name)
+ } else {
+ log.Debugf("Connection '%s' is already connected, ignoring", cn.name)
+ }
+}
+
+func (cn *connection) waitAndTryAgain(ctx context.Context) {
+ go func(ctx context.Context) {
+ ctxTm, cnclTm := context.WithTimeout(context.Background(), 10*time.Second)
+ select {
+ case <-ctxTm.Done():
+ cnclTm()
+ log.Debugf("Trying to connect '%s'", cn.name)
+ // Connect creates a new context so cancel this one.
+ cn.cancel()
+ cn.connect()
+ return
+ case <-ctx.Done():
+ cnclTm()
+ return
+ }
+ }(ctx)
+}
+
+func (cn *connection) cancel() {
+ cn.mutex.Lock()
+ defer cn.mutex.Unlock()
+ log.Debugf("Canceling connection %s", cn.name)
+ if cn.gConn != nil {
+ if cn.gConn.cancel != nil {
+ cn.gConn.cancel()
+ } else {
+ log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
+ }
+ } else {
+ log.Errorf("Internal error, attempting to cancel on a nil connection object: '%s'", cn.name)
+ }
+}
+
+func (cn *connection) setCancel(cancel context.CancelFunc) {
+ cn.mutex.Lock()
+ defer cn.mutex.Unlock()
+ if cn.gConn != nil {
+ cn.gConn.cancel = cancel
+ } else {
+ log.Errorf("Internal error, attempting to set a cancel function on a nil connection object: '%s'", cn.name)
+ }
+}
+
+func (cn *connection) setConn(conn *grpc.ClientConn) {
+ cn.mutex.Lock()
+ defer cn.mutex.Unlock()
+ if cn.gConn != nil {
+ cn.gConn.conn = conn
+ } else {
+ log.Errorf("Internal error, attempting to set a connection on a nil connection object: '%s'", cn.name)
+ }
+}
+
+func (cn *connection) getConn() *grpc.ClientConn {
+ cn.mutex.Lock()
+ defer cn.mutex.Unlock()
+ if cn.gConn != nil {
+ return cn.gConn.conn
+ }
+ return nil
+}
+
+func (cn *connection) close() {
+ cn.mutex.Lock()
+ defer cn.mutex.Unlock()
+ log.Debugf("Closing connection %s", cn.name)
+ if cn.gConn != nil && cn.gConn.conn != nil {
+ if cn.gConn.conn.GetState() == connectivity.Ready {
+ cn.backend.decConn() // Decrease the connection reference
+ }
+ if cn.gConn.cancel != nil {
+ cn.gConn.cancel() // Cancel the context first to force monitor functions to exit
+ } else {
+ log.Errorf("Internal error, attempt to cancel a nil context for connection '%s'", cn.name)
+ }
+ cn.gConn.conn.Close() // Close the connection
+ // Now replace the gConn object with a new one as this one just
+ // fades away as references to it are released after the close
+ // finishes in the background.
+ cn.gConn = &gConnection{conn: nil, cancel: nil, state: connectivity.TransientFailure}
+ } else {
+ log.Errorf("Internal error, attempt to close a nil connection object for '%s'", cn.name)
+ }
+
+}
+
+func (cn *connection) setState(st connectivity.State) {
+ cn.mutex.Lock()
+ defer cn.mutex.Unlock()
+ if cn.gConn != nil {
+ cn.gConn.state = st
+ } else {
+ log.Errorf("Internal error, attempting to set connection state on a nil connection object: '%s'", cn.name)
+ }
+}
+
+func (cn *connection) getState() connectivity.State {
+ cn.mutex.Lock()
+ defer cn.mutex.Unlock()
+ if cn.gConn != nil {
+ if cn.gConn.conn != nil {
+ return cn.gConn.conn.GetState()
+ } else {
+ log.Errorf("Internal error, attempting to get connection state on a nil connection: '%s'", cn.name)
+ }
+ } else {
+ log.Errorf("Internal error, attempting to get connection state on a nil connection object: '%s'", cn.name)
+ }
+ // For lack of a better state to use. The logs will help determine what happened here.
+ return connectivity.TransientFailure
+}
+
+func (cn *connection) monitor(ctx context.Context) {
+ be := cn.backend
+ log.Debugf("Setting up monitoring for backend %s", be.name)
+ go func(ctx context.Context) {
+ var delay time.Duration = 100 //ms
+ for {
+ //log.Debugf("****** Monitoring connection '%s' on backend '%s', %v", cn.name, be.name, cn.conn)
+ if cn.getState() == connectivity.Ready {
+ log.Debugf("connection '%s' on backend '%s' becomes ready", cn.name, be.name)
+ cn.setState(connectivity.Ready)
+ be.incConn()
+ if cn.getConn() != nil && !cn.getConn().WaitForStateChange(ctx, connectivity.Ready) {
+ // The context was canceled. This is done by the close function
+ // so just exit the routine
+ log.Debugf("Contxt canceled for connection '%s' on backend '%s'", cn.name, be.name)
+ return
+ }
+ if cs := cn.getConn(); cs != nil {
+ switch cs := cn.getState(); cs {
+ case connectivity.TransientFailure:
+ cn.setState(cs)
+ be.decConn()
+ log.Infof("Transient failure for connection '%s' on backend '%s'", cn.name, be.name)
+ delay = 100
+ case connectivity.Shutdown:
+ //The connection was closed. The assumption here is that the closer
+ // will manage the connection count and setting the conn to nil.
+ // Exit the routine
+ log.Infof("Shutdown for connection '%s' on backend '%s'", cn.name, be.name)
+ return
+ case connectivity.Idle:
+ // This can only happen if the server sends a GoAway. This can
+ // only happen if the server has modified MaxConnectionIdle from
+ // its default of infinity. The only solution here is to close the
+ // connection and keepTrying()?
+ //TODO: Read the grpc source code to see if there's a different approach
+ log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'", cn.name, be.name)
+ cn.close()
+ cn.connect()
+ return
+ }
+ } else { // A nil means something went horribly wrong, error and exit.
+ log.Errorf("Somthing horrible happned, the connection is nil and shouldn't be for connection %s", cn.name)
+ return
+ }
+ } else {
+ log.Debugf("Waiting for connection '%s' on backend '%s' to become ready", cn.name, be.name)
+ ctxTm, cnclTm := context.WithTimeout(context.Background(), delay*time.Millisecond)
+ if delay < 30000 {
+ delay += delay
+ }
+ select {
+ case <-ctxTm.Done():
+ cnclTm() // Doubt this is required but it's harmless.
+ // Do nothing but let the loop continue
+ case <-ctx.Done():
+ cnclTm()
+ // Context was closed, close and exit routine
+ //cn.close() NO! let the close be managed externally!
+ return
+ }
+ }
+ }
+ }(ctx)
+}
diff --git a/afrouter/afrouter/enums.go b/afrouter/afrouter/enums.go
new file mode 100644
index 0000000..02b1d3d
--- /dev/null
+++ b/afrouter/afrouter/enums.go
@@ -0,0 +1,201 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "encoding/json"
+ "fmt"
+)
+
+type backendType int
+
+const (
+ BackendUndefined backendType = iota
+ BackendActiveActive
+ BackendSingleServer
+)
+
+var stringToBeType = map[string]backendType{"active_active": BackendActiveActive, "server": BackendSingleServer}
+var beTypeToString = map[backendType]string{BackendActiveActive: "active_active", BackendSingleServer: "server"}
+
+func (t backendType) MarshalJSON() ([]byte, error) {
+ if t == BackendUndefined {
+ return json.Marshal(nil)
+ }
+ if str, have := beTypeToString[t]; have {
+ return json.Marshal(str)
+ }
+ return nil, fmt.Errorf("unknown %T '%d'", t, t)
+}
+
+func (t *backendType) UnmarshalJSON(b []byte) error {
+ var str string
+ if err := json.Unmarshal(b, &str); err != nil {
+ return err
+ }
+ var have bool
+ if *t, have = stringToBeType[str]; !have {
+ return fmt.Errorf("invalid %T %s", *t, str)
+ }
+ return nil
+}
+
+type associationLocation int
+
+const (
+ AssociationLocationUndefined associationLocation = iota
+ AssociationLocationHeader
+ AssociationLocationProtobuf
+)
+
+var stringToAlType = map[string]associationLocation{"header": AssociationLocationHeader, "protobuf": AssociationLocationProtobuf}
+var alTypeToString = map[associationLocation]string{AssociationLocationHeader: "header", AssociationLocationProtobuf: "protobuf"}
+
+func (t associationLocation) MarshalJSON() ([]byte, error) {
+ if t == AssociationLocationUndefined {
+ return json.Marshal(nil)
+ }
+ if str, have := alTypeToString[t]; have {
+ return json.Marshal(str)
+ }
+ return nil, fmt.Errorf("unknown %T '%d'", t, t)
+}
+
+func (t *associationLocation) UnmarshalJSON(b []byte) error {
+ var str string
+ if err := json.Unmarshal(b, &str); err != nil {
+ return err
+ }
+ var have bool
+ if *t, have = stringToAlType[str]; !have {
+ return fmt.Errorf("invalid %T %s", *t, str)
+ }
+ return nil
+}
+
+type associationStrategy int
+
+const (
+ AssociationStrategyUndefined associationStrategy = iota
+ AssociationStrategySerialNo
+)
+
+var stringToAsType = map[string]associationStrategy{"serial_number": AssociationStrategySerialNo}
+var asTypeToString = map[associationStrategy]string{AssociationStrategySerialNo: "serial_number"}
+
+func (t associationStrategy) MarshalJSON() ([]byte, error) {
+ if t == AssociationStrategyUndefined {
+ return json.Marshal(nil)
+ }
+ if str, have := asTypeToString[t]; have {
+ return json.Marshal(str)
+ }
+ return nil, fmt.Errorf("unknown %T '%d'", t, t)
+}
+
+func (t *associationStrategy) UnmarshalJSON(b []byte) error {
+ var str string
+ if err := json.Unmarshal(b, &str); err != nil {
+ return err
+ }
+ var have bool
+ if *t, have = stringToAsType[str]; !have {
+ return fmt.Errorf("invalid %T %s", *t, str)
+ }
+ return nil
+}
+
+type backendSequence int
+
+const (
+ BackendSequenceRoundRobin backendSequence = iota
+)
+
+type routeType int
+
+const (
+ RouteTypeUndefined routeType = iota
+ RouteTypeRpcAffinityMessage
+ RouteTypeRpcAffinityHeader
+ RouteTypeBinding
+ RouteTypeRoundRobin
+)
+
+// String names for display in error messages.
+var stringToRouteType = map[string]routeType{"rpc_affinity_message": RouteTypeRpcAffinityMessage, "rpc_affinity_header": RouteTypeRpcAffinityHeader, "binding": RouteTypeBinding, "round_robin": RouteTypeRoundRobin}
+var routeTypeToString = map[routeType]string{RouteTypeRpcAffinityMessage: "rpc_affinity_message", RouteTypeRpcAffinityHeader: "rpc_affinity_header", RouteTypeBinding: "binding", RouteTypeRoundRobin: "round_robin"}
+
+func (t routeType) String() string {
+ if str, have := routeTypeToString[t]; have {
+ return str
+ }
+ return fmt.Sprintf("%T(%d)", t, t)
+}
+
+func (t routeType) MarshalJSON() ([]byte, error) {
+ if t == RouteTypeUndefined {
+ return json.Marshal(nil)
+ }
+ if str, have := routeTypeToString[t]; have {
+ return json.Marshal(str)
+ }
+ return nil, fmt.Errorf("unknown %T '%d'", t, t)
+}
+
+func (t *routeType) UnmarshalJSON(b []byte) error {
+ var str string
+ if err := json.Unmarshal(b, &str); err != nil {
+ return err
+ }
+ var have bool
+ if *t, have = stringToRouteType[str]; !have {
+ return fmt.Errorf("invalid %T %s", *t, str)
+ }
+ return nil
+}
+
+type associationType int
+
+const (
+ AssociationUndefined associationType = iota
+ AssociationRoundRobin
+)
+
+var stringToAssociationType = map[string]associationType{"round_robin": AssociationRoundRobin}
+var associationTypeToString = map[associationType]string{AssociationRoundRobin: "round_robin"}
+
+func (t associationType) MarshalJSON() ([]byte, error) {
+ if t == AssociationUndefined {
+ return json.Marshal(nil)
+ }
+ if str, have := associationTypeToString[t]; have {
+ return json.Marshal(str)
+ }
+ return nil, fmt.Errorf("unknown %T '%d'", t, t)
+}
+
+func (t *associationType) UnmarshalJSON(b []byte) error {
+ var str string
+ if err := json.Unmarshal(b, &str); err != nil {
+ return err
+ }
+ var have bool
+ if *t, have = stringToAssociationType[str]; !have {
+ return fmt.Errorf("invalid %T %s", *t, str)
+ }
+ return nil
+}
diff --git a/afrouter/afrouter/helpers.go b/afrouter/afrouter/helpers.go
deleted file mode 100644
index 441b4b9..0000000
--- a/afrouter/afrouter/helpers.go
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-// gRPC affinity router with active/active backends
-
-package afrouter
-
-//import "github.com/opencord/voltha-go/common/log"
-
-func strIndex(ar []string, match string) int {
- for idx, v := range ar {
- if v == match {
- return idx
- }
- }
- return 0
-}
diff --git a/afrouter/afrouter/method-details.go b/afrouter/afrouter/method-details.go
new file mode 100644
index 0000000..ad05121
--- /dev/null
+++ b/afrouter/afrouter/method-details.go
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "github.com/opencord/voltha-go/common/log"
+ "regexp"
+)
+
+type methodDetails struct {
+ all string
+ pkg string
+ service string
+ method string
+}
+
+// The compiled regex to extract the package/service/method
+var mthdSlicer = regexp.MustCompile(`^/([a-zA-Z][a-zA-Z0-9]+)\.([a-zA-Z][a-zA-Z0-9]+)/([a-zA-Z][a-zA-Z0-9]+)`)
+
+func newMethodDetails(fullMethodName string) methodDetails {
+ // The full method name is structured as follows:
+ // <package name>.<service>/<method>
+ mthdSlice := mthdSlicer.FindStringSubmatch(fullMethodName)
+ if mthdSlice == nil {
+ log.Errorf("Faled to slice full method %s, result: %v", fullMethodName, mthdSlice)
+ } else {
+ log.Debugf("Sliced full method %s: %v", fullMethodName, mthdSlice)
+ }
+ return methodDetails{
+ all: mthdSlice[0],
+ pkg: mthdSlice[1],
+ service: mthdSlice[2],
+ method: mthdSlice[3],
+ }
+}
diff --git a/afrouter/afrouter/method-router.go b/afrouter/afrouter/method-router.go
index 5e11fa6..2c1ca4f 100644
--- a/afrouter/afrouter/method-router.go
+++ b/afrouter/afrouter/method-router.go
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// gRPC affinity router with active/active backends
package afrouter
@@ -28,14 +27,14 @@
const NoMeta = "nometa"
type MethodRouter struct {
- name string
- service string
- mthdRt map[string]map[string]Router // map of [metadata][method]
+ name string
+ service string
+ methodRouter map[string]map[string]Router // map of [metadata][method]
}
func newMethodRouter(config *RouterConfig) (Router, error) {
- mr := MethodRouter{name: config.Name, service: config.ProtoService, mthdRt: make(map[string]map[string]Router)}
- mr.mthdRt[NoMeta] = make(map[string]Router) // For routes not needing metadata (all expcept binding at this time)
+ mr := MethodRouter{name: config.Name, service: config.ProtoService, methodRouter: make(map[string]map[string]Router)}
+ mr.methodRouter[NoMeta] = make(map[string]Router) // For routes not needing metadata (all expcept binding at this time)
log.Debugf("Processing MethodRouter config %v", *config)
if len(config.Routes) == 0 {
return nil, errors.New(fmt.Sprintf("Router %s must have at least one route", config.Name))
@@ -47,10 +46,10 @@
if err != nil {
return nil, err
}
- if rtv.Type == "binding" {
+ if rtv.Type == RouteTypeBinding {
idx1 = rtv.Binding.Field
- if _, ok := mr.mthdRt[idx1]; ok == false { // /First attempt on this key
- mr.mthdRt[idx1] = make(map[string]Router)
+ if _, ok := mr.methodRouter[idx1]; !ok { // /First attempt on this key
+ mr.methodRouter[idx1] = make(map[string]Router)
}
} else {
idx1 = NoMeta
@@ -63,11 +62,11 @@
return r, nil
} else {
log.Debugf("Setting router '%s' for single method '%s'", r.Name(), rtv.Methods[0])
- if _, ok := mr.mthdRt[idx1][rtv.Methods[0]]; ok == false {
- mr.mthdRt[idx1][rtv.Methods[0]] = r
+ if _, ok := mr.methodRouter[idx1][rtv.Methods[0]]; !ok {
+ mr.methodRouter[idx1][rtv.Methods[0]] = r
} else {
err := errors.New(fmt.Sprintf("Attempt to define method %s for 2 routes: %s & %s", rtv.Methods[0],
- r.Name(), mr.mthdRt[idx1][rtv.Methods[0]].Name()))
+ r.Name(), mr.methodRouter[idx1][rtv.Methods[0]].Name()))
log.Error(err)
return mr, err
}
@@ -75,11 +74,11 @@
default:
for _, m := range rtv.Methods {
log.Debugf("Processing Method %s", m)
- if _, ok := mr.mthdRt[idx1][m]; ok == false {
+ if _, ok := mr.methodRouter[idx1][m]; !ok {
log.Debugf("Setting router '%s' for method '%s'", r.Name(), m)
- mr.mthdRt[idx1][m] = r
+ mr.methodRouter[idx1][m] = r
} else {
- err := errors.New(fmt.Sprintf("Attempt to define method %s for 2 routes: %s & %s", m, r.Name(), mr.mthdRt[idx1][m].Name()))
+ err := errors.New(fmt.Sprintf("Attempt to define method %s for 2 routes: %s & %s", m, r.Name(), mr.methodRouter[idx1][m].Name()))
log.Error(err)
return mr, err
}
@@ -99,8 +98,8 @@
}
func (mr MethodRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string, string, error) {
- var rtrnK string = NoMeta
- var rtrnV string = ""
+ var rtrnK = NoMeta
+ var rtrnV = ""
// Get the metadata from the server stream
md, ok := metadata.FromIncomingContext(serverStream.Context())
@@ -109,8 +108,8 @@
}
// Determine if one of the method routing keys exists in the metadata
- for k, _ := range mr.mthdRt {
- if _, ok := md[k]; ok == true {
+ for k := range mr.methodRouter {
+ if _, ok := md[k]; ok {
rtrnV = md[k][0]
rtrnK = k
break
@@ -123,7 +122,7 @@
func (mr MethodRouter) ReplyHandler(sel interface{}) error {
switch sl := sel.(type) {
case *sbFrame:
- if r, ok := mr.mthdRt[NoMeta][sl.method]; ok == true {
+ if r, ok := mr.methodRouter[NoMeta][sl.method]; ok {
return r.ReplyHandler(sel)
}
// TODO: this case should also be an error
@@ -137,18 +136,18 @@
func (mr MethodRouter) Route(sel interface{}) *backend {
switch sl := sel.(type) {
case *nbFrame:
- if r, ok := mr.mthdRt[sl.metaKey][sl.mthdSlice[REQ_METHOD]]; ok == true {
+ if r, ok := mr.methodRouter[sl.metaKey][sl.methodInfo.method]; ok {
return r.Route(sel)
}
- log.Errorf("Attept to route on non-existent method '%s'", sl.mthdSlice[REQ_METHOD])
+ log.Errorf("Attept to route on non-existent method '%s'", sl.methodInfo.method)
return nil
default:
return nil
}
}
-func (mr MethodRouter) BackendCluster(mthd string, metaKey string) (*backendCluster, error) {
- if r, ok := mr.mthdRt[metaKey][mthd]; ok == true {
+func (mr MethodRouter) BackendCluster(mthd string, metaKey string) (*cluster, error) {
+ if r, ok := mr.methodRouter[metaKey][mthd]; ok {
return r.BackendCluster(mthd, metaKey)
}
err := errors.New(fmt.Sprintf("No backend cluster exists for method '%s' using meta key '%s'", mthd, metaKey))
@@ -156,8 +155,8 @@
return nil, err
}
-func (mr MethodRouter) FindBackendCluster(beName string) *backendCluster {
- for _, meta := range mr.mthdRt {
+func (mr MethodRouter) FindBackendCluster(beName string) *cluster {
+ for _, meta := range mr.methodRouter {
for _, r := range meta {
if rtrn := r.FindBackendCluster(beName); rtrn != nil {
return rtrn
diff --git a/afrouter/afrouter/round-robin-router.go b/afrouter/afrouter/round-robin-router.go
index 65d883a..70f164a 100644
--- a/afrouter/afrouter/round-robin-router.go
+++ b/afrouter/afrouter/round-robin-router.go
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// gRPC affinity router with active/active backends
package afrouter
@@ -25,16 +24,15 @@
)
type RoundRobinRouter struct {
- name string
- routerType int // TODO: Likely not needed.
- grpcService string
- bkndClstr *backendCluster
- curBknd **backend
+ name string
+ grpcService string
+ cluster *cluster
+ currentBackend **backend
}
func newRoundRobinRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
var err error = nil
- var rtrn_err bool = false
+ var rtrn_err = false
// Validate the configuration
log.Debug("Creating a new round robin router")
@@ -57,24 +55,15 @@
var bptr *backend
bptr = nil
rr := RoundRobinRouter{
- name: config.Name,
- grpcService: rconf.ProtoService,
- curBknd: &bptr,
- }
-
- // This has already been validated bfore this function
- // is called so just use it.
- for idx := range rTypeNames {
- if config.Type == rTypeNames[idx] {
- rr.routerType = idx
- break
- }
+ name: config.Name,
+ grpcService: rconf.ProtoService,
+ currentBackend: &bptr,
}
// Create the backend cluster or link to an existing one
ok := true
- if rr.bkndClstr, ok = bClusters[config.backendCluster.Name]; ok == false {
- if rr.bkndClstr, err = newBackendCluster(config.backendCluster); err != nil {
+ if rr.cluster, ok = clusters[config.backendCluster.Name]; !ok {
+ if rr.cluster, err = newBackendCluster(config.backendCluster); err != nil {
log.Errorf("Could not create a backend for router %s", config.Name)
rtrn_err = true
}
@@ -91,8 +80,8 @@
return "", "", nil
}
-func (rr RoundRobinRouter) BackendCluster(s string, mk string) (*backendCluster, error) {
- return rr.bkndClstr, nil
+func (rr RoundRobinRouter) BackendCluster(s string, mk string) (*cluster, error) {
+ return rr.cluster, nil
}
func (rr RoundRobinRouter) Name() string {
@@ -104,8 +93,8 @@
switch sl := sel.(type) {
case *nbFrame:
// Since this is a round robin router just get the next backend
- if *rr.curBknd, err = rr.bkndClstr.nextBackend(*rr.curBknd, BE_SEQ_RR); err == nil {
- return *rr.curBknd
+ if *rr.currentBackend, err = rr.cluster.nextBackend(*rr.currentBackend, BackendSequenceRoundRobin); err == nil {
+ return *rr.currentBackend
} else {
sl.err = err
return nil
@@ -120,9 +109,9 @@
return rr.grpcService
}
-func (rr RoundRobinRouter) FindBackendCluster(becName string) *backendCluster {
- if becName == rr.bkndClstr.name {
- return rr.bkndClstr
+func (rr RoundRobinRouter) FindBackendCluster(becName string) *cluster {
+ if becName == rr.cluster.name {
+ return rr.cluster
}
return nil
}
diff --git a/afrouter/afrouter/router.go b/afrouter/afrouter/router.go
index f71bd30..323ffb2 100644
--- a/afrouter/afrouter/router.go
+++ b/afrouter/afrouter/router.go
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// gRPC affinity router with active/active backends
package afrouter
@@ -23,26 +22,15 @@
"google.golang.org/grpc"
)
-const (
- RT_RPC_AFFINITY_MESSAGE = iota + 1
- RT_RPC_AFFINITY_HEADER = iota + 1
- RT_BINDING = iota + 1
- RT_ROUND_ROBIN = iota + 1
-)
-
-// String names for display in error messages.
-var rTypeNames = []string{"", "rpc_affinity_message", "rpc_affinity_header", "binding", "round_robin"}
-var rAssnNames = []string{"", "round_robin"}
-
-var allRouters map[string]Router = make(map[string]Router)
+var allRouters = make(map[string]Router)
// The router interface
type Router interface {
Name() string
Route(interface{}) *backend
Service() string
- BackendCluster(string, string) (*backendCluster, error)
- FindBackendCluster(string) *backendCluster
+ BackendCluster(string, string) (*cluster, error)
+ FindBackendCluster(string) *cluster
ReplyHandler(interface{}) error
GetMetaKeyVal(serverStream grpc.ServerStream) (string, string, error)
}
@@ -56,21 +44,20 @@
}
func newSubRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
- idx := strIndex(rTypeNames, config.Type)
- switch idx {
- case RT_RPC_AFFINITY_MESSAGE:
+ switch config.Type {
+ case RouteTypeRpcAffinityMessage:
r, err := newAffinityRouter(rconf, config)
if err == nil {
allRouters[rconf.Name+config.Name] = r
}
return r, err
- case RT_BINDING:
+ case RouteTypeBinding:
r, err := newBindingRouter(rconf, config)
if err == nil {
allRouters[rconf.Name+config.Name] = r
}
return r, err
- case RT_ROUND_ROBIN:
+ case RouteTypeRoundRobin:
r, err := newRoundRobinRouter(rconf, config)
if err == nil {
allRouters[rconf.Name+config.Name] = r
diff --git a/afrouter/afrouter/server.go b/afrouter/afrouter/server.go
index 0691da1..dff1519 100644
--- a/afrouter/afrouter/server.go
+++ b/afrouter/afrouter/server.go
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// gRPC affinity router with active/active backends
package afrouter
@@ -24,7 +23,6 @@
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"net"
- "regexp"
"strconv"
)
@@ -35,38 +33,18 @@
}
)
-const (
- REQ_ALL = 0
- REQ_PACKAGE = 1
- REQ_SERVICE = 2
- REQ_METHOD = 3
-)
-
type server struct {
running bool
name string
- stype nbi
proxyListener net.Listener
routers map[string]map[string]Router
proxyServer *grpc.Server
}
-type nbRequest struct {
- srv interface{}
- serverStream grpc.ServerStream
- r Router
- mthdSlice []string
- metaKey string // There should be at most one key specified. More than one is an error.
- metaVal string // This is the value extracted from the meta key if it exists or "" otherwise
-}
-
-var mthdSlicerExp string = `^/([a-zA-Z][a-zA-Z0-9]+)\.([a-zA-Z][a-zA-Z0-9]+)/([a-zA-Z][a-zA-Z0-9]+)`
-var mthdSlicer *regexp.Regexp // The compiled regex to extract the package/service/method
-
func newServer(config *ServerConfig) (*server, error) {
var err error = nil
- var rtrn_err bool = false
- var srvr *server
+ var rtrn_err = false
+ var s *server
// Change over to the new configuration format
// Validate the configuration
// There should be a name
@@ -98,20 +76,20 @@
rtrn_err = true
}
- if rtrn_err == true {
+ if rtrn_err {
return nil, errors.New("Server configuration failed")
} else {
// The configuration is valid, create a server and configure it.
- srvr = &server{name: config.Name, routers: make(map[string]map[string]Router)}
+ s = &server{name: config.Name, routers: make(map[string]map[string]Router)}
// The listener
- if srvr.proxyListener, err =
+ if s.proxyListener, err =
net.Listen("tcp", config.Addr+":"+
strconv.Itoa(int(config.Port))); err != nil {
log.Error(err)
return nil, err
}
// Create the routers
- log.Debugf("Configuring the routers for server %s", srvr.name)
+ log.Debugf("Configuring the routers for server %s", s.name)
for p, r := range config.routers {
log.Debugf("Processing router %s for package %s", r.Name, p)
if dr, err := newRouter(r); err != nil {
@@ -119,25 +97,23 @@
return nil, err
} else {
log.Debugf("Adding router %s to the server %s for package %s and service %s",
- dr.Name(), srvr.name, p, dr.Service())
- if _, ok := srvr.routers[p]; ok {
- srvr.routers[p][dr.Service()] = dr
+ dr.Name(), s.name, p, dr.Service())
+ if _, ok := s.routers[p]; ok {
+ s.routers[p][dr.Service()] = dr
} else {
- srvr.routers[p] = make(map[string]Router)
- srvr.routers[p][dr.Service()] = dr
+ s.routers[p] = make(map[string]Router)
+ s.routers[p][dr.Service()] = dr
}
}
}
// Configure the grpc handler
- srvr.proxyServer = grpc.NewServer(
+ s.proxyServer = grpc.NewServer(
grpc.CustomCodec(Codec()),
- grpc.UnknownServiceHandler(srvr.TransparentHandler()),
+ grpc.UnknownServiceHandler(s.TransparentHandler()),
)
}
- // Compile the regular expression to extract the method
- mthdSlicer = regexp.MustCompile(mthdSlicerExp)
- return srvr, nil
+ return s, nil
}
func (s *server) Name() string {
@@ -148,12 +124,12 @@
return s.handler
}
-func (s *server) getRouter(pkg *string, service *string) (Router, bool) {
- if fn, ok := s.routers[*pkg][*service]; ok { // Both specified
+func (s *server) getRouter(pkg string, service string) (Router, bool) {
+ if fn, ok := s.routers[pkg][service]; ok { // Both specified
return fn, ok
- } else if fn, ok = s.routers["*"][*service]; ok { // Package wild card
+ } else if fn, ok = s.routers["*"][service]; ok { // Package wild card
return fn, ok
- } else if fn, ok = s.routers[*pkg]["*"]; ok { // Service wild card
+ } else if fn, ok = s.routers[pkg]["*"]; ok { // Service wild card
return fn, ok
} else if fn, ok = s.routers["*"]["*"]; ok { // Both Wildcarded
return fn, ok
@@ -169,23 +145,14 @@
return grpc.Errorf(codes.Internal, "lowLevelServerStream doesn't exist in context")
}
log.Debugf("Processing grpc request %s on server %s", fullMethodName, s.name)
- // The full method name is structured as follows:
- // <package name>.<service>/<method>
- mthdSlice := mthdSlicer.FindStringSubmatch(fullMethodName)
- if mthdSlice == nil {
- log.Errorf("Faled to slice full method %s, result: %v", fullMethodName, mthdSlice)
- } else {
- log.Debugf("Sliced full method %s: %v", fullMethodName, mthdSlice)
- }
- r, ok := s.getRouter(&mthdSlice[REQ_PACKAGE], &mthdSlice[REQ_SERVICE])
- //fn, ok := s.routers[mthdSlice[REQ_PACKAGE]][mthdSlice[REQ_SERVICE]]
+ methodInfo := newMethodDetails(fullMethodName)
+ r, ok := s.getRouter(methodInfo.pkg, methodInfo.service)
+ //fn, ok := s.routers[methodInfo.pkg][methodInfo.service]
if !ok {
// TODO: Should this be punted to a default transparent router??
// Probably not, if one is defined yes otherwise just crap out.
- err := errors.New(
- fmt.Sprintf("Unable to dispatch! Service '%s' for package '%s' not found.",
- mthdSlice[REQ_SERVICE], mthdSlice[REQ_PACKAGE]))
+ err := fmt.Errorf("Unable to dispatch! Service '%s' for package '%s' not found.", methodInfo.service, methodInfo.pkg)
log.Error(err)
return err
}
@@ -197,13 +164,13 @@
return err
}
- //nbR := &nbRequest(srv:srv,serverStream:serverStream,r:r,mthdSlice:mthdSlice,metaKey:mk,metaVal:mv)
+ //nbR := &nbRequest(srv:srv,serverStream:serverStream,r:r,methodInfo:methodInfo,metaKey:mk,metaVal:mv)
// Extract the cluster from the selected router and use it to manage the transfer
- if bkndClstr, err := r.BackendCluster(mthdSlice[REQ_METHOD], mk); err != nil {
+ if cluster, err := r.BackendCluster(methodInfo.method, mk); err != nil {
return err
} else {
- //return bkndClstr.handler(nbR)
- return bkndClstr.handler(srv, serverStream, r, mthdSlice, mk, mv)
+ //return beCluster.handler(nbR)
+ return cluster.handler(srv, serverStream, r, methodInfo, mk, mv)
}
}
diff --git a/afrouter/afrouter/signals.go b/afrouter/afrouter/signals.go
index 5416fda..37b154b 100644
--- a/afrouter/afrouter/signals.go
+++ b/afrouter/afrouter/signals.go
@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// gRPC affinity router with active/active backends
// This file implements an exit handler that tries to shut down all the
// running servers before finally exiting. There are 2 triggers to this
@@ -30,7 +29,6 @@
var errChan = make(chan error)
var doneChan = make(chan error)
-var holdChan = make(chan int)
func InitExitHandler() error {
@@ -75,7 +73,7 @@
}
}
}
- for _, cl := range bClusters {
+ for _, cl := range clusters {
for _, bknd := range cl.backends {
log.Debugf("Closing backend %s", bknd.name)
for _, conn := range bknd.connections {
diff --git a/afrouter/afrouter/streams.go b/afrouter/afrouter/streams.go
new file mode 100644
index 0000000..458ed06
--- /dev/null
+++ b/afrouter/afrouter/streams.go
@@ -0,0 +1,230 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "context"
+ "errors"
+ "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+ "sort"
+ "sync"
+)
+
+type streams struct {
+ mutex sync.Mutex
+ activeStream *stream
+ streams map[string]*stream
+ sortedStreams []*stream
+}
+
+type stream struct {
+ stream grpc.ClientStream
+ ctx context.Context
+ cancel context.CancelFunc
+ ok2Close chan struct{}
+ c2sReturn chan error
+ s2cReturn error
+}
+
+func (s *streams) clientCancel() {
+ for _, strm := range s.streams {
+ if strm != nil {
+ strm.cancel()
+ }
+ }
+}
+
+func (s *streams) closeSend() {
+ for _, strm := range s.streams {
+ if strm != nil {
+ <-strm.ok2Close
+ log.Debug("Closing southbound stream")
+ strm.stream.CloseSend()
+ }
+ }
+}
+
+func (s *streams) trailer() metadata.MD {
+ return s.activeStream.stream.Trailer()
+}
+
+func (s *streams) getActive() *stream {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ return s.activeStream
+}
+
+func (s *streams) setThenGetActive(strm *stream) *stream {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ if s.activeStream == nil {
+ s.activeStream = strm
+ }
+ return s.activeStream
+}
+
+func (s *streams) forwardClientToServer(dst grpc.ServerStream, f *sbFrame) chan error {
+ fc2s := func(srcS *stream) {
+ for i := 0; ; i++ {
+ if err := srcS.stream.RecvMsg(f); err != nil {
+ if s.setThenGetActive(srcS) == srcS {
+ srcS.c2sReturn <- err // this can be io.EOF which is the success case
+ } else {
+ srcS.c2sReturn <- nil // Inactive responder
+ }
+ close(srcS.ok2Close)
+ break
+ }
+ if s.setThenGetActive(srcS) != srcS {
+ srcS.c2sReturn <- nil
+ continue
+ }
+ if i == 0 {
+ // This is a bit of a hack, but client to server headers are only readable after first client msg is
+ // received but must be written to server stream before the first msg is flushed.
+ // This is the only place to do it nicely.
+ md, err := srcS.stream.Header()
+ if err != nil {
+ srcS.c2sReturn <- err
+ break
+ }
+ // Update the metadata for the response.
+ if f.metaKey != NoMeta {
+ if f.metaVal == "" {
+ // We could also alsways just do this
+ md.Set(f.metaKey, f.backend.name)
+ } else {
+ md.Set(f.metaKey, f.metaVal)
+ }
+ }
+ if err := dst.SendHeader(md); err != nil {
+ srcS.c2sReturn <- err
+ break
+ }
+ }
+ log.Debugf("Northbound frame %v", f.payload)
+ if err := dst.SendMsg(f); err != nil {
+ srcS.c2sReturn <- err
+ break
+ }
+ }
+ }
+
+ // There should be AT LEAST one open stream at this point
+ // if there isn't its a grave error in the code and it will
+ // cause this thread to block here so check for it and
+ // don't let the lock up happen but report the error
+ ret := make(chan error, 1)
+ agg := make(chan *stream)
+ atLeastOne := false
+ for _, strm := range s.streams {
+ if strm != nil {
+ go fc2s(strm)
+ go func(s *stream) { // Wait on result and aggregate
+ r := <-s.c2sReturn // got the return code
+ if r == nil {
+ return // We're the redundant stream, just die
+ }
+ s.c2sReturn <- r // put it back to pass it along
+ agg <- s // send the stream to the aggregator
+ }(strm)
+ atLeastOne = true
+ }
+ }
+ if atLeastOne == true {
+ go func() { // Wait on aggregated result
+ s := <-agg
+ ret <- <-s.c2sReturn
+ }()
+ } else {
+ err := errors.New("There are no open streams. Unable to forward message.")
+ log.Error(err)
+ ret <- err
+ }
+ return ret
+}
+
+func (s *streams) sendAll(f *nbFrame) error {
+ var rtrn error
+
+ atLeastOne := false
+ for _, strm := range s.sortedStreams {
+ if strm != nil {
+ if err := strm.stream.SendMsg(f); err != nil {
+ log.Debugf("Error on SendMsg: %s", err.Error())
+ strm.s2cReturn = err
+ }
+ atLeastOne = true
+ } else {
+ log.Debugf("Nil stream")
+ }
+ }
+ // If one of the streams succeeded, declare success
+ // if none did pick an error and return it.
+ if atLeastOne == true {
+ for _, strm := range s.sortedStreams {
+ if strm != nil {
+ rtrn = strm.s2cReturn
+ if rtrn == nil {
+ return rtrn
+ }
+ }
+ }
+ return rtrn
+ } else {
+ rtrn = errors.New("There are no open streams, this should never happen")
+ log.Error(rtrn)
+ }
+ return rtrn
+}
+
+func (s *streams) forwardServerToClient(src grpc.ServerStream, f *nbFrame) chan error {
+ ret := make(chan error, 1)
+ go func() {
+ // The frame buffer already has the results of a first
+ // RecvMsg in it so the first thing to do is to
+ // send it to the list of client streams and only
+ // then read some more.
+ for i := 0; ; i++ {
+ // Send the message to each of the backend streams
+ if err := s.sendAll(f); err != nil {
+ ret <- err
+ log.Debugf("SendAll failed %s", err.Error())
+ break
+ }
+ log.Debugf("Southbound frame %v", f.payload)
+ if err := src.RecvMsg(f); err != nil {
+ ret <- err // this can be io.EOF which is happy case
+ break
+ }
+ }
+ }()
+ return ret
+}
+
+func (s *streams) sortStreams() {
+ var tmpKeys []string
+ for k := range s.streams {
+ tmpKeys = append(tmpKeys, k)
+ }
+ sort.Strings(tmpKeys)
+ for _, v := range tmpKeys {
+ s.sortedStreams = append(s.sortedStreams, s.streams[v])
+ }
+}
diff --git a/afrouter/arouter.go b/afrouter/arouter.go
index 65bb7df..4342f68 100644
--- a/afrouter/arouter.go
+++ b/afrouter/arouter.go
@@ -13,13 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// gRPC affinity router with active/active backends
package main
-/* package main // import "github.com/opencord/voltha-go/arouter" */
-/* package main // import "github.com/opencord/voltha-go" */
-
import (
"fmt"
"github.com/opencord/voltha-go/afrouter/afrouter"
diff --git a/afrouter/arouter.json b/afrouter/arouter.json
index 329a7d4..c1530a7 100644
--- a/afrouter/arouter.json
+++ b/afrouter/arouter.json
@@ -1,225 +1,279 @@
{
- "servers": [
- {
- "name": "grpc_command",
- "port": 55555,
- "address":"",
- "type": "grpc",
- "routers": [
- {
- "_TODO":"Suport a router list, remove the package and service from the router",
- "package":"voltha",
- "service":"VolthaService",
- "router":"vcore"
- }
- ]
- }
- ],
- "routers": [
- {
- "name":"vcore",
- "package": "voltha",
- "service": "VolthaService",
- "routes": [
- {
- "name":"dev_manager",
- "proto_descriptor":"voltha.pb",
- "type":"rpc_affinity_message",
- "association":"round_robin",
- "routing_field": "id",
- "backend_cluster":"vcore",
- "_COMMENT":"Methods are naturally southbound affinity binding unless otherwise specified below",
- "methods":[ "CreateDevice",
- "GetCoreInstance",
- "EnableLogicalDevicePort",
- "DisableLogicalDevicePort",
- "EnableDevice",
- "DisableDevice",
- "RebootDevice",
- "DeleteDevice",
- "DownloadImage",
- "CancelImageDownload",
- "ActivateImageUpdate",
- "RevertImageUpdate",
- "UpdateDevicePmConfigs",
- "CreateAlarmFilter",
- "UpdateAlarmFilter",
- "DeleteAlarmFilter",
- "SelfTest"],
- "_COMMENT":"If a method is northbound affinity binding then association is used to route",
- "_COMMENT":"but affinity is not set southbound but only based on the response",
- "_COMMENT":"Methods here MUST be specified above, this overrides thier default beahvior",
- "nb_binding_methods":["CreateDevice"],
- "_TODO":"Overrides not implemented yet, config ignored",
- "overrides": [
- {"methods":["abc","def"], "routing_field":"id"},
- {"methods":["ghi","jkl"]},
- {"method":"mno", "routing_field":"id"},
- {"method":"pqr"}
- ]
- },
- {
- "name":"read_only",
- "type":"round_robin",
- "association":"round_robin",
- "backend_cluster":"ro_vcore",
- "methods":[ "ListDevicePorts",
- "ListDevicePmConfigs",
- "GetImages",
- "GetImageDownloadStatus",
- "GetImageDownload",
- "ListImageDownloads",
- "ListDeviceFlows",
- "ListDeviceFlowGroups",
- "ListLogicalDeviceFlows",
- "ListLogicalDeviceFlowGroups",
- "ListDevices",
- "GetDevice",
- "ListLogicalDevices",
- "GetLogicalDevices",
- "GetDeviceType",
- "GetDeviceGroup",
- "GetLogicalDevice",
- "GetAlarmFilter",
- "ListLogicalDevicePorts",
- "GetLogicalDevicePort"
- ]
- },
- {
- "name":"dev_manager_ofagent",
- "type":"binding",
- "_association":"round_robin",
- "binding": {
- "type":"header",
- "field":"voltha_backend_name",
- "method":"Subscribe",
- "association":"round_robin"
- },
- "backend_cluster":"vcore",
- "methods":["StreamPacketsOut",
- "ReceivePacketsIn",
- "ReceiveChangeEvents",
- "Subscribe",
- "UpdateLogicalDeviceFlowTable",
- "UpdateLogicalDeviceFlowGroupTable",
- "GetLogicalDevice",
- "GetLogicalDevicePort",
- "EnableLogicalDevicePort",
- "DisableLogicalDevicePort",
- "ListLogicalDevices",
- "ListLogicalDeviceFlows",
- "ListLogicalDeviceFlowGroups",
- "ListLogicalDevicePorts"
- ],
- "_TODO":"Overrides not implemented yet, config ignored",
- "overrides": [
- {"methods":["abc","def"], "routing_field":"id"},
- {"methods":["ghi","jkl"]},
- {"method":"mno", "routing_field":"id"},
- {"method":"pqr"}
- ]
- }
- ]
- }
- ],
- "backend_clusters": [
- {
- "name":"vcore",
- "backends":[ {
- "name":"vcore1",
- "type":"active_active",
- "association": {
- "strategy":"serial_number",
- "location":"header",
- "_TODO":"The key below needs to be implemented, currently hard coded",
- "key":"voltha_serial_number"
- },
- "connections": [ {
- "name":"vcore11",
- "addr":"",
- "port":""
- },
- {
- "name":"vcore12",
- "addr":"",
- "port":""
- }]
- },
- {
- "name":"vcore2",
- "type":"active_active",
- "association": {
- "strategy":"serial_number",
- "location":"header",
- "_TODO":"The key below needs to be implemented, currently hard coded",
- "key":"voltha_serial_number"
- },
- "connections": [ {
- "name":"vcore21",
- "addr":"",
- "port":""
- },
- {
- "name":"vcore22",
- "addr":"",
- "port":""
- }]
- },
- {
- "name":"vcore3",
- "type":"active_active",
- "association": {
- "strategy":"serial_number",
- "location":"header",
- "_TODO":"The key below needs to be implemented, currently hard coded",
- "key":"voltha_serial_number"
- },
- "connections": [ {
- "name":"vcore31",
- "addr":"",
- "port":""
- },
- {
- "name":"vcore32",
- "addr":"",
- "port":""
- }]
- }]
- },
- {
- "name":"ro_vcore",
- "backends":[ {
- "name":"ro_vcore1",
- "type":"server",
- "connections": [ {
- "name":"ro_vcore11",
- "addr":"",
- "port":""
- }]
- },
- {
- "name":"ro_vcore2",
- "type":"server",
- "connections": [ {
- "name":"ro_vcore21",
- "addr":"",
- "port":""
- }]
- },
- {
- "name":"ro_vcore3",
- "type":"server",
- "connections": [ {
- "name":"ro_vcore31",
- "addr":"",
- "port":""
- }]
- }]
- }
- ],
- "api": {
- "_comment":"If this isn't defined then no api is available for dynamic configuration and queries",
- "address":"",
- "port":55554
- }
+ "servers": [
+ {
+ "name": "grpc_command",
+ "port": 55555,
+ "address": "",
+ "type": "grpc",
+ "routers": [
+ {
+ "_TODO": "Suport a router list, remove the package and service from the router",
+ "package": "voltha",
+ "service": "VolthaService",
+ "router": "vcore"
+ }
+ ]
+ }
+ ],
+ "routers": [
+ {
+ "name": "vcore",
+ "package": "voltha",
+ "service": "VolthaService",
+ "routes": [
+ {
+ "name": "dev_manager",
+ "proto_descriptor": "voltha.pb",
+ "type": "rpc_affinity_message",
+ "association": "round_robin",
+ "routing_field": "id",
+ "backend_cluster": "vcore",
+ "_COMMENT": "Methods are naturally southbound affinity binding unless otherwise specified below",
+ "methods": [
+ "CreateDevice",
+ "GetCoreInstance",
+ "EnableLogicalDevicePort",
+ "DisableLogicalDevicePort",
+ "EnableDevice",
+ "DisableDevice",
+ "RebootDevice",
+ "DeleteDevice",
+ "DownloadImage",
+ "CancelImageDownload",
+ "ActivateImageUpdate",
+ "RevertImageUpdate",
+ "UpdateDevicePmConfigs",
+ "CreateAlarmFilter",
+ "UpdateAlarmFilter",
+ "DeleteAlarmFilter",
+ "SelfTest"
+ ],
+ "_COMMENT": "If a method is northbound affinity binding then association is used to route",
+ "_COMMENT": "but affinity is not set southbound but only based on the response",
+ "_COMMENT": "Methods here MUST be specified above, this overrides thier default beahvior",
+ "nb_binding_methods": [
+ "CreateDevice"
+ ],
+ "_TODO": "Overrides not implemented yet, config ignored",
+ "overrides": [
+ {
+ "methods": [
+ "abc",
+ "def"
+ ],
+ "routing_field": "id"
+ },
+ {
+ "methods": [
+ "ghi",
+ "jkl"
+ ]
+ },
+ {
+ "method": "mno",
+ "routing_field": "id"
+ },
+ {
+ "method": "pqr"
+ }
+ ]
+ },
+ {
+ "name": "read_only",
+ "type": "round_robin",
+ "association": "round_robin",
+ "backend_cluster": "ro_vcore",
+ "methods": [
+ "ListDevicePorts",
+ "ListDevicePmConfigs",
+ "GetImages",
+ "GetImageDownloadStatus",
+ "GetImageDownload",
+ "ListImageDownloads",
+ "ListDeviceFlows",
+ "ListDeviceFlowGroups",
+ "ListLogicalDeviceFlows",
+ "ListLogicalDeviceFlowGroups",
+ "ListDevices",
+ "GetDevice",
+ "ListLogicalDevices",
+ "GetLogicalDevices",
+ "GetDeviceType",
+ "GetDeviceGroup",
+ "GetLogicalDevice",
+ "GetAlarmFilter",
+ "ListLogicalDevicePorts",
+ "GetLogicalDevicePort"
+ ]
+ },
+ {
+ "name": "dev_manager_ofagent",
+ "type": "binding",
+ "_association": "round_robin",
+ "binding": {
+ "type": "header",
+ "field": "voltha_backend_name",
+ "method": "Subscribe",
+ "association": "round_robin"
+ },
+ "backend_cluster": "vcore",
+ "methods": [
+ "StreamPacketsOut",
+ "ReceivePacketsIn",
+ "ReceiveChangeEvents",
+ "Subscribe",
+ "UpdateLogicalDeviceFlowTable",
+ "UpdateLogicalDeviceFlowGroupTable",
+ "GetLogicalDevice",
+ "GetLogicalDevicePort",
+ "EnableLogicalDevicePort",
+ "DisableLogicalDevicePort",
+ "ListLogicalDevices",
+ "ListLogicalDeviceFlows",
+ "ListLogicalDeviceFlowGroups",
+ "ListLogicalDevicePorts"
+ ],
+ "_TODO": "Overrides not implemented yet, config ignored",
+ "overrides": [
+ {
+ "methods": [
+ "abc",
+ "def"
+ ],
+ "routing_field": "id"
+ },
+ {
+ "methods": [
+ "ghi",
+ "jkl"
+ ]
+ },
+ {
+ "method": "mno",
+ "routing_field": "id"
+ },
+ {
+ "method": "pqr"
+ }
+ ]
+ }
+ ]
+ }
+ ],
+ "backend_clusters": [
+ {
+ "name": "vcore",
+ "backends": [
+ {
+ "name": "vcore1",
+ "type": "active_active",
+ "association": {
+ "strategy": "serial_number",
+ "location": "header",
+ "_TODO": "The key below needs to be implemented, currently hard coded",
+ "key": "voltha_serial_number"
+ },
+ "connections": [
+ {
+ "name": "vcore11",
+ "addr": "",
+ "port": ""
+ },
+ {
+ "name": "vcore12",
+ "addr": "",
+ "port": ""
+ }
+ ]
+ },
+ {
+ "name": "vcore2",
+ "type": "active_active",
+ "association": {
+ "strategy": "serial_number",
+ "location": "header",
+ "_TODO": "The key below needs to be implemented, currently hard coded",
+ "key": "voltha_serial_number"
+ },
+ "connections": [
+ {
+ "name": "vcore21",
+ "addr": "",
+ "port": ""
+ },
+ {
+ "name": "vcore22",
+ "addr": "",
+ "port": ""
+ }
+ ]
+ },
+ {
+ "name": "vcore3",
+ "type": "active_active",
+ "association": {
+ "strategy": "serial_number",
+ "location": "header",
+ "_TODO": "The key below needs to be implemented, currently hard coded",
+ "key": "voltha_serial_number"
+ },
+ "connections": [
+ {
+ "name": "vcore31",
+ "addr": "",
+ "port": ""
+ },
+ {
+ "name": "vcore32",
+ "addr": "",
+ "port": ""
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "ro_vcore",
+ "backends": [
+ {
+ "name": "ro_vcore1",
+ "type": "server",
+ "connections": [
+ {
+ "name": "ro_vcore11",
+ "addr": "",
+ "port": ""
+ }
+ ]
+ },
+ {
+ "name": "ro_vcore2",
+ "type": "server",
+ "connections": [
+ {
+ "name": "ro_vcore21",
+ "addr": "",
+ "port": ""
+ }
+ ]
+ },
+ {
+ "name": "ro_vcore3",
+ "type": "server",
+ "connections": [
+ {
+ "name": "ro_vcore31",
+ "addr": "",
+ "port": ""
+ }
+ ]
+ }
+ ]
+ }
+ ],
+ "api": {
+ "_comment": "If this isn't defined then no api is available for dynamic configuration and queries",
+ "address": "",
+ "port": 55554
+ }
}
diff --git a/tests/afrouter/suites/test1/test1.json b/tests/afrouter/suites/test1/test1.json
index c0326b1..b11ccec 100644
--- a/tests/afrouter/suites/test1/test1.json
+++ b/tests/afrouter/suites/test1/test1.json
@@ -214,7 +214,7 @@
"method":"GetGoroutineCount",
"param":"{}",
"meta": [ ],
- "expect":"{Count:45}",
+ "expect":"{Count:43}",
"expectMeta": [ ]
},
"servers": [ ]
diff --git a/tests/afrouter/suites/test2/test2.tmpl.json b/tests/afrouter/suites/test2/test2.tmpl.json
index a15230d..879b533 100644
--- a/tests/afrouter/suites/test2/test2.tmpl.json
+++ b/tests/afrouter/suites/test2/test2.tmpl.json
@@ -230,7 +230,7 @@
"method":"GetGoroutineCount",
"param":"{}",
"meta": [ ],
- "expect":"{Count:45}",
+ "expect":"{Count:43}",
"expectMeta": [ ]
},
"servers": [ ]
diff --git a/tests/afrouter/suites/test3/test3.tmpl.json b/tests/afrouter/suites/test3/test3.tmpl.json
index a536d60..4bf49cd 100644
--- a/tests/afrouter/suites/test3/test3.tmpl.json
+++ b/tests/afrouter/suites/test3/test3.tmpl.json
@@ -243,7 +243,7 @@
"method":"GetGoroutineCount",
"param":"{}",
"meta": [ ],
- "expect":"{Count:45}",
+ "expect":"{Count:43}",
"expectMeta": [ ]
},
"servers": [ ]