VOL-1967 move api-server to separate repository
Current with voltha-go acf0adaf2d91ae72b55192cc8a939e0485918d16
Change-Id: I000ea6be0789e20c922bd671562b58a7120892ae
diff --git a/internal/pkg/afrouter/affinity-router.go b/internal/pkg/afrouter/affinity-router.go
new file mode 100644
index 0000000..7a94f7b
--- /dev/null
+++ b/internal/pkg/afrouter/affinity-router.go
@@ -0,0 +1,377 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "errors"
+ "fmt"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
+ "regexp"
+ "strconv"
+)
+
+// Indexes into the submatch slice that regexp.FindStringSubmatch returns for
+// the `^(\.[^.]+\.)(.+)$` pattern compiled in newAffinityRouter: group 1 is the
+// leading ".<package>." prefix of a qualified type name and group 2 is the
+// remainder, which newAffinityRouter uses as the message name.
+// TODO: Used in multiple routers, should move to common file
+const (
+ PKG_MTHD_PKG int = 1
+ PKG_MTHD_MTHD int = 2
+)
+
+// AffinityRouter picks a backend of its cluster for each request and remembers,
+// per selector value, which backend was picked so that later requests carrying
+// the same selector are routed to the same backend.
+type AffinityRouter struct {
+ name string // router name, copied from RouteConfig.Name
+ association associationType // copied from RouteConfig.Association
+ routingField string // not assigned by newAffinityRouter
+ grpcService string // copied from RouterConfig.ProtoService
+ methodMap map[string]byte // method name -> field number of the route selector in the request message
+ nbBindingMethodMap map[string]byte // method name -> field number of the route selector in the reply message
+ cluster *cluster // backend cluster the router selects from
+ affinity map[string]*backend // selector value -> backend bound to it
+ currentBackend **backend // backend last returned by cluster.nextBackend; Route writes through the outer pointer, so copies of the router made by value receivers observe the same value
+}
+
+// newAffinityRouter builds the AffinityRouter for one route (config) of a
+// router (rconf).
+//
+// The configuration is validated first: a router name, a proto package, a
+// proto service and an association are all required. The function then walks
+// rconf.protoDescriptor and records, for each method named in config.Methods,
+// the field number of config.RouteField in the method's input message and, for
+// each method named in config.NbBindingMethods, the field number of the same
+// field in the method's output message. Finally the router is attached to the
+// backend cluster named by config.backendCluster, which is created when it is
+// not already present in clusters.
+//
+// Every problem is logged as it is found. When at least one was found the
+// returned error is non-nil; the partially initialised router is still
+// returned alongside it.
+func newAffinityRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
+	failed := false
+	// Splits a qualified protobuf type name of the form ".pkg.Name" into the
+	// ".pkg." prefix and the remainder.
+	pkgRe := regexp.MustCompile(`^(\.[^.]+\.)(.+)$`)
+
+	// Validate the configuration
+
+	// A name must exist
+	if config.Name == "" {
+		log.Error("A router 'name' must be specified")
+		failed = true
+	}
+
+	if rconf.ProtoPackage == "" {
+		log.Error("A 'package' must be specified")
+		failed = true
+	}
+
+	if rconf.ProtoService == "" {
+		log.Error("A 'service' must be specified")
+		failed = true
+	}
+
+	// The 'routing_field' check is currently disabled:
+	//if config.RouteField == "" {
+	//	log.Error("A 'routing_field' must be specified")
+	//	failed = true
+	//}
+
+	// TODO The overrides section is currently not being used
+	// so the router will route all methods based on the
+	// routing_field. This needs to be added so that methods
+	// can have different routing fields.
+	var bptr *backend
+	dr := AffinityRouter{
+		name:               config.Name,
+		grpcService:        rconf.ProtoService,
+		affinity:           make(map[string]*backend),
+		methodMap:          make(map[string]byte),
+		nbBindingMethodMap: make(map[string]byte),
+		currentBackend:     &bptr,
+	}
+	// An association must exist
+	dr.association = config.Association
+	if dr.association == AssociationUndefined {
+		log.Error("An association must be specified")
+		failed = true
+	}
+
+	// Build the routing structure based on the loaded protobuf
+	// descriptor file and the config information.
+	type key struct {
+		method string
+		field  string
+	}
+	// Maps (message name, field name) to the field number. The number is
+	// stored in a byte, so field numbers above 255 are truncated.
+	fieldNumberLookup := make(map[key]byte)
+	for _, f := range rconf.protoDescriptor.File {
+		// Build a temporary map of message types by name.
+		for _, m := range f.MessageType {
+			for _, fld := range m.Field {
+				log.Debugf("Processing message '%s', field '%s'", *m.Name, *fld.Name)
+				fieldNumberLookup[key{*m.Name, *fld.Name}] = byte(*fld.Number)
+			}
+		}
+	}
+
+	// localName removes the package qualifier from a type name. The
+	// configured package is tried first so the result matches what a plain
+	// prefix strip yields; other packages fall back to the regular
+	// expression. The second result is false when neither applies, which
+	// lets the caller report the problem instead of indexing a nil match.
+	pkgPrefix := "." + rconf.ProtoPackage + "."
+	localName := func(typeName string) (string, bool) {
+		if len(typeName) > len(pkgPrefix) && typeName[:len(pkgPrefix)] == pkgPrefix {
+			return typeName[len(pkgPrefix):], true
+		}
+		if parts := pkgRe.FindStringSubmatch(typeName); parts != nil {
+			return parts[PKG_MTHD_MTHD], true
+		}
+		return "", false
+	}
+
+	// bind records in target the number of the routing field inside the
+	// message typeName for the given method, flagging a failure when the
+	// type name cannot be parsed or the message lacks the field.
+	bind := func(target map[string]byte, method, typeName, direction string) {
+		msg, matched := localName(typeName)
+		if !matched {
+			log.Errorf("Regular expression didn't match %s type '%s'", direction, typeName)
+			failed = true
+			return
+		}
+		fieldNum, known := fieldNumberLookup[key{msg, config.RouteField}]
+		target[method] = fieldNum
+		if !known {
+			log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'",
+				method, config.RouteField, msg)
+			failed = true
+		}
+	}
+
+	for _, f := range rconf.protoDescriptor.File {
+		if *f.Package != rconf.ProtoPackage {
+			continue
+		}
+		for _, s := range f.Service {
+			if *s.Name != rconf.ProtoService {
+				continue
+			}
+			log.Debugf("Loading package data '%s' for service '%s' for router '%s'", *f.Package, *s.Name, dr.name)
+			// Now create a map keyed by method name with the value being the
+			// field number of the route selector.
+			for _, m := range s.Method {
+				// Find the input type in the messages and extract the
+				// field number and save it for future reference.
+				log.Debugf("Processing method '%s'", *m.Name)
+				// Determine if this is a method we're supposed to be processing.
+				if needMethod(*m.Name, config) {
+					log.Debugf("Enabling method '%s'", *m.Name)
+					bind(dr.methodMap, *m.Name, *m.InputType, "input")
+				}
+				// The sb method is always included in the methods so we can check it here too.
+				if needNbBindingMethod(*m.Name, config) {
+					log.Debugf("Enabling southbound method '%s'", *m.Name)
+					bind(dr.nbBindingMethodMap, *m.Name, *m.OutputType, "output")
+				}
+			}
+		}
+	}
+
+	// Create the backend cluster or link to an existing one
+	cl, found := clusters[config.backendCluster.Name]
+	if !found {
+		var err error
+		if cl, err = newBackendCluster(config.backendCluster); err != nil {
+			log.Errorf("Could not create a backend for router %s", config.Name)
+			failed = true
+		}
+	}
+	dr.cluster = cl
+
+	if failed {
+		return dr, fmt.Errorf("Failed to create a new router '%s'", dr.name)
+	}
+
+	return dr, nil
+}
+
+// needNbBindingMethod reports whether mthd is one of the methods listed in
+// conf.NbBindingMethods.
+func needNbBindingMethod(mthd string, conf *RouteConfig) bool {
+	candidates := conf.NbBindingMethods
+	for i := 0; i < len(candidates); i++ {
+		if candidates[i] == mthd {
+			return true
+		}
+	}
+	return false
+}
+
+// needMethod reports whether mthd is one of the methods listed in
+// conf.Methods.
+// TODO: Used in multiple routers, should move to common file
+func needMethod(mthd string, conf *RouteConfig) bool {
+	listed := false
+	for i := range conf.Methods {
+		if conf.Methods[i] == mthd {
+			listed = true
+			break
+		}
+	}
+	return listed
+}
+
+// Service returns the gRPC service name stored in the router's grpcService
+// field.
+func (ar AffinityRouter) Service() string {
+ return ar.grpcService
+}
+
+// Name returns the router name stored in the router's name field.
+func (ar AffinityRouter) Name() string {
+ return ar.name
+}
+
+// skipField advances *idx past the protobuf field whose tag starts at
+// (*data)[*idx], leaving *idx on the first byte of the next field.
+//
+// The tag is decoded as a varint and the wire type is taken from its low three
+// bits, so varint (0), 64-bit (1), length-delimited (2) and 32-bit (5) fields
+// are each skipped by their real size, including multi-byte tags and
+// multi-byte length prefixes. An error is returned, and *idx is left
+// untouched, when the offset is outside the message, the field is truncated,
+// or the wire type is a deprecated group (3, 4) or undefined (6, 7) value.
+func (ar AffinityRouter) skipField(data *[]byte, idx *int) error {
+	buf := *data
+	start := *idx
+	if start < 0 || start >= len(buf) {
+		return fmt.Errorf("field offset %d is outside the %d byte message", start, len(buf))
+	}
+
+	// A field starts with a varint tag: (field number << 3) | wire type.
+	tag, tagLen := proto.DecodeVarint(buf[start:])
+	if tagLen == 0 {
+		return fmt.Errorf("malformed field tag at offset %d", start)
+	}
+	pos := start + tagLen
+
+	switch wireType := tag & 7; wireType {
+	case 0: // Varint
+		_, n := proto.DecodeVarint(buf[pos:])
+		if n == 0 {
+			return fmt.Errorf("malformed varint at offset %d", pos)
+		}
+		pos += n
+	case 1: // 64 bit
+		pos += 8
+	case 2: // Length delimited
+		length, n := proto.DecodeVarint(buf[pos:])
+		if n == 0 {
+			return fmt.Errorf("malformed length prefix at offset %d", pos)
+		}
+		// Compare before converting so an oversized length cannot overflow int.
+		if length > uint64(len(buf)-pos-n) {
+			return fmt.Errorf("length-delimited field at offset %d exceeds the message", start)
+		}
+		pos += n + int(length)
+	case 5: // 32 bit
+		pos += 4
+	default: // 3 and 4 are deprecated groups, 6 and 7 are undefined
+		return fmt.Errorf("unsupported wire type %d at offset %d", wireType, start)
+	}
+
+	if pos > len(buf) {
+		return fmt.Errorf("field at offset %d is truncated", start)
+	}
+	*idx = pos
+	return nil
+}
+
+// decodeProtoField scans the serialized protobuf message in payload for the
+// first field whose number is fieldId and returns its value as a string.
+//
+// Varint fields are returned in decimal and length-delimited fields are
+// returned as their bytes converted to a string; any other wire type is
+// rejected with an error. An error is also returned when a tag cannot be
+// decoded, when a field cannot be skipped, or when the message ends before the
+// field is found, so an empty payload or one that omits the field is reported
+// instead of indexing past the end of the slice.
+func (ar AffinityRouter) decodeProtoField(payload []byte, fieldId byte) (string, error) {
+	idx := 0
+	b := proto.NewBuffer([]byte{})
+	//b.DebugPrint("The Buffer", payload)
+	for idx < len(payload) { // Find the route selector field
+		log.Debugf("Decoding afinity value attributeNumber: %d from %v at index %d", fieldId, payload, idx)
+		log.Debugf("Attempting match with payload: %d, methodTable: %d", payload[idx], fieldId)
+
+		// A field starts with a varint tag: (field number << 3) | wire type.
+		tag, tagLen := proto.DecodeVarint(payload[idx:])
+		if tagLen == 0 {
+			err := fmt.Errorf("malformed field tag at offset %d", idx)
+			log.Error(err)
+			return "", err
+		}
+
+		if tag>>3 != uint64(fieldId) {
+			start := idx
+			if err := ar.skipField(&payload, &idx); err != nil {
+				log.Errorf("Parsing message failed %v", err)
+				return "", err
+			}
+			if idx <= start {
+				// A skip that makes no progress would spin forever.
+				err := fmt.Errorf("unable to skip field at offset %d", start)
+				log.Error(err)
+				return "", err
+			}
+			continue
+		}
+
+		log.Debugf("Method match with payload: %d, methodTable: %d", payload[idx], fieldId)
+		// TODO: Consider supporting other selector types.... Way, way in the future
+		// ok, the future is now, support strings as well... ugh.
+		b.SetBuf(payload[idx+tagLen:])
+		switch tag & 7 {
+		case 0: // Integer
+			v, err := b.DecodeVarint()
+			if err != nil {
+				log.Errorf("Failed to decode varint %v", err)
+				return "", err
+			}
+			log.Debugf("Decoded the ing field: %v", v)
+			return strconv.Itoa(int(v)), nil
+		case 2: // Length delimited AKA string
+			v, err := b.DecodeStringBytes()
+			if err != nil {
+				log.Errorf("Failed to decode string %v", err)
+				return "", err
+			}
+			log.Debugf("Decoded the string field: %v", v)
+			return v, nil
+		default:
+			err := errors.New("Only integer and string route selectors are permitted")
+			log.Error(err)
+			return "", err
+		}
+	}
+
+	err := fmt.Errorf("field %d not found in message", fieldId)
+	log.Error(err)
+	return "", err
+}
+
+// Route selects the backend for a request frame. The connection result is
+// always nil.
+//
+// Methods present in nbBindingMethodMap bind on the reply, so the request is
+// simply handed to the next backend in round-robin order. For every other
+// method the selector is decoded from the request payload: an existing
+// affinity is reused, otherwise the next round-robin backend is chosen and
+// bound to the selector. When no backend can be obtained the error is stored
+// in the frame's err field and nil is returned. Anything other than a
+// *requestFrame is logged and yields nil.
+func (ar AffinityRouter) Route(sel interface{}) (*backend, *connection) {
+	req, isRequest := sel.(*requestFrame)
+	if !isRequest {
+		log.Errorf("Internal: invalid data type in Route call %v", sel)
+		return nil, nil
+	}
+
+	method := req.methodInfo.method
+	log.Debugf("Route called for requestFrame with method %s", method)
+
+	// Check if this method should be affinity bound from the
+	// reply rather than the request.
+	if _, bindsOnReply := ar.nbBindingMethodMap[method]; bindsOnReply {
+		log.Debugf("Method '%s' affinity binds on reply", method)
+		// Just round robin route the southbound request
+		next, err := ar.cluster.nextBackend(*ar.currentBackend, BackendSequenceRoundRobin)
+		*ar.currentBackend = next
+		if err != nil {
+			req.err = err
+			return nil, nil
+		}
+		return next, nil
+	}
+
+	// Not a south affinity binding method, proceed with north affinity binding.
+	selector, decodeErr := ar.decodeProtoField(req.payload, ar.methodMap[method])
+	if decodeErr != nil {
+		log.Errorf("Bad lookup in affinity map %v", ar.affinity)
+		return nil, nil
+	}
+
+	log.Debugf("Establishing affinity for selector: %s", selector)
+	if bound, known := ar.affinity[selector]; known {
+		return bound, nil
+	}
+
+	// The selector isn't in the map, create a new affinity mapping
+	log.Debugf("MUST CREATE A NEW AFFINITY MAP ENTRY!!")
+	next, err := ar.cluster.nextBackend(*ar.currentBackend, BackendSequenceRoundRobin)
+	*ar.currentBackend = next
+	if err != nil {
+		req.err = err
+		return nil, nil
+	}
+	ar.setAffinity(selector, next)
+	return next, nil
+}
+
+// GetMetaKeyVal always returns two empty strings and a nil error; serverStream
+// is not read.
+func (ar AffinityRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string, string, error) {
+ return "", "", nil
+}
+
+// IsStreaming is not implemented for this router: it panics whenever it is
+// called.
+func (ar AffinityRouter) IsStreaming(_ string) (bool, bool) {
+ panic("not implemented")
+}
+
+// BackendCluster returns the router's backend cluster. mthd and metaKey are
+// ignored and the returned error is always nil.
+func (ar AffinityRouter) BackendCluster(mthd string, metaKey string) (*cluster, error) {
+ return ar.cluster, nil
+}
+
+// FindBackendCluster returns the router's cluster when its name equals beName
+// and nil otherwise.
+func (ar AffinityRouter) FindBackendCluster(beName string) *cluster {
+	if ar.cluster.name != beName {
+		return nil
+	}
+	return ar.cluster
+}
+
+// ReplyHandler inspects a response frame and, for methods that bind affinity
+// on the reply, binds the selector decoded from the reply payload to the
+// backend that produced it.
+//
+// Replies for other methods and replies with an empty payload are ignored. A
+// failure to record the affinity is only logged. An error is returned when sel
+// is not a *responseFrame or when the selector cannot be decoded.
+func (ar AffinityRouter) ReplyHandler(sel interface{}) error {
+	resp, isResponse := sel.(*responseFrame)
+	if !isResponse {
+		err := fmt.Errorf("Internal: invalid data type in ReplyHander call %v", sel)
+		log.Error(err)
+		return err
+	}
+
+	log.Debugf("Reply handler called for responseFrame with method %s", resp.method)
+
+	// Determine if reply action is required.
+	fld, bindsOnReply := ar.nbBindingMethodMap[resp.method]
+	if !bindsOnReply || len(resp.payload) == 0 {
+		return nil
+	}
+
+	// Extract the field value from the frame and
+	// and set affinity accordingly
+	selector, decodeErr := ar.decodeProtoField(resp.payload, fld)
+	if decodeErr != nil {
+		err := fmt.Errorf("Failed to decode reply field %d for method %s", fld, resp.method)
+		log.Error(err)
+		return err
+	}
+
+	log.Debug("Settign affinity on reply")
+	if ar.setAffinity(selector, resp.backend) != nil {
+		log.Error("Setting affinity on reply failed")
+	}
+	return nil
+}
+
+// setAffinity binds key to be. Binding a key that is already bound to the same
+// backend is a no-op; binding it to a different backend leaves the existing
+// binding in place and returns an error.
+func (ar AffinityRouter) setAffinity(key string, be *backend) error {
+	current, bound := ar.affinity[key]
+	if !bound {
+		ar.affinity[key] = be
+		log.Debugf("New affinity set to backend %s for key %s", be.name, key)
+		return nil
+	}
+	if current == be {
+		return nil
+	}
+	err := fmt.Errorf("Attempting multiple sets of affinity for key %s to backend %s from %s on router %s",
+		key, be.name, current.name, ar.name)
+	log.Error(err)
+	return err
+}
diff --git a/internal/pkg/afrouter/affinity-router_test.go b/internal/pkg/afrouter/affinity-router_test.go
new file mode 100644
index 0000000..dfcefc0
--- /dev/null
+++ b/internal/pkg/afrouter/affinity-router_test.go
@@ -0,0 +1,444 @@
+/*
+ * Portions copyright 2019-present Open Networking Foundation
+ * Original copyright 2019-present Ciena Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package afrouter
+
+import (
+ "fmt"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-go/common/log"
+ common_pb "github.com/opencord/voltha-protos/go/common"
+ voltha_pb "github.com/opencord/voltha-protos/go/voltha"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/grpc"
+ "testing"
+)
+
+// AFFINITY_ROUTER_PROTOFILE is the relative path of the voltha.pb file that
+// MakeAffinityTestConfig passes as RouterConfig.ProtoFile.
+const (
+ AFFINITY_ROUTER_PROTOFILE = "../../../vendor/github.com/opencord/voltha-protos/go/voltha.pb"
+)
+
+// Unit test initialization: installs a JSON default logger at debug level and
+// adds a JSON package logger at warn level (presumably for this package -
+// confirm against log.AddPackage).
+func init() {
+ // Logger must be configured or bad things happen
+ log.SetDefaultLogger(log.JSON, log.DebugLevel, nil)
+ log.AddPackage(log.JSON, log.WarnLevel, nil)
+}
+
+// MakeAffinityTestConfig builds the route and router configuration used by the
+// affinity router tests. The "vcore" cluster gets numBackends backends named
+// rw_vcore<b>, each with numConnections connections named rw_vcore<b><c>, where
+// b counts from 0 and c counts from 1.
+func MakeAffinityTestConfig(numBackends int, numConnections int) (*RouteConfig, *RouterConfig) {
+	var backends []BackendConfig
+	for b := 0; b < numBackends; b++ {
+		var conns []ConnectionConfig
+		for c := 1; c <= numConnections; c++ {
+			conns = append(conns, ConnectionConfig{
+				Name: fmt.Sprintf("rw_vcore%d%d", b, c),
+				Addr: "foo",
+				Port: "123",
+			})
+		}
+
+		backends = append(backends, BackendConfig{
+			Name:        fmt.Sprintf("rw_vcore%d", b),
+			Type:        BackendSingleServer,
+			Connections: conns,
+		})
+	}
+
+	clusterCfg := &BackendClusterConfig{
+		Name:     "vcore",
+		Backends: backends,
+	}
+
+	routeCfg := RouteConfig{
+		Name:             "dev_manager",
+		Type:             RouteTypeRpcAffinityMessage,
+		Association:      AssociationRoundRobin,
+		BackendCluster:   "vcore",
+		backendCluster:   clusterCfg,
+		RouteField:       "id",
+		Methods:          []string{"CreateDevice", "EnableDevice"},
+		NbBindingMethods: []string{"CreateDevice"},
+	}
+
+	routerCfg := RouterConfig{
+		Name:         "vcore",
+		ProtoService: "VolthaService",
+		ProtoPackage: "voltha",
+		Routes:       []RouteConfig{routeCfg},
+		ProtoFile:    AFFINITY_ROUTER_PROTOFILE,
+	}
+	return &routeCfg, &routerCfg
+}
+
+// PretendAffinityOpenConnection marks the named connection of the given
+// backend as open by storing an empty grpc.ClientConn for it, because Route()
+// only returns backends that have an open connection.
+func PretendAffinityOpenConnection(router Router, clusterName string, backendIndex int, connectionName string) {
+	be := router.FindBackendCluster(clusterName).backends[backendIndex]
+
+	// Route Method expects an open connection
+	be.openConns[be.connections[connectionName]] = &grpc.ClientConn{}
+}
+
+// Common setup to run before each unit test: replaces the package-level
+// clusters and allRouters maps with empty ones so that state created by one
+// test is not seen by the next.
+func AffinityTestSetup() {
+ // reset globals that need to be clean for each unit test
+
+ clusters = make(map[string]*cluster)
+ allRouters = make(map[string]Router)
+}
+
+// Test creation of a new AffinityRouter, and the Service(), Name(),
+// BackendCluster() and FindBackendCluster() methods.
+func TestAffinityRouterInit(t *testing.T) {
+	AffinityTestSetup()
+
+	routeConfig, routerConfig := MakeAffinityTestConfig(1, 1)
+
+	router, err := newAffinityRouter(routerConfig, routeConfig)
+
+	assert.NotNil(t, router)
+	assert.Nil(t, err)
+
+	// assert.Equal takes the expected value first; keeping that order makes
+	// failure messages label the two values correctly.
+	assert.Equal(t, "VolthaService", router.Service())
+	assert.Equal(t, "dev_manager", router.Name())
+
+	// BackendCluster ignores its arguments, so any strings will do.
+	cluster, err := router.BackendCluster("foo", "bar")
+	assert.Equal(t, clusters["vcore"], cluster)
+	assert.Nil(t, err)
+
+	assert.Equal(t, clusters["vcore"], router.FindBackendCluster("vcore"))
+}
+
+// Creating a router from a route configuration without a name must fail.
+func TestAffinityRouterInitNoName(t *testing.T) {
+	AffinityTestSetup()
+
+	route, rtr := MakeAffinityTestConfig(1, 1)
+	route.Name = ""
+
+	_, createErr := newAffinityRouter(rtr, route)
+	assert.EqualError(t, createErr, "Failed to create a new router ''")
+}
+
+// Creating a router from a router configuration without a ProtoPackage must
+// fail.
+func TestAffinityRouterInitNoProtoPackage(t *testing.T) {
+	AffinityTestSetup()
+
+	route, rtr := MakeAffinityTestConfig(1, 1)
+	rtr.ProtoPackage = ""
+
+	_, createErr := newAffinityRouter(rtr, route)
+	assert.EqualError(t, createErr, "Failed to create a new router 'dev_manager'")
+}
+
+// Creating a router from a router configuration without a ProtoService must
+// fail.
+func TestAffinityRouterInitNoProtoService(t *testing.T) {
+	AffinityTestSetup()
+
+	route, rtr := MakeAffinityTestConfig(1, 1)
+	rtr.ProtoService = ""
+
+	_, createErr := newAffinityRouter(rtr, route)
+	assert.EqualError(t, createErr, "Failed to create a new router 'dev_manager'")
+}
+
+// Routing in a cluster with a single backend: every call, first or repeated,
+// must return that backend.
+func TestAffinityRouteOne(t *testing.T) {
+	AffinityTestSetup()
+
+	_, rtrCfg := MakeAffinityTestConfig(1, 1)
+
+	router, err := newRouter(rtrCfg)
+	assert.Nil(t, err)
+
+	PretendAffinityOpenConnection(router, "vcore", 0, "rw_vcore01")
+
+	payload, err := proto.Marshal(&common_pb.ID{Id: "1234"})
+	assert.Nil(t, err)
+
+	frame := &requestFrame{
+		payload:    payload,
+		err:        nil,
+		metaKey:    NoMeta,
+		methodInfo: newMethodDetails("/voltha.VolthaService/EnableDevice"),
+	}
+
+	// Since we only have one backend, calling Route a second time should
+	// return the same one as the first call.
+	for attempt := 0; attempt < 2; attempt++ {
+		be, conn := router.Route(frame)
+
+		assert.Nil(t, frame.err)
+		assert.NotNil(t, be)
+		assert.Equal(t, "rw_vcore0", be.name)
+		assert.Nil(t, conn)
+	}
+}
+
+// Routing in a cluster with two backends: an id stays on the backend it was
+// first bound to, and a new id is bound to the next backend.
+func TestAffinityRouteTwo(t *testing.T) {
+	AffinityTestSetup()
+
+	_, rtrCfg := MakeAffinityTestConfig(2, 1)
+
+	router, err := newRouter(rtrCfg)
+	assert.Nil(t, err)
+
+	PretendAffinityOpenConnection(router, "vcore", 0, "rw_vcore01")
+	PretendAffinityOpenConnection(router, "vcore", 1, "rw_vcore11")
+
+	// frameFor builds an EnableDevice request carrying the given id.
+	frameFor := func(id string) *requestFrame {
+		payload, marshalErr := proto.Marshal(&common_pb.ID{Id: id})
+		assert.Nil(t, marshalErr)
+		return &requestFrame{
+			payload:    payload,
+			err:        nil,
+			metaKey:    NoMeta,
+			methodInfo: newMethodDetails("/voltha.VolthaService/EnableDevice"),
+		}
+	}
+
+	firstID := frameFor("1234")
+	steps := []struct {
+		frame       *requestFrame
+		wantBackend string
+	}{
+		// We should Route to the first core and bind affinity to it
+		{firstID, "rw_vcore0"},
+		// We should have established affinity, and trying Route again should return the same core
+		{firstID, "rw_vcore0"},
+		// Calling Route with a new ID should cause it to bind affinity to the second core
+		{frameFor("1235"), "rw_vcore1"},
+	}
+
+	for _, step := range steps {
+		be, conn := router.Route(step.frame)
+
+		assert.Nil(t, step.frame.err)
+		assert.NotNil(t, be)
+		assert.Equal(t, step.wantBackend, be.name)
+		assert.Nil(t, conn)
+	}
+}
+
+// Routing in a cluster whose only backend has no open connection must fail and
+// record the reason in the frame.
+func TestAffinityRouteOneNoOpenConnection(t *testing.T) {
+	AffinityTestSetup()
+
+	_, rtrCfg := MakeAffinityTestConfig(1, 1)
+
+	router, err := newRouter(rtrCfg)
+	assert.Nil(t, err)
+
+	// PretendAffinityOpenConnection is not called here, so the backend has
+	// no open connection.
+	payload, err := proto.Marshal(&common_pb.ID{Id: "1234"})
+	assert.Nil(t, err)
+
+	frame := &requestFrame{
+		payload:    payload,
+		err:        nil,
+		metaKey:    NoMeta,
+		methodInfo: newMethodDetails("/voltha.VolthaService/EnableDevice"),
+	}
+
+	be, conn := router.Route(frame)
+
+	assert.EqualError(t, frame.err, "No backend with open connections found")
+	assert.Nil(t, be)
+	assert.Nil(t, conn)
+}
+
+// ReplyHandler must bind the id found in a CreateDevice reply to the backend
+// that sent the reply.
+func TestAffinityRouteReply(t *testing.T) {
+	AffinityTestSetup()
+
+	_, rtrCfg := MakeAffinityTestConfig(2, 1)
+
+	router, err := newRouter(rtrCfg)
+	assert.Nil(t, err)
+
+	// Get the created AffinityRouter so we can inspect its state
+	affRouter := allRouters["vcoredev_manager"].(AffinityRouter)
+
+	PretendAffinityOpenConnection(router, "vcore", 0, "rw_vcore01")
+	PretendAffinityOpenConnection(router, "vcore", 1, "rw_vcore11")
+
+	payload, err := proto.Marshal(&voltha_pb.Device{Id: "1234"})
+	assert.Nil(t, err)
+
+	// Note that the frame's backend must be set. As this is a response, it
+	// must have come from a backend and that backend must be known.
+	reply := &responseFrame{
+		payload: payload,
+		metaKey: NoMeta,
+		backend: router.FindBackendCluster("vcore").backends[0],
+		method:  "CreateDevice",
+	}
+
+	// affinity should be unset as we have not routed yet
+	assert.Empty(t, affRouter.affinity)
+
+	err = affRouter.ReplyHandler(reply)
+	assert.Nil(t, err)
+
+	// affinity should now be set
+	assert.NotEmpty(t, affRouter.affinity)
+	assert.Equal(t, router.FindBackendCluster("vcore").backends[0], affRouter.affinity["1234"])
+}
+
+// ReplyHandler must reject a frame that is not a response frame.
+func TestAffinityRouteReplyIncorrectFrame(t *testing.T) {
+	AffinityTestSetup()
+
+	_, rtrCfg := MakeAffinityTestConfig(2, 1)
+
+	router, err := newRouter(rtrCfg)
+	assert.Nil(t, err)
+
+	// Get the created AffinityRouter so we can inspect its state
+	affRouter := allRouters["vcoredev_manager"].(AffinityRouter)
+
+	PretendAffinityOpenConnection(router, "vcore", 0, "rw_vcore01")
+	PretendAffinityOpenConnection(router, "vcore", 1, "rw_vcore11")
+
+	payload, err := proto.Marshal(&voltha_pb.Device{Id: "1234"})
+	assert.Nil(t, err)
+
+	// ReplyHandler expects a responseFrame and we're giving it a requestFrame instead
+	wrongFrame := &requestFrame{
+		payload:    payload,
+		err:        nil,
+		metaKey:    NoMeta,
+		methodInfo: newMethodDetails("/voltha.VolthaService/EnableDevice"),
+	}
+
+	err = affRouter.ReplyHandler(wrongFrame)
+	assert.EqualError(t, err, "Internal: invalid data type in ReplyHander call &{[10 4 49 50 51 52] <nil> <nil> <nil> <nil> {/voltha.VolthaService/EnableDevice voltha VolthaService EnableDevice} nometa }")
+}
+
+// decodeProtoField must return varint and string fields of a marshalled
+// message as strings, skipping the fields that precede the requested one, and
+// must reject a field of any other wire type.
+func TestAffinityRouterDecodeProtoField(t *testing.T) {
+	AffinityTestSetup()
+
+	_, routerConfig := MakeAffinityTestConfig(2, 1)
+
+	_, err := newRouter(routerConfig)
+	assert.Nil(t, err)
+
+	// Get the created AffinityRouter so we can inspect its state
+	aRouter := allRouters["vcoredev_manager"].(AffinityRouter)
+
+	// Pick something to test with lots of field types. Port is a good candidate.
+	portMessage := &voltha_pb.Port{PortNo: 123,
+		Label:     "testlabel",
+		Type:      3,
+		DeviceId:  "5678",
+		RxPackets: 9876,
+	}
+
+	portData, err := proto.Marshal(portMessage)
+	assert.Nil(t, err)
+
+	/*
+	 * Decode various fields in the protobuf. Decoding each subsequent
+	 * field implies skipField() is called on its predecessors.
+	 */
+	decodable := []struct {
+		field byte
+		want  string
+	}{
+		{1, "123"}, // field 1 is PortNo
+		// Test VOL-1882, skipping of varint field. Note: May cause infinite loop if not fixed!
+		{2, "testlabel"}, // field 2 is Label
+		{3, "3"},         // field 3 is PortType
+		{7, "5678"},      // field 7 is DeviceId
+	}
+	for _, tc := range decodable {
+		got, decodeErr := aRouter.decodeProtoField(portData, tc.field)
+		// The error was previously assigned but never examined.
+		assert.Nil(t, decodeErr)
+		assert.Equal(t, tc.want, got)
+	}
+
+	// TODO: Seems like an int64 ought to be allowed...
+	_, err = aRouter.decodeProtoField(portData, 9) // field 9 is RxPackets
+	assert.EqualError(t, err, "Only integer and string route selectors are permitted")
+}
+
+// Binding a previously unbound key to a backend must succeed.
+func TestAffinitySetAffinity(t *testing.T) {
+	AffinityTestSetup()
+
+	_, rtrCfg := MakeAffinityTestConfig(2, 1)
+
+	router, err := newRouter(rtrCfg)
+	assert.Nil(t, err)
+
+	// Get the created AffinityRouter so we can inspect its state
+	affRouter := allRouters["vcoredev_manager"].(AffinityRouter)
+
+	firstBackend := router.FindBackendCluster("vcore").backends[0]
+	assert.Nil(t, affRouter.setAffinity("1234", firstBackend))
+}
+
+// Re-binding a key that is already bound to another backend must fail.
+func TestAffinitySetAffinityChange(t *testing.T) {
+	AffinityTestSetup()
+
+	_, rtrCfg := MakeAffinityTestConfig(2, 1)
+
+	router, err := newRouter(rtrCfg)
+	assert.Nil(t, err)
+
+	// Get the created AffinityRouter so we can inspect its state
+	affRouter := allRouters["vcoredev_manager"].(AffinityRouter)
+
+	backends := router.FindBackendCluster("vcore").backends
+
+	// The first binding of the key is accepted.
+	err = affRouter.setAffinity("1234", backends[0])
+	assert.Nil(t, err)
+
+	// Now pick a different backend
+	err = affRouter.setAffinity("1234", backends[1])
+	assert.EqualError(t, err, "Attempting multiple sets of affinity for key 1234 to backend rw_vcore1 from rw_vcore0 on router dev_manager")
+}
diff --git a/internal/pkg/afrouter/api.go b/internal/pkg/afrouter/api.go
new file mode 100644
index 0000000..8b72897
--- /dev/null
+++ b/internal/pkg/afrouter/api.go
@@ -0,0 +1,265 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "errors"
+ "fmt"
+ "github.com/opencord/voltha-go/common/log"
+ pb "github.com/opencord/voltha-protos/go/afrouter"
+ common_pb "github.com/opencord/voltha-protos/go/common"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+ "net"
+ "net/url"
+ "runtime"
+ "strconv"
+)
+
+// ArouterApi is the gRPC configuration API of the affinity router. newApi
+// registers it as the pb Configuration server.
+type ArouterApi struct {
+ addr string // listen address, copied from ApiConfig.Addr
+ port int // listen port, copied from ApiConfig.Port
+ apiListener net.Listener // TCP listener opened by newApi
+ apiServer *grpc.Server // gRPC server the API is registered on
+ running bool // set to true when the goroutine started by serve begins, and back to false if Serve returns an error
+ ar *ArouterProxy // proxy whose servers getServer looks up
+}
+
+// newApi creates the configuration API for the proxy ar: it opens a TCP
+// listener on config.Addr and config.Port, creates a gRPC server and registers
+// the API on it. The server does not accept requests until serve is called.
+//
+// An error is returned when config.Addr does not parse or the listener cannot
+// be opened.
+func newApi(config *ApiConfig, ar *ArouterProxy) (*ArouterApi, error) {
+	// Create a separate server and listener for the API
+	// Validate the ip address if one is provided
+	if _, err := url.Parse(config.Addr); err != nil {
+		log.Errorf("Invalid address '%s' provided for API server", config.Addr)
+		return nil, errors.New("Errors in API configuration")
+	}
+
+	aa := &ArouterApi{addr: config.Addr, port: int(config.Port), ar: ar}
+
+	// net.JoinHostPort adds the brackets an IPv6 literal needs, which plain
+	// concatenation with ":" does not. Brackets already present in the
+	// configuration are removed first so such addresses keep working.
+	host := config.Addr
+	if len(host) >= 2 && host[0] == '[' && host[len(host)-1] == ']' {
+		host = host[1 : len(host)-1]
+	}
+
+	// Create the listener for the API server
+	listener, err := net.Listen("tcp", net.JoinHostPort(host, strconv.Itoa(int(config.Port))))
+	if err != nil {
+		log.Error(err)
+		return nil, err
+	}
+	aa.apiListener = listener
+
+	// Create the API server
+	aa.apiServer = grpc.NewServer()
+	pb.RegisterConfigurationServer(aa.apiServer, *aa)
+	return aa, nil
+}
+
+// getServer returns the proxy's server named srvr, or an error when the proxy
+// has no server of that name.
+func (aa *ArouterApi) getServer(srvr string) (*server, error) {
+	found, exists := aa.ar.servers[srvr]
+	if exists {
+		return found, nil
+	}
+	return nil, fmt.Errorf("Server '%s' doesn't exist", srvr)
+}
+
+// getRouter returns the first router of server s that reports owning the
+// backend cluster clstr, or an error when none does.
+func (aa *ArouterApi) getRouter(s *server, clstr string) (Router, error) {
+	for _, pkgRouters := range s.routers {
+		for _, candidate := range pkgRouters {
+			if candidate.FindBackendCluster(clstr) == nil {
+				continue
+			}
+			return candidate, nil
+		}
+	}
+	return nil, fmt.Errorf("Cluster '%s' doesn't exist", clstr)
+}
+
+// getCluster asks each router of server s for the backend cluster clstr and
+// returns the first one found, or an error when no router has it.
+func (aa *ArouterApi) getCluster(s *server, clstr string) (*cluster, error) {
+	for _, pkgRouters := range s.routers {
+		for _, candidate := range pkgRouters {
+			match := candidate.FindBackendCluster(clstr)
+			if match == nil {
+				continue
+			}
+			return match, nil
+		}
+	}
+	return nil, fmt.Errorf("Cluster '%s' doesn't exist", clstr)
+}
+
+// getBackend returns the backend of cluster c named bknd, or an error when the
+// cluster has no backend of that name.
+func (aa *ArouterApi) getBackend(c *cluster, bknd string) (*backend, error) {
+	for _, candidate := range c.backends {
+		if candidate.name != bknd {
+			continue
+		}
+		return candidate, nil
+	}
+	return nil, fmt.Errorf("Backend '%s' doesn't exist in cluster %s",
+		bknd, c.name)
+}
+
+// getConnection returns the connection of backend b named con, or an error
+// when the backend has no connection of that name.
+func (aa *ArouterApi) getConnection(b *backend, con string) (*connection, error) {
+	found, exists := b.connections[con]
+	if exists {
+		return found, nil
+	}
+	return nil, fmt.Errorf("Connection '%s' doesn't exist", con)
+}
+
+// updateConnection is a placeholder: it ignores its arguments and always
+// returns an error stating that it is not implemented.
+func (aa *ArouterApi) updateConnection(in *pb.Conn, cn *connection, b *backend) error {
+ return errors.New("updateConnection not implemented")
+}
+
+// SetAffinity binds the id in.Id to the backend in.Backend of cluster
+// in.Cluster on the router registered in allRouters under in.Router+in.Route.
+//
+// Only affinity routers are acted on; for any other router type the call is a
+// no-op that reports success. A failed result and an error are returned when
+// the router, the cluster or the backend cannot be found, or when the id is
+// already bound to a different backend.
+func (aa ArouterApi) SetAffinity(ctx context.Context, in *pb.Affinity) (*pb.Result, error) {
+	log.Debugf("SetAffinity called! %v", in)
+
+	// fail logs err and wraps it in the failed result returned to the caller.
+	fail := func(err error) (*pb.Result, error) {
+		log.Error(err)
+		return &pb.Result{Success: false, Error: err.Error()}, err
+	}
+
+	log.Debugf("Getting router %s and route %s", in.Router, in.Route)
+	r, ok := allRouters[in.Router+in.Route]
+	if !ok {
+		log.Debugf("Couldn't get router type")
+		// Build a real error here: there is no earlier error to report.
+		return fail(fmt.Errorf("Router '%s' with route '%s' doesn't exist", in.Router, in.Route))
+	}
+
+	switch rr := r.(type) {
+	case AffinityRouter:
+		log.Debug("Affinity router found")
+		// FindBackendCluster returns nil for a name that is not the
+		// router's cluster, so check before looking up the backend.
+		c := rr.FindBackendCluster(in.Cluster)
+		if c == nil {
+			return fail(fmt.Errorf("Cluster '%s' doesn't exist", in.Cluster))
+		}
+		b := c.getBackend(in.Backend)
+		if b == nil {
+			return fail(fmt.Errorf("Requested backend '%s' not found", in.Backend))
+		}
+		if err := rr.setAffinity(in.Id, b); err != nil {
+			return fail(err)
+		}
+	case MethodRouter:
+		log.Debug("Method router found")
+	default:
+		log.Debug("Some other router found")
+	}
+
+	return &pb.Result{Success: true, Error: ""}, nil
+}
+
+// SetConnection resolves the server, cluster, backend and connection named in
+// the request and passes the connection to updateConnection. The first lookup
+// or update that fails is logged and returned both as the error and inside a
+// failed result.
+func (aa ArouterApi) SetConnection(ctx context.Context, in *pb.Conn) (*pb.Result, error) {
+	// Navigate down to the connection and compare IP addresses and ports if they're
+	// not the same then close the existing connection. If they are both the same
+	// then return an error describing the situation.
+	log.Debugf("SetConnection called! %v", in)
+
+	// failed wraps err in the result returned for an unsuccessful call.
+	failed := func(err error) (*pb.Result, error) {
+		return &pb.Result{Success: false, Error: err.Error()}, err
+	}
+
+	srv, err := aa.getServer(in.Server)
+	if err != nil {
+		err = fmt.Errorf("Server '%s' doesn't exist", in.Server)
+		log.Error(err)
+		return failed(err)
+	}
+
+	// The cluster is usually accessed via the router but since each
+	// cluster is unique it's good enough to find the router that
+	// has the cluster we're looking for rather than fully keying
+	// the path
+	clstr, err := aa.getCluster(srv, in.Cluster)
+	if err != nil {
+		log.Error(err)
+		return failed(err)
+	}
+
+	bknd, err := aa.getBackend(clstr, in.Backend)
+	if err != nil {
+		log.Error(err)
+		return failed(err)
+	}
+
+	conn, err := aa.getConnection(bknd, in.Connection)
+	if err != nil {
+		log.Error(err)
+		return failed(err)
+	}
+
+	if err = aa.updateConnection(in, conn, bknd); err != nil {
+		log.Error(err)
+		return failed(err)
+	}
+
+	return &pb.Result{Success: true, Error: ""}, nil
+}
+
+// GetGoroutineCount returns the number of goroutines reported by
+// runtime.NumGoroutine; ctx and in are not used.
+func (aa ArouterApi) GetGoroutineCount(ctx context.Context, in *pb.Empty) (*pb.Count, error) {
+ return &pb.Count{Count: uint32(runtime.NumGoroutine())}, nil
+}
+
+// UpdateLogLevel applies the requested log level. An empty package name sets
+// the level of all loggers and the default level, the name "default" sets only
+// the default level, and any other name sets the level of that package.
+func (aa ArouterApi) UpdateLogLevel(ctx context.Context, in *common_pb.Logging) (*pb.Empty, error) {
+	switch level := int(in.Level); in.PackageName {
+	case "":
+		log.SetAllLogLevel(level)
+		log.SetDefaultLogLevel(level)
+	case "default":
+		log.SetDefaultLogLevel(level)
+	default:
+		log.SetPackageLogLevel(in.PackageName, level)
+	}
+
+	return &pb.Empty{}, nil
+}
+
+// GetLogLevels returns one entry per package known to the log package,
+// followed by an entry named "default" carrying the default level. Every entry
+// is labelled with the component name from the request. The first package
+// whose level cannot be read aborts the call with that error.
+func (aa ArouterApi) GetLogLevels(ctx context.Context, in *common_pb.LoggingComponent) (*common_pb.Loggings, error) {
+	result := &common_pb.Loggings{}
+
+	// do the per-package log levels
+	for _, pkg := range log.GetPackageNames() {
+		pkgLevel, err := log.GetPackageLogLevel(pkg)
+		if err != nil {
+			return nil, err
+		}
+		result.Items = append(result.Items, &common_pb.Logging{
+			ComponentName: in.ComponentName,
+			PackageName:   pkg,
+			Level:         common_pb.LogLevel_LogLevel(pkgLevel),
+		})
+	}
+
+	// now do the default log level
+	result.Items = append(result.Items, &common_pb.Logging{
+		ComponentName: in.ComponentName,
+		PackageName:   "default",
+		Level:         common_pb.LogLevel_LogLevel(log.GetDefaultLogLevel()),
+	})
+
+	return result, nil
+}
+
+// serve starts the API's gRPC server on apiListener in a new goroutine and
+// returns immediately. If Serve returns an error, running is cleared and the
+// error is logged and sent on errChan.
+func (aa *ArouterApi) serve() {
+ // Start a serving thread
+ go func() {
+ aa.running = true
+ if err := aa.apiServer.Serve(aa.apiListener); err != nil {
+ aa.running = false
+ log.Error(err)
+ errChan <- err
+ }
+ }()
+}
diff --git a/internal/pkg/afrouter/arproxy.go b/internal/pkg/afrouter/arproxy.go
new file mode 100644
index 0000000..72641de
--- /dev/null
+++ b/internal/pkg/afrouter/arproxy.go
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+// This file implements the ArouterProxy struct and its
+// functions. The ArouterProxy is the top level object
+// for the affinity router.
+
+import (
+ "github.com/opencord/voltha-go/common/log"
+)
+
+// arProxy is the package-level proxy instance; NewArouterProxy assigns it.
+var arProxy *ArouterProxy = nil
+
+type ArouterProxy struct { // top level object: the servers and API object created by NewArouterProxy
+ servers map[string]*server // keyed by the server's Name(); server is defined in handler.go
+ api *ArouterApi // created by newApi in NewArouterProxy; started by ListenAndServe
+}
+
+// NewArouterProxy builds the routing proxy described by conf. It creates one
+// server per entry in conf.Servers, stored under the server's name, and then
+// the API object. The package-level arProxy variable is pointed at the new
+// proxy first; that same pointer is later handed to newApi.
+//
+// It returns nil and the underlying error as soon as a server or the API
+// cannot be created.
+func NewArouterProxy(conf *Configuration) (*ArouterProxy, error) {
+	arProxy = &ArouterProxy{servers: make(map[string]*server)}
+
+	// Create all the servers listed in the configuration
+	for i := range conf.Servers {
+		// Use a fresh copy for every iteration so the pointer handed to
+		// newServer never aliases a variable that later iterations reuse.
+		srvConf := conf.Servers[i]
+		ns, err := newServer(&srvConf)
+		if err != nil {
+			log.Error("Configuration failed")
+			return nil, err
+		}
+		arProxy.servers[ns.Name()] = ns
+	}
+
+	// TODO: The API is not mandatory, check if it's even in the config before
+	// trying to create it. If it isn't then don't bother but log a warning.
+	api, err := newApi(&conf.Api, arProxy)
+	if err != nil {
+		return nil, err
+	}
+	arProxy.api = api
+
+	return arProxy, nil
+}
+
+// ListenAndServe starts a serving goroutine for every configured server and
+// for the API, then blocks until a value arrives on doneChan and returns it.
+func (ap *ArouterProxy) ListenAndServe() error {
+	for name := range ap.servers {
+		ap.serve(ap.servers[name])
+	}
+	ap.api.serve()
+
+	// Just wait until we're done which only happens
+	// on a signal or an error.
+	return <-doneChan
+}
+
+// serve runs the given proxy server on its listener in a new goroutine. The
+// server's running flag is raised before Serve is called; if Serve returns an
+// error the flag is cleared, the error is logged and then sent on errChan.
+func (ap *ArouterProxy) serve(srvr *server) {
+	go func() {
+		srvr.running = true
+		err := srvr.proxyServer.Serve(srvr.proxyListener)
+		if err == nil {
+			return
+		}
+		srvr.running = false
+		log.Error(err)
+		errChan <- err
+	}()
+}
diff --git a/internal/pkg/afrouter/backend.go b/internal/pkg/afrouter/backend.go
new file mode 100644
index 0000000..c597d49
--- /dev/null
+++ b/internal/pkg/afrouter/backend.go
@@ -0,0 +1,295 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+// Backend manager handles redundant connections per backend
+
+import (
+ "errors"
+ "fmt"
+ "github.com/opencord/voltha-go/common/log"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+ "net/url"
+ "strconv"
+ "strings"
+ "sync"
+)
+
+// backend represents a collection of redundant connections in a HA configuration
+type backend struct {
+ mutex sync.Mutex // locked by openSouthboundStreams, incConn, decConn and NumOpenConnections
+ name string // taken from BackendConfig.Name in newBackend
+ beType backendType // taken from BackendConfig.Type in newBackend
+ activeAssociation association // association settings copied from BackendConfig.Association
+ connFailCallback func(string, *backend) bool // installed by setConnFailCallback
+ connections map[string]*connection // configured connections, keyed by connection name
+ openConns map[*connection]*grpc.ClientConn // entries added by incConn and removed by decConn
+ activeRequests map[*request]struct{} // set of requests registered by openSouthboundStreams
+}
+
+type association struct { // association settings of a backend, filled in by newBackend
+ strategy associationStrategy // newBackend requires it for active/active backends
+ location associationLocation // newBackend requires it for active/active backends
+ field string // Used only if location is protobuf
+ key string // newBackend requires it when location is header
+}
+
+// splitActiveStreamsUnsafe opens a stream on the given connection for every
+// active request that does not yet have a stream under cn.name.
+// The caller must already hold the backend mutex.
+//
+// Each request's own mutex is taken while it is inspected. When a new stream
+// is created, the mutex is NOT released here: it is handed over to the
+// goroutine running catchupRequestStreamThenForwardResponseStream, which is
+// expected to unlock it.
+func (be *backend) splitActiveStreamsUnsafe(cn *connection, conn *grpc.ClientConn) {
+	if pending := len(be.activeRequests); pending != 0 {
+		log.Debugf("Creating new streams for %d existing requests", pending)
+	}
+	for r := range be.activeRequests {
+		r.mutex.Lock()
+
+		// Request already has a stream on this connection.
+		if _, have := r.streams[cn.name]; have {
+			r.mutex.Unlock()
+			continue
+		}
+
+		log.Debugf("Opening southbound stream for existing request '%s'", r.methodInfo.method)
+		stream, err := grpc.NewClientStream(r.ctx, clientStreamDescForProxying, conn, r.methodInfo.all)
+		if err != nil {
+			log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
+			r.mutex.Unlock()
+			continue
+		}
+
+		// new thread will unlock the request mutex
+		go r.catchupRequestStreamThenForwardResponseStream(cn.name, stream)
+	}
+}
+
+// openSouthboundStreams creates a request for the incoming call and opens one
+// southbound client stream per open connection of this backend. When the
+// request frame names a specific connection, only that connection is used.
+//
+// The backend mutex is held for the whole call. The request is added to
+// activeRequests and returned if at least one stream was opened; otherwise an
+// error is logged and returned. An error is also returned when the incoming
+// context carries no metadata.
+func (be *backend) openSouthboundStreams(srv interface{}, serverStream grpc.ServerStream, nf *requestFrame, sf *responseFrame) (*request, error) {
+	be.mutex.Lock()
+	defer be.mutex.Unlock()
+
+	streamingReq, streamingRsp := nf.router.IsStreaming(nf.methodInfo.method)
+
+	// Get the metadata from the incoming message on the server
+	md, haveMd := metadata.FromIncomingContext(serverStream.Context())
+	if !haveMd {
+		return nil, errors.New("could not get a server stream metadata")
+	}
+
+	// Create an outgoing context that includes the incoming metadata and that will cancel if the server's context is canceled
+	outCtx := metadata.NewOutgoingContext(serverStream.Context(), md.Copy())
+	outCtx = metadata.AppendToOutgoingContext(outCtx, "voltha_serial_number", nf.serialNo)
+
+	r := &request{
+		ctx: outCtx,
+
+		streams:         make(map[string]grpc.ClientStream),
+		responseErrChan: make(chan error, 1),
+
+		backend:             be,
+		serverStream:        serverStream,
+		methodInfo:          nf.methodInfo,
+		requestFrame:        nf,
+		responseFrame:       sf,
+		isStreamingRequest:  streamingReq,
+		isStreamingResponse: streamingRsp,
+	}
+
+	log.Debugf("Opening southbound request for method '%s'", nf.methodInfo.method)
+
+	// TODO: Need to check if this is an active/active backend cluster
+	// with a serial number in the header.
+	log.Debugf("Serial number for transaction allocated: %s", nf.serialNo)
+
+	log.Debugf("There are %d/%d streams to open", len(be.openConns), len(be.connections))
+	wanted := nf.connection
+	if wanted != nil {
+		// Debug statement triggered by source router. Other routers have no connection preference.
+		log.Debugf("Looking for connection %s", wanted.name)
+	}
+
+	// If even one stream can be created then proceed. If none can be
+	// created then report an error because both the primary and redundant
+	// connections are non-existent.
+	opened := false
+	for cn, conn := range be.openConns {
+		// If source-router was used, it will indicate a specific connection to be used
+		if wanted != nil && wanted != cn {
+			continue
+		}
+
+		log.Debugf("Opening stream for connection '%s'", cn.name)
+		stream, err := grpc.NewClientStream(r.ctx, clientStreamDescForProxying, conn, r.methodInfo.all)
+		if err != nil {
+			log.Debugf("Failed to create a client stream '%s', %v", cn.name, err)
+			continue
+		}
+		r.streams[cn.name] = stream
+		go r.forwardResponseStream(cn.name, stream)
+		opened = true
+	}
+
+	if !opened {
+		var errStr strings.Builder
+		fmt.Fprintf(&errStr, "{{No open connections for backend '%s' unable to send}} ", be.name)
+		log.Error(errStr.String())
+		return nil, errors.New(errStr.String())
+	}
+
+	be.activeRequests[r] = struct{}{}
+	return r, nil
+}
+
+// handler proxies one call through this backend: it opens the southbound
+// streams, forwards the request stream, and then waits for the result of the
+// response stream on the request's responseErrChan.
+//
+// A failure to open streams is logged and returned unchanged; a failure while
+// forwarding the request stream is returned as a gRPC Internal error.
+func (be *backend) handler(srv interface{}, serverStream grpc.ServerStream, nf *requestFrame, sf *responseFrame) error {
+	// Set up streams for each open connection
+	req, openErr := be.openSouthboundStreams(srv, serverStream, nf, sf)
+	if openErr != nil {
+		log.Errorf("openStreams failed: %v", openErr)
+		return openErr
+	}
+
+	log.Debug("Starting request stream forwarding")
+	s2cErr := req.forwardRequestStream(serverStream)
+	if s2cErr != nil {
+		// exit with an error to the stack
+		return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
+	}
+
+	// wait for response stream to complete
+	return <-req.responseErrChan
+}
+
+func newBackend(conf *BackendConfig, clusterName string) (*backend, error) {
+ var rtrn_err bool = false
+
+ log.Debugf("Configuring the backend with %v", *conf)
+ // Validate the conifg and configure the backend
+ be := &backend{
+ name: conf.Name,
+ connections: make(map[string]*connection),
+ openConns: make(map[*connection]*grpc.ClientConn),
+ activeRequests: make(map[*request]struct{}),
+ }
+ if conf.Type == BackendUndefined {
+ log.Error("Invalid type specified for backend %s in cluster %s", conf.Name, clusterName)
+ rtrn_err = true
+ }
+ be.beType = conf.Type
+
+ if conf.Association.Strategy == AssociationStrategyUndefined && be.beType == BackendActiveActive {
+ log.Errorf("An association strategy must be provided if the backend "+
+ "type is active/active for backend %s in cluster %s", conf.Name, clusterName)
+ rtrn_err = true
+ }
+ be.activeAssociation.strategy = conf.Association.Strategy
+
+ if conf.Association.Location == AssociationLocationUndefined && be.beType == BackendActiveActive {
+ log.Errorf("An association location must be provided if the backend "+
+ "type is active/active for backend %s in cluster %s", conf.Name, clusterName)
+ rtrn_err = true
+ }
+ be.activeAssociation.location = conf.Association.Location
+
+ if conf.Association.Field == "" && be.activeAssociation.location == AssociationLocationProtobuf {
+ log.Errorf("An association field must be provided if the backend "+
+ "type is active/active and the location is set to protobuf "+
+ "for backend %s in cluster %s", conf.Name, clusterName)
+ rtrn_err = true
+ }
+ be.activeAssociation.field = conf.Association.Field
+
+ if conf.Association.Key == "" && be.activeAssociation.location == AssociationLocationHeader {
+ log.Errorf("An association key must be provided if the backend "+
+ "type is active/active and the location is set to header "+
+ "for backend %s in cluster %s", conf.Name, clusterName)
+ rtrn_err = true
+ }
+ be.activeAssociation.key = conf.Association.Key
+ if rtrn_err {
+ return nil, errors.New("Backend configuration failed")
+ }
+ // Configure the connections
+ // Connections can consist of just a name. This allows for dynamic configuration
+ // at a later time.
+ // TODO: validate that there is one connection for all but active/active backends
+ if len(conf.Connections) > 1 && be.beType != BackendActiveActive {
+ log.Errorf("Only one connection must be specified if the association " +
+ "strategy is not set to 'active_active'")
+ rtrn_err = true
+ }
+ if len(conf.Connections) == 0 {
+ log.Errorf("At least one connection must be specified")
+ rtrn_err = true
+ }
+ for _, cnConf := range conf.Connections {
+ if cnConf.Name == "" {
+ log.Errorf("A connection must have a name for backend %s in cluster %s",
+ conf.Name, clusterName)
+ } else {
+ ctx, cancelFunc := context.WithCancel(context.Background())
+ be.connections[cnConf.Name] = &connection{name: cnConf.Name, addr: cnConf.Addr, port: cnConf.Port, backend: be, ctx: ctx, close: cancelFunc}
+ if _, err := url.Parse(cnConf.Addr); err != nil {
+ log.Errorf("The address for connection %s in backend %s in cluster %s is invalid: %s",
+ cnConf.Name, conf.Name, clusterName, err)
+ rtrn_err = true
+ }
+ // Validate the port number. This just validtes that it's a non 0 integer
+ if n, err := strconv.Atoi(cnConf.Port); err != nil || n <= 0 || n > 65535 {
+ log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
+ cnConf.Port, cnConf.Name, conf.Name, clusterName)
+ rtrn_err = true
+ } else {
+ if n <= 0 && n > 65535 {
+ log.Errorf("Port %s for connection %s in backend %s in cluster %s is invalid",
+ cnConf.Port, cnConf.Name, conf.Name, clusterName)
+ rtrn_err = true
+ }
+ }
+ }
+ }
+
+ if rtrn_err {
+ return nil, errors.New("Connection configuration failed")
+ }
+ // All is well start the backend cluster connections
+ be.connectAll()
+
+ return be, nil
+}
+
+// incConn records conn as the open gRPC connection for cn and then opens
+// streams on it for the requests that are already active. Both steps run
+// under the backend mutex.
+func (be *backend) incConn(cn *connection, conn *grpc.ClientConn) {
+	be.mutex.Lock()
+	defer be.mutex.Unlock()
+
+	// Register first, then bring existing requests onto the new connection.
+	be.openConns[cn] = conn
+	be.splitActiveStreamsUnsafe(cn, conn)
+}
+
+// decConn removes cn from the set of open connections while holding the
+// backend mutex. Removing a connection that is not present is a no-op.
+func (be *backend) decConn(cn *connection) {
+	be.mutex.Lock()
+	delete(be.openConns, cn)
+	be.mutex.Unlock()
+}
+
+// NumOpenConnections returns how many connections are currently recorded as
+// open, read while holding the backend mutex.
+func (be *backend) NumOpenConnections() int {
+	be.mutex.Lock()
+	open := len(be.openConns)
+	be.mutex.Unlock()
+
+	return open
+}
+
+// connectAll starts a goroutine running connect for every configured
+// connection of the backend and returns immediately; it does not wait for,
+// or report, the outcome of any attempt. This should only be called
+// on a first attempt to connect. Individual connections should be
+// handled after that.
+func (be *backend) connectAll() {
+	for name := range be.connections {
+		go be.connections[name].connect()
+	}
+}
+
+// setConnFailCallback stores cb as the backend's connection failure
+// callback, replacing any previous one.
+// This is currently not used.
+func (be *backend) setConnFailCallback(cb func(string, *backend) bool) {
+	be.connFailCallback = cb
+}
diff --git a/internal/pkg/afrouter/binding-router.go b/internal/pkg/afrouter/binding-router.go
new file mode 100644
index 0000000..2602137
--- /dev/null
+++ b/internal/pkg/afrouter/binding-router.go
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "errors"
+ "fmt"
+ "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+)
+
+type BindingRouter struct { // router state populated by newBindingRouter
+ name string // RouteConfig.Name
+ association associationType // RouteConfig.Binding.Association
+ //routingField string
+ grpcService string // RouterConfig.ProtoService
+ //protoDescriptor *pb.FileDescriptorSet
+ //methodMap map[string]byte
+ beCluster *cluster // looked up in clusters or created by newBackendCluster
+ bindings map[string]*backend // bound backends; Route stores each under the backend's name
+ bindingType string // only assigned when the configured type is "header"
+ bindingField string // metadata key read by GetMetaKeyVal
+ bindingMethod string // the only method Route accepts when creating a new binding
+ currentBackend **backend // result of the most recent nextBackend call made by Route
+}
+
+// IsStreaming is not implemented for the binding router: calling it always
+// panics with "not implemented".
+func (br BindingRouter) IsStreaming(_ string) (bool, bool) {
+	panic("not implemented")
+}
+
+// BackendCluster returns the router's single backend cluster and a nil error;
+// both arguments are ignored.
+func (br BindingRouter) BackendCluster(s string, metaKey string) (*cluster, error) {
+	//return nil,errors.New("Not implemented yet")
+	return br.beCluster, nil
+}
+// Name returns the router name taken from the route configuration.
+func (br BindingRouter) Name() string {
+	return br.name
+}
+// Service returns the gRPC service name this router was configured with.
+func (br BindingRouter) Service() string {
+	return br.grpcService
+}
+func (br BindingRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string, string, error) {
+ var rtrnK = ""
+ var rtrnV = ""
+
+ // Get the metadata from the server stream
+ md, ok := metadata.FromIncomingContext(serverStream.Context())
+ if !ok {
+ return rtrnK, rtrnV, errors.New("Could not get a server stream metadata")
+ }
+
+ // Determine if one of the method routing keys exists in the metadata
+ if _, ok := md[br.bindingField]; ok == true {
+ rtrnV = md[br.bindingField][0]
+ rtrnK = br.bindingField
+ }
+
+ return rtrnK, rtrnV, nil
+}
+// FindBackendCluster returns the router's backend cluster when becName matches
+// its name, and nil otherwise.
+func (br BindingRouter) FindBackendCluster(becName string) *cluster {
+	if becName != br.beCluster.name {
+		return nil
+	}
+	return br.beCluster
+}
+// ReplyHandler does nothing for the binding router and always returns nil.
+func (br BindingRouter) ReplyHandler(v interface{}) error {
+	return nil
+}
+func (br BindingRouter) Route(sel interface{}) (*backend, *connection) {
+ var err error
+ switch sl := sel.(type) {
+ case *requestFrame:
+ if b, ok := br.bindings[sl.metaVal]; ok == true { // binding exists, just return it
+ return b, nil
+ } else { // establish a new binding or error.
+ if sl.metaVal != "" {
+ err = errors.New(fmt.Sprintf("Attempt to route on non-existent metadata value '%s' in key '%s'",
+ sl.metaVal, sl.metaKey))
+ log.Error(err)
+ sl.err = err
+ return nil, nil
+ }
+ if sl.methodInfo.method != br.bindingMethod {
+ err = errors.New(fmt.Sprintf("Binding must occur with method %s but attempted with method %s",
+ br.bindingMethod, sl.methodInfo.method))
+ log.Error(err)
+ sl.err = err
+ return nil, nil
+ }
+ log.Debugf("MUST CREATE A NEW BINDING MAP ENTRY!!")
+ if *br.currentBackend, err = br.beCluster.nextBackend(*br.currentBackend, BackendSequenceRoundRobin); err == nil {
+ // Use the name of the backend as the metaVal for this new binding
+ br.bindings[(*br.currentBackend).name] = *br.currentBackend
+ return *br.currentBackend, nil
+
+ } else {
+ log.Error(err)
+ sl.err = err
+ return nil, nil
+ }
+ }
+ default:
+ return nil, nil
+ }
+}
+
+// newBindingRouter validates the router and route configuration and builds a
+// BindingRouter from them. Every problem found is logged; if there was at
+// least one, the partially populated router is returned together with an
+// error. The backend cluster is reused from the clusters map when one with
+// the configured name exists, and created otherwise.
+func newBindingRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
+	failed := false
+	log.Debugf("Creating binding router %s", config.Name)
+
+	// A name must exist
+	if config.Name == "" {
+		log.Error("A router 'name' must be specified")
+		failed = true
+	}
+	if rconf.ProtoPackage == "" {
+		log.Error("A 'package' must be specified")
+		failed = true
+	}
+	if rconf.ProtoService == "" {
+		log.Error("A 'service' must be specified")
+		failed = true
+	}
+
+	//if config.RouteField == "" {
+	//	log.Error("A 'routing_field' must be specified")
+	//	rtrn_err = true
+	//}
+
+	// TODO: Using the specified service, the imported proto
+	// descriptor file should be scanned for all methods provided
+	// for this router to ensure that this field exists in
+	// the message(s) passed to the method. This will avoid run
+	// time failures that might not be detected for long periods
+	// of time.
+
+	// TODO The routes section is currently not being used
+	// so the router will route all methods based on the
+	// routing_field. This needs to be done.
+	var bptr *backend
+	br := BindingRouter{
+		name:        config.Name,
+		grpcService: rconf.ProtoService,
+		bindings:    make(map[string]*backend),
+		//methodMap:make(map[string]byte),
+		currentBackend: &bptr,
+		association:    config.Binding.Association,
+	}
+
+	// A binding association must exist
+	if br.association == AssociationUndefined {
+		log.Error("An binding association must be specified")
+		failed = true
+	}
+
+	// A binding type must exist
+	// TODO: This is parsed but ignored and a header based type is used.
+	if config.Binding.Type == "header" {
+		br.bindingType = config.Binding.Type
+	} else {
+		log.Error("The binding type must be set to header")
+		failed = true
+	}
+
+	// A binding method must exist
+	br.bindingMethod = config.Binding.Method
+	if br.bindingMethod == "" {
+		log.Error("The binding method must be specified")
+		failed = true
+	}
+
+	// A binding field must exist
+	br.bindingField = config.Binding.Field
+	if br.bindingField == "" {
+		log.Error("The binding field must be specified")
+		failed = true
+	}
+
+	// Create the backend cluster or link to an existing one
+	if existing, found := clusters[config.backendCluster.Name]; found {
+		br.beCluster = existing
+	} else {
+		created, err := newBackendCluster(config.backendCluster)
+		br.beCluster = created
+		if err != nil {
+			log.Errorf("Could not create a backend for router %s", config.Name)
+			failed = true
+		}
+	}
+
+	// HERE HERE HERE
+
+	if failed {
+		return br, fmt.Errorf("Failed to create a new router '%s'", br.name)
+	}
+
+	return br, nil
+}
diff --git a/internal/pkg/afrouter/cluster.go b/internal/pkg/afrouter/cluster.go
new file mode 100644
index 0000000..859d92f
--- /dev/null
+++ b/internal/pkg/afrouter/cluster.go
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "errors"
+ "fmt"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
+)
+
+var clusters = make(map[string]*cluster) // cluster registry keyed by cluster name; written by newBackendCluster
+
+// cluster is a collection of HA backends
+type cluster struct {
+ name string // BackendClusterConfig.Name
+ //backends map[string]*backend
+ backends []*backend // backends in configuration order; walked round robin by nextBackend
+ backendIDMap map[*backend]int // index of each backend within the backends slice
+}
+
+//TODO: Move the backend type (active/active etc) to the cluster
+// level. All backends should really be of the same type.
+// Create a new backend cluster
+func newBackendCluster(conf *BackendClusterConfig) (*cluster, error) {
+ var err error = nil
+ var rtrn_err = false
+ var be *backend
+ log.Debugf("Creating a backend cluster with %v", conf)
+ // Validate the configuration
+ if conf.Name == "" {
+ log.Error("A backend cluster must have a name")
+ rtrn_err = true
+ }
+ //bc := &cluster{name:conf.Name,backends:make(map[string]*backend)}
+ bc := &cluster{name: conf.Name, backendIDMap: make(map[*backend]int)}
+ clusters[bc.name] = bc
+ idx := 0
+ for _, bec := range conf.Backends {
+ if bec.Name == "" {
+ log.Errorf("A backend must have a name in cluster %s\n", conf.Name)
+ rtrn_err = true
+ }
+ if be, err = newBackend(&bec, conf.Name); err != nil {
+ log.Errorf("Error creating backend %s", bec.Name)
+ rtrn_err = true
+ }
+ bc.backends = append(bc.backends, be)
+ bc.backendIDMap[bc.backends[idx]] = idx
+ idx++
+ }
+ if rtrn_err {
+ return nil, errors.New("Error creating backend(s)")
+ }
+ return bc, nil
+}
+
+// getBackend returns the first backend of the cluster whose name equals name,
+// or nil when there is none.
+func (c *cluster) getBackend(name string) *backend {
+	for i := 0; i < len(c.backends); i++ {
+		if candidate := c.backends[i]; candidate.name == name {
+			return candidate
+		}
+	}
+	return nil
+}
+
+// allocateSerialNumber returns a newly generated UUID in string form.
+func (c *cluster) allocateSerialNumber() string {
+	serial := uuid.New()
+	return serial.String()
+}
+
+// nextBackend returns the next backend after be, in round robin order, that
+// has at least one open connection.
+//
+// When be is nil the search starts at the first backend, which is returned
+// straight away if it has open connections. Otherwise the backends following
+// be are tried in order, wrapping around, up to and including be itself. An
+// error is returned when the cluster has no backends or when no backend with
+// an open connection is found.
+//
+// Round robin is the only supported sequence; any other value of seq is
+// logged as an error and handled as round robin.
+func (c *cluster) nextBackend(be *backend, seq backendSequence) (*backend, error) {
+	if seq != BackendSequenceRoundRobin {
+		// Invalid, default to round robin
+		log.Errorf("Invalid backend sequence %d. Defaulting to round robin", seq)
+	}
+
+	// An empty cluster would otherwise panic on the index operations below.
+	if len(c.backends) == 0 {
+		err := fmt.Errorf("No backends configured in cluster '%s'", c.name)
+		log.Debug(err)
+		return nil, err
+	}
+
+	in := be
+	if be == nil {
+		log.Debug("Previous backend is nil")
+		be = c.backends[0]
+		in = be
+		if be.NumOpenConnections() != 0 {
+			return be, nil
+		}
+	}
+
+	// A full cycle takes exactly len(c.backends) steps. Bounding the loop
+	// guarantees termination even if the starting backend is not part of
+	// this cluster.
+	for step := 0; step < len(c.backends); step++ {
+		log.Debugf("Requesting a new backend starting from %s", be.name)
+		cur := c.backendIDMap[be]
+		cur++
+		if cur >= len(c.backends) {
+			cur = 0
+		}
+		log.Debugf("Next backend is %d:%s", cur, c.backends[cur].name)
+		if c.backends[cur].NumOpenConnections() > 0 {
+			return c.backends[cur], nil
+		}
+		if c.backends[cur] == in {
+			// Back at the starting point: every backend has been tried.
+			break
+		}
+		be = c.backends[cur]
+		log.Debugf("Backend '%s' has no open connections, trying next", c.backends[cur].name)
+	}
+
+	// If no backend is found having a connection then return nil.
+	err := fmt.Errorf("No backend with open connections found")
+	log.Debug(err)
+	return nil, err
+}
+
+// handler serves one incoming call for this cluster. It allocates the request
+// frame, which holds the "context" of the communication and gets a fresh
+// serial number, lets assignBackend pick the backend, allocates the response
+// frame, and hands everything to the backend's handler.
+//
+// The final backend is determined here for every kind of router: affinity
+// routed backends need the first message before a backend can be chosen, and
+// to keep things simple the same approach is used for all of them.
+//
+// An error from assignBackend is returned as is; at that point no backend
+// streams have been initiated.
+func (c *cluster) handler(srv interface{}, serverStream grpc.ServerStream, r Router, methodInfo methodDetails,
+	mk string, mv string) error {
+	//func (c *cluster) handler(nbR * nbRequest) error {
+
+	nf := &requestFrame{
+		router:     r,
+		methodInfo: methodInfo,
+		serialNo:   c.allocateSerialNumber(),
+		metaKey:    mk,
+		metaVal:    mv,
+	}
+	log.Debugf("Nb frame allocate with method %s", nf.methodInfo.method)
+
+	be, err := c.assignBackend(serverStream, nf)
+	if err != nil {
+		return err
+	}
+	log.Debugf("Backend '%s' selected", be.name)
+
+	// The response frame might be needed for return value intercept
+	sf := &responseFrame{
+		router:  r,
+		backend: be,
+		method:  nf.methodInfo.method,
+		metaKey: mk,
+		metaVal: mv,
+	}
+	log.Debugf("Sb frame allocated with router %s", r.Name())
+
+	return be.handler(srv, serverStream, nf, sf)
+}
+
+// assignBackend receives the first message of the stream into f and returns
+// the backend that was selected for it.
+//
+// Receiving the message runs the assigned codec, whose Unmarshal uses the
+// frame's router to select a backend and stores it in the frame.
+//
+// An error is returned when the receive fails, when no backend was selected
+// (nil backend), or when the selected backend has no open connections (the
+// backend is still returned alongside the error in that case).
+func (c *cluster) assignBackend(src grpc.ServerStream, f *requestFrame) (*backend, error) {
+	if err := src.RecvMsg(f); err != nil {
+		return nil, err
+	}
+
+	// Check that the backend was routable and actually has connections open.
+	if f.backend == nil {
+		err := fmt.Errorf("Unable to route method '%s'", f.methodInfo.method)
+		log.Error(err)
+		return nil, err
+	}
+	// Go through the accessor so that openConns is read under the backend
+	// mutex, like every other reader and writer of that map.
+	if f.backend.NumOpenConnections() == 0 {
+		err := fmt.Errorf("No open connections on backend '%s'", f.backend.name)
+		log.Error(err)
+		return f.backend, err
+	}
+
+	log.Debugf("Assigned backend %s", f.backend.name)
+	return f.backend, nil
+}
diff --git a/internal/pkg/afrouter/codec.go b/internal/pkg/afrouter/codec.go
new file mode 100644
index 0000000..278fc0a
--- /dev/null
+++ b/internal/pkg/afrouter/codec.go
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "fmt"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
+)
+
+// Codec returns the routing codec layered on top of the protobuf codec
+// defined in this file.
+func Codec() grpc.Codec {
+	parent := &protoCodec{}
+	return CodecWithParent(parent)
+}
+
+// CodecWithParent returns a routing codec that delegates to parent for every
+// value that is not a request or response frame.
+func CodecWithParent(parent grpc.Codec) grpc.Codec {
+	return &transparentRoutingCodec{parentCodec: parent}
+}
+
+type transparentRoutingCodec struct { // codec that handles frames itself and delegates all other values
+ parentCodec grpc.Codec // used by Marshal and Unmarshal for values that are not request or response frames
+}
+
+// responseFrame is a frame being "returned" to whomever established the connection
+type responseFrame struct {
+ payload []byte // raw message bytes; stored by the codec's Unmarshal, returned by its Marshal
+ router Router // its ReplyHandler is invoked by the codec's Unmarshal
+ method string // method name copied from the request frame in cluster.handler
+ backend *backend // backend selected for the request
+ metaKey string // metadata key passed to cluster.handler
+ metaVal string // metadata value passed to cluster.handler
+}
+
+// requestFrame is a frame coming in from whomever established the connection
+type requestFrame struct {
+ payload []byte // raw message bytes; stored by the codec's Unmarshal, returned by its Marshal
+ router Router // its Route method picks the backend in the codec's Unmarshal
+ backend *backend // backend returned by router.Route; nil when nothing was selected
+ connection *connection // optional, if the router preferred one connection over another
+ err error // set by BindingRouter.Route when it cannot route the frame
+ methodInfo methodDetails // details of the method being invoked
+ serialNo string // allocated per request by cluster.allocateSerialNumber
+ metaKey string // metadata key passed to cluster.handler
+ metaVal string // metadata value passed to cluster.handler
+}
+
+func (cdc *transparentRoutingCodec) Marshal(v interface{}) ([]byte, error) {
+ switch t := v.(type) {
+ case *responseFrame:
+ return t.payload, nil
+ case *requestFrame:
+ return t.payload, nil
+ default:
+ return cdc.parentCodec.Marshal(v)
+ }
+}
+
+// Unmarshal stores data as the payload of request and response frames without
+// decoding it, and delegates every other value to the parent codec.
+//
+// For a response frame the router's ReplyHandler is invoked; a failure there
+// is logged but does not fail the unmarshal. For a request frame the router's
+// Route method is invoked and its result is stored in the frame's backend and
+// connection fields. Frames always yield a nil error.
+func (cdc *transparentRoutingCodec) Unmarshal(data []byte, v interface{}) error {
+	switch t := v.(type) {
+	case *responseFrame:
+		t.payload = data
+		// This is where the affinity is established on a northbound response
+		if err := t.router.ReplyHandler(v); err != nil {
+			// Keep returning nil as before, but make the failure visible.
+			log.Errorf("Reply handler failed for method %s: %v", t.method, err)
+		}
+		return nil
+	case *requestFrame:
+		t.payload = data
+		// This is where the affinity value is pulled from the payload
+		// and the backend selected.
+		t.backend, t.connection = t.router.Route(v)
+		name := "<nil>"
+		if t.backend != nil {
+			name = t.backend.name
+		}
+		log.Debugf("Routing returned %s for method %s", name, t.methodInfo.method)
+
+		return nil
+	default:
+		return cdc.parentCodec.Unmarshal(data, v)
+	}
+}
+
+// String returns the name reported by the parent codec.
+func (cdc *transparentRoutingCodec) String() string {
+	parentName := cdc.parentCodec.String()
+	return fmt.Sprintf("%s", parentName)
+}
+
+// protoCodec is a Codec implementation with protobuf. It is the default Codec for gRPC.
+type protoCodec struct{}
+
+// Marshal encodes v with proto.Marshal. It returns an error, instead of
+// panicking, when v is not a proto.Message.
+func (protoCodec) Marshal(v interface{}) ([]byte, error) {
+	msg, ok := v.(proto.Message)
+	if !ok {
+		return nil, fmt.Errorf("failed to marshal, message is %T, want proto.Message", v)
+	}
+	return proto.Marshal(msg)
+}
+
+// Unmarshal decodes data into v with proto.Unmarshal. It returns an error,
+// instead of panicking, when v is not a proto.Message.
+func (protoCodec) Unmarshal(data []byte, v interface{}) error {
+	msg, ok := v.(proto.Message)
+	if !ok {
+		return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)
+	}
+	return proto.Unmarshal(data, msg)
+}
+
+// String returns the codec name, "protoCodec".
+func (protoCodec) String() string {
+	return "protoCodec"
+}
diff --git a/internal/pkg/afrouter/config.go b/internal/pkg/afrouter/config.go
new file mode 100644
index 0000000..6008199
--- /dev/null
+++ b/internal/pkg/afrouter/config.go
@@ -0,0 +1,258 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+// Command line parameters and parsing
+import (
+ "encoding/json"
+ "errors"
+ "flag"
+ "fmt"
+ "github.com/golang/protobuf/protoc-gen-go/descriptor"
+ "github.com/opencord/voltha-go/common/log"
+ "io/ioutil"
+ "os"
+ "path"
+)
+
+// ParseCmd parses the process command line (os.Args) and returns a
+// Configuration whose flag-backed fields (config, logLevel, grpclog, version)
+// are populated. The flag set uses flag.ContinueOnError, so a bad command line
+// is reported to the caller as an error instead of terminating the process.
+func ParseCmd() (*Configuration, error) {
+	flags := flag.NewFlagSet(path.Base(os.Args[0]), flag.ContinueOnError)
+	config := &Configuration{
+		ConfigFile:         flags.String("config", "arouter.json", "The configuration file for the affinity router"),
+		LogLevel:           flags.Int("logLevel", 0, "The log level for the affinity router"),
+		GrpcLog:            flags.Bool("grpclog", false, "Enable GRPC logging"),
+		DisplayVersionOnly: flags.Bool("version", false, "Print version information and exit"),
+	}
+
+	if parseErr := flags.Parse(os.Args[1:]); parseErr != nil {
+		return nil, errors.New("Error parsing the command line")
+	}
+	return config, nil
+}
+
+// Configuration file loading and parsing
+//
+// Configuration combines the command line options (the pointer fields, bound
+// to flags by ParseCmd) with the settings decoded from the JSON configuration
+// file by LoadConfig.
+type Configuration struct {
+ ConfigFile *string // "config" flag: path of the JSON file read by LoadConfig
+ LogLevel *int // "logLevel" flag
+ GrpcLog *bool // "grpclog" flag
+ DisplayVersionOnly *bool // "version" flag
+ Servers []ServerConfig `json:"servers"`
+ Ports PortConfig `json:"ports"`
+ ServerCertificates ServerCertConfig `json:"serverCertificates"`
+ ClientCertificates ClientCertConfig `json:"clientCertificates"`
+ BackendClusters []BackendClusterConfig `json:"backend_clusters"`
+ Routers []RouterConfig `json:"routers"`
+ Api ApiConfig // no json tag: encoding/json matches the key by field name
+}
+
+// RouterConfig is one entry of the "routers" array of the configuration file.
+type RouterConfig struct {
+ Name string `json:"name"` // name servers use to reference this router
+ ProtoService string `json:"service"`
+ ProtoPackage string `json:"package"`
+ ProtoFile string `json:"proto_descriptor"` // path of the serialized descriptor set read by newMethodRouter
+ Routes []RouteConfig `json:"routes"`
+ protoDescriptor descriptor.FileDescriptorSet // not read from JSON; filled from ProtoFile by newMethodRouter
+}
+
+// RouteConfig is one entry of a router's "routes" array.
+type RouteConfig struct {
+ Name string `json:"name"`
+ Type routeType `json:"type"`
+ Association associationType `json:"association"`
+ RouteField string `json:"routing_field"`
+ Methods []string `json:"methods"` // The GRPC methods to route using the route field
+ NbBindingMethods []string `json:"nb_binding_methods"`
+ BackendCluster string `json:"backend_cluster"` // name of the backend cluster this route targets
+ Binding BindingConfig `json:"binding"`
+ Overrides []OverrideConfig `json:"overrides"`
+ backendCluster *BackendClusterConfig // not read from JSON; resolved from BackendCluster by LoadConfig
+}
+
+// BindingConfig is the "binding" object of a route. newMethodRouter uses Field
+// as the metadata key under which routes of type RouteTypeBinding are stored.
+type BindingConfig struct {
+ Type string `json:"type"`
+ Field string `json:"field"`
+ Method string `json:"method"`
+ Association associationType `json:"association"`
+}
+
+// OverrideConfig is one entry of a route's "overrides" array.
+// NOTE(review): presumably it replaces the routing field for the listed
+// method(s) — confirm against the router that consumes it.
+type OverrideConfig struct {
+ Methods []string `json:"methods"`
+ Method string `json:"method"`
+ RouteField string `json:"routing_field"`
+}
+
+// Backend configuration
+
+// BackendClusterConfig is one entry of the "backend_clusters" array. Routes
+// refer to a cluster through its Name.
+type BackendClusterConfig struct {
+ Name string `json:"name"`
+ Backends []BackendConfig `json:"backends"`
+}
+
+// BackendConfig describes one backend of a cluster and its connections.
+type BackendConfig struct {
+ Name string `json:"name"`
+ Type backendType `json:"type"`
+ Association AssociationConfig `json:"association"`
+ Connections []ConnectionConfig `json:"connections"`
+}
+
+// AssociationConfig is the "association" object of a backend.
+type AssociationConfig struct {
+ Strategy associationStrategy `json:"strategy"`
+ Location associationLocation `json:"location"`
+ Field string `json:"field"`
+ Key string `json:"key"`
+}
+
+// ConnectionConfig describes one connection of a backend. Note that Port is a
+// string here, unlike the numeric ports of the server configuration.
+type ConnectionConfig struct {
+ Name string `json:"name"`
+ Addr string `json:"addr"`
+ Port string `json:"port"`
+}
+
+// Server configuration
+
+// ServerConfig is one entry of the "servers" array of the configuration file.
+type ServerConfig struct {
+ Name string `json:"name"`
+ Port uint `json:"port"`
+ Addr string `json:"address"`
+ Type string `json:"type"`
+ Routers []RouterPackage `json:"routers"`
+ routers map[string]*RouterConfig // not read from JSON; package -> router, built by LoadConfig
+}
+
+// RouterPackage associates a package with the name of the router serving it.
+// LoadConfig resolves Router against Configuration.Routers.
+type RouterPackage struct {
+ Router string `json:"router"`
+ Package string `json:"package"`
+}
+
+// Port configuration
+//
+// PortConfig is the "ports" object of the configuration file.
+type PortConfig struct {
+ GrpcPort uint `json:"grpcPort"`
+ StreamingGrpcPort uint `json:"streamingGrpcPort"`
+ TlsGrpcPort uint `json:"tlsGrpcPort"`
+ TlsStreamingGrpcPort uint `json:"tlsStreamingGrpcPort"`
+ ControlPort uint `json:"controlPort"`
+}
+
+// Server Certificate configuration
+//
+// ServerCertConfig is the "serverCertificates" object of the configuration
+// file; every field is a file path.
+type ServerCertConfig struct {
+ GrpcCert string `json:"grpcCertificate"` // File path to the certificate file
+ GrpcKey string `json:"grpcKey"` // File path to the key file
+ GrpcCsr string `json:"grpcCsr"` // File path to the CSR file
+}
+
+// Client Certificate configuration
+//
+// ClientCertConfig is the "clientCertificates" object of the configuration
+// file; every field is a file path.
+type ClientCertConfig struct {
+ GrpcCert string `json:"grpcCertificate"` // File path to the certificate file
+ GrpcKey string `json:"grpcKey"` // File path to the key file
+ GrpcCsr string `json:"grpcCsr"` // File path to the CSR file
+}
+
+// Api configuration
+//
+// ApiConfig holds the address and port of the API endpoint.
+type ApiConfig struct {
+ Addr string `json:"address"`
+ Port uint `json:"port"`
+}
+
+// LoadConfig reads the JSON file named by conf.ConfigFile into conf and then
+// resolves the references between configuration objects:
+//   - every server's router references are looked up in conf.Routers;
+//   - every route's backend_cluster is looked up in conf.BackendClusters.
+//
+// It returns an error when the file cannot be read or decoded, or when a
+// reference is unknown or matches more than one definition.
+func (conf *Configuration) LoadConfig() error {
+	log.Info("Loading configuration from: ", *conf.ConfigFile)
+	configF, err := os.Open(*conf.ConfigFile)
+	if err != nil {
+		log.Error(err)
+		return err
+	}
+	defer configF.Close()
+
+	configBytes, err := ioutil.ReadAll(configF)
+	if err != nil {
+		log.Error(err)
+		return err
+	}
+
+	if err := json.Unmarshal(configBytes, conf); err != nil {
+		log.Errorf("Unmarshaling of the configuration file failed: %v", err)
+		return err
+	}
+
+	// Now resolve references to different config objects in the
+	// config file. Currently there are 2 possible references
+	// to resolve: references to routers in the servers, and
+	// references to backend_cluster in the routers.
+	log.Debug("Resolving references in the config file")
+	if err := conf.resolveServerRouterRefs(); err != nil {
+		return err
+	}
+	return conf.resolveRouteBackendRefs()
+}
+
+// resolveServerRouterRefs fills each server's package -> router table with
+// pointers to the matching entries of the top level Routers array.
+func (conf *Configuration) resolveServerRouterRefs() error {
+	for k := range conf.Servers {
+		srv := &conf.Servers[k]
+		srv.routers = make(map[string]*RouterConfig)
+		for _, rPkg := range srv.Routers {
+			found := false
+			log.Debugf("Resolving router reference to router '%s' from server '%s'", rPkg.Router, srv.Name)
+			for rk := range conf.Routers {
+				if conf.Routers[rk].Name != rPkg.Router {
+					continue
+				}
+				if found {
+					// A second router with the same name makes the reference ambiguous.
+					err := fmt.Errorf("Duplicate router '%s' defined for package '%s'", rPkg.Router, rPkg.Package)
+					log.Error(err)
+					return err
+				}
+				log.Debugf("Reference to router '%s' found for package '%s'", rPkg.Router, rPkg.Package)
+				srv.routers[rPkg.Package] = &conf.Routers[rk]
+				found = true
+			}
+			if !found {
+				err := fmt.Errorf("Router %s for server %s not found in config", rPkg.Router, srv.Name)
+				log.Error(err)
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// resolveRouteBackendRefs points every route at the backend cluster named by
+// its backend_cluster setting.
+func (conf *Configuration) resolveRouteBackendRefs() error {
+	for rk := range conf.Routers {
+		router := &conf.Routers[rk]
+		for rtk := range router.Routes {
+			route := &router.Routes[rtk]
+			found := false
+			log.Debugf("Resolving backend reference to %s from router %s", route.BackendCluster, router.Name)
+			for bek := range conf.BackendClusters {
+				cluster := &conf.BackendClusters[bek]
+				log.Debugf("Checking cluster %s", cluster.Name)
+				if route.BackendCluster != cluster.Name {
+					continue
+				}
+				if found {
+					err := fmt.Errorf("Duplicate backend defined, %s", cluster.Name)
+					log.Error(err)
+					return err
+				}
+				route.backendCluster = cluster
+				found = true
+			}
+			if !found {
+				err := fmt.Errorf("Backend %s for router %s not found in config",
+					route.BackendCluster, router.Name)
+				log.Error(err)
+				return err
+			}
+		}
+	}
+	return nil
+}
diff --git a/internal/pkg/afrouter/connection.go b/internal/pkg/afrouter/connection.go
new file mode 100644
index 0000000..dcdb8d6
--- /dev/null
+++ b/internal/pkg/afrouter/connection.go
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "context"
+ "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/connectivity"
+ "time"
+)
+
+// connection represents a connection to a single backend
+type connection struct {
+ backend *backend // owning backend; monitor reports Ready/lost transitions to it
+ name string
+ addr string // host part of the dial target ("addr:port")
+ port string // port part of the dial target
+ ctx context.Context // connect and monitor stop once this context is done
+ close context.CancelFunc // NOTE(review): presumably cancels ctx — confirm where it is assigned
+}
+
+// connect dials the backend and watches the resulting client connection until
+// monitor gives up on it, then closes it and dials again. The loop only ends
+// once cn.ctx is done. A dial error terminates the process via log.Fatalf.
+func (cn *connection) connect() {
+	for {
+		log.Infof("Connecting to %s with addr: %s and port %s", cn.name, cn.addr, cn.port)
+
+		// Dial returns immediately and keeps connecting in the background;
+		// monitor is what notices the connection becoming usable.
+		target := cn.addr + ":" + cn.port
+		clientConn, dialErr := grpc.Dial(target, grpc.WithCodec(Codec()), grpc.WithInsecure(), grpc.WithBackoffMaxDelay(time.Second*15))
+		if dialErr != nil {
+			log.Fatalf("Dialing connection %v:%v", cn, dialErr)
+		}
+
+		log.Debugf("Starting the connection monitor for '%s'", cn.name)
+		cn.monitor(clientConn)
+		clientConn.Close()
+
+		// Stop retrying once the connection's context has been canceled.
+		if cn.ctx.Err() != nil {
+			return
+		}
+	}
+}
+
+// monitor follows the connectivity state of conn and keeps the owning backend
+// informed: be.incConn is called when conn becomes Ready and be.decConn when it
+// leaves Ready. It returns when cn.ctx is done, when conn reports Shutdown, or
+// when conn falls back to Idle; the caller (connect) then closes conn.
+//
+// NOTE(review): if the context is canceled while the connection is Ready the
+// loop exits without calling be.decConn — confirm that is handled elsewhere.
+func (cn *connection) monitor(conn *grpc.ClientConn) {
+ be := cn.backend
+ log.Debugf("Setting up monitoring for backend %s", be.name)
+ // Last state observed by this loop; the comparison baseline starts at Idle.
+ state := connectivity.Idle
+monitorLoop:
+ for {
+ // Blocks until conn leaves `state`; returns false once cn.ctx is done.
+ if !conn.WaitForStateChange(cn.ctx, state) {
+ log.Debugf("Context canceled for connection '%s' on backend '%s'", cn.name, be.name)
+ break monitorLoop // connection closed
+ }
+
+ if newState := conn.GetState(); newState != state {
+ previousState := state
+ state = newState
+
+ // Leaving Ready, whatever the new state is, means the connection was lost.
+ if previousState == connectivity.Ready {
+ be.decConn(cn)
+ log.Infof("Lost connection '%s' on backend '%s'", cn.name, be.name)
+ }
+
+ switch state {
+ case connectivity.Ready:
+ log.Infof("Connection '%s' on backend '%s' becomes ready", cn.name, be.name)
+ be.incConn(cn, conn)
+ case connectivity.TransientFailure, connectivity.Connecting:
+ // we don't log these, to avoid spam
+ case connectivity.Shutdown:
+ // the connection was closed
+ log.Infof("Shutdown for connection '%s' on backend '%s'", cn.name, be.name)
+ break monitorLoop
+ case connectivity.Idle:
+ // This can only happen if the server sends a GoAway. This can
+ // only happen if the server has modified MaxConnectionIdle from
+ // its default of infinity. The only solution here is to close the
+ // connection and keepTrying()?
+ //TODO: Read the grpc source code to see if there's a different approach
+ log.Errorf("Server sent 'GoAway' on connection '%s' on backend '%s'", cn.name, be.name)
+ break monitorLoop
+ }
+ }
+ }
+}
diff --git a/internal/pkg/afrouter/enums.go b/internal/pkg/afrouter/enums.go
new file mode 100644
index 0000000..1847191
--- /dev/null
+++ b/internal/pkg/afrouter/enums.go
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "encoding/json"
+ "fmt"
+)
+
+// backendType identifies the kind of a backend. In JSON it is written as one
+// of the strings of stringToBeType; BackendUndefined is written as null.
+type backendType int
+
+const (
+	BackendUndefined backendType = iota
+	BackendActiveActive
+	BackendSingleServer
+)
+
+var stringToBeType = map[string]backendType{"active_active": BackendActiveActive, "server": BackendSingleServer}
+var beTypeToString = map[backendType]string{BackendActiveActive: "active_active", BackendSingleServer: "server"}
+
+// MarshalJSON encodes the value as its string name, or null when undefined.
+// Values without a name are reported as an error.
+func (t backendType) MarshalJSON() ([]byte, error) {
+	if t == BackendUndefined {
+		return json.Marshal(nil)
+	}
+	if str, have := beTypeToString[t]; have {
+		return json.Marshal(str)
+	}
+	return nil, fmt.Errorf("unknown %T '%d'", t, t)
+}
+
+// UnmarshalJSON decodes a string name into the value. A JSON null is a no-op,
+// following the encoding/json convention, so that what MarshalJSON writes for
+// BackendUndefined can be read back. Unknown names are reported as an error.
+func (t *backendType) UnmarshalJSON(b []byte) error {
+	if string(b) == "null" {
+		return nil
+	}
+	var str string
+	if err := json.Unmarshal(b, &str); err != nil {
+		return err
+	}
+	var have bool
+	if *t, have = stringToBeType[str]; !have {
+		return fmt.Errorf("invalid %T %s", *t, str)
+	}
+	return nil
+}
+
+// associationLocation names where an association value is located. In JSON it
+// is written as one of the strings of stringToAlType; the undefined value is
+// written as null.
+type associationLocation int
+
+const (
+	AssociationLocationUndefined associationLocation = iota
+	AssociationLocationHeader
+	AssociationLocationProtobuf
+)
+
+var stringToAlType = map[string]associationLocation{"header": AssociationLocationHeader, "protobuf": AssociationLocationProtobuf}
+var alTypeToString = map[associationLocation]string{AssociationLocationHeader: "header", AssociationLocationProtobuf: "protobuf"}
+
+// MarshalJSON encodes the value as its string name, or null when undefined.
+// Values without a name are reported as an error.
+func (t associationLocation) MarshalJSON() ([]byte, error) {
+	if t == AssociationLocationUndefined {
+		return json.Marshal(nil)
+	}
+	if str, have := alTypeToString[t]; have {
+		return json.Marshal(str)
+	}
+	return nil, fmt.Errorf("unknown %T '%d'", t, t)
+}
+
+// UnmarshalJSON decodes a string name into the value. A JSON null is a no-op,
+// following the encoding/json convention, so that what MarshalJSON writes for
+// the undefined value can be read back. Unknown names are reported as an error.
+func (t *associationLocation) UnmarshalJSON(b []byte) error {
+	if string(b) == "null" {
+		return nil
+	}
+	var str string
+	if err := json.Unmarshal(b, &str); err != nil {
+		return err
+	}
+	var have bool
+	if *t, have = stringToAlType[str]; !have {
+		return fmt.Errorf("invalid %T %s", *t, str)
+	}
+	return nil
+}
+
+// associationStrategy names the strategy used to associate requests with a
+// backend. In JSON it is written as one of the strings of stringToAsType; the
+// undefined value is written as null.
+type associationStrategy int
+
+const (
+	AssociationStrategyUndefined associationStrategy = iota
+	AssociationStrategySerialNo
+)
+
+var stringToAsType = map[string]associationStrategy{"serial_number": AssociationStrategySerialNo}
+var asTypeToString = map[associationStrategy]string{AssociationStrategySerialNo: "serial_number"}
+
+// MarshalJSON encodes the value as its string name, or null when undefined.
+// Values without a name are reported as an error.
+func (t associationStrategy) MarshalJSON() ([]byte, error) {
+	if t == AssociationStrategyUndefined {
+		return json.Marshal(nil)
+	}
+	if str, have := asTypeToString[t]; have {
+		return json.Marshal(str)
+	}
+	return nil, fmt.Errorf("unknown %T '%d'", t, t)
+}
+
+// UnmarshalJSON decodes a string name into the value. A JSON null is a no-op,
+// following the encoding/json convention, so that what MarshalJSON writes for
+// the undefined value can be read back. Unknown names are reported as an error.
+func (t *associationStrategy) UnmarshalJSON(b []byte) error {
+	if string(b) == "null" {
+		return nil
+	}
+	var str string
+	if err := json.Unmarshal(b, &str); err != nil {
+		return err
+	}
+	var have bool
+	if *t, have = stringToAsType[str]; !have {
+		return fmt.Errorf("invalid %T %s", *t, str)
+	}
+	return nil
+}
+
+// backendSequence enumerates the orders in which backends can be selected.
+// Round robin is the only sequence defined and is also the zero value.
+type backendSequence int
+
+const (
+ BackendSequenceRoundRobin backendSequence = iota
+)
+
+// routeType identifies the kind of a route. In JSON it is written as one of
+// the strings of stringToRouteType; RouteTypeUndefined is written as null.
+type routeType int
+
+const (
+	RouteTypeUndefined routeType = iota
+	RouteTypeRpcAffinityMessage
+	RouteTypeRpcAffinityHeader
+	RouteTypeBinding
+	RouteTypeRoundRobin
+	RouteTypeSource
+)
+
+// String names for display in error messages.
+var stringToRouteType = map[string]routeType{"rpc_affinity_message": RouteTypeRpcAffinityMessage, "rpc_affinity_header": RouteTypeRpcAffinityHeader, "binding": RouteTypeBinding, "round_robin": RouteTypeRoundRobin, "source": RouteTypeSource}
+var routeTypeToString = map[routeType]string{RouteTypeRpcAffinityMessage: "rpc_affinity_message", RouteTypeRpcAffinityHeader: "rpc_affinity_header", RouteTypeBinding: "binding", RouteTypeRoundRobin: "round_robin", RouteTypeSource: "source"}
+
+// String returns the configuration name of the route type, or a
+// "<type>(<number>)" rendering for values without a name.
+func (t routeType) String() string {
+	if str, have := routeTypeToString[t]; have {
+		return str
+	}
+	return fmt.Sprintf("%T(%d)", t, t)
+}
+
+// MarshalJSON encodes the value as its string name, or null when undefined.
+// Values without a name are reported as an error.
+func (t routeType) MarshalJSON() ([]byte, error) {
+	if t == RouteTypeUndefined {
+		return json.Marshal(nil)
+	}
+	if str, have := routeTypeToString[t]; have {
+		return json.Marshal(str)
+	}
+	return nil, fmt.Errorf("unknown %T '%d'", t, t)
+}
+
+// UnmarshalJSON decodes a string name into the value. A JSON null is a no-op,
+// following the encoding/json convention, so that what MarshalJSON writes for
+// RouteTypeUndefined can be read back. Unknown names are reported as an error.
+func (t *routeType) UnmarshalJSON(b []byte) error {
+	if string(b) == "null" {
+		return nil
+	}
+	var str string
+	if err := json.Unmarshal(b, &str); err != nil {
+		return err
+	}
+	var have bool
+	if *t, have = stringToRouteType[str]; !have {
+		return fmt.Errorf("invalid %T %s", *t, str)
+	}
+	return nil
+}
+
+// associationType identifies how a route associates requests with backends.
+// In JSON it is written as one of the strings of stringToAssociationType; the
+// undefined value is written as null.
+type associationType int
+
+const (
+	AssociationUndefined associationType = iota
+	AssociationRoundRobin
+)
+
+var stringToAssociationType = map[string]associationType{"round_robin": AssociationRoundRobin}
+var associationTypeToString = map[associationType]string{AssociationRoundRobin: "round_robin"}
+
+// MarshalJSON encodes the value as its string name, or null when undefined.
+// Values without a name are reported as an error.
+func (t associationType) MarshalJSON() ([]byte, error) {
+	if t == AssociationUndefined {
+		return json.Marshal(nil)
+	}
+	if str, have := associationTypeToString[t]; have {
+		return json.Marshal(str)
+	}
+	return nil, fmt.Errorf("unknown %T '%d'", t, t)
+}
+
+// UnmarshalJSON decodes a string name into the value. A JSON null is a no-op,
+// following the encoding/json convention, so that what MarshalJSON writes for
+// AssociationUndefined can be read back. Unknown names are reported as an error.
+func (t *associationType) UnmarshalJSON(b []byte) error {
+	if string(b) == "null" {
+		return nil
+	}
+	var str string
+	if err := json.Unmarshal(b, &str); err != nil {
+		return err
+	}
+	var have bool
+	if *t, have = stringToAssociationType[str]; !have {
+		return fmt.Errorf("invalid %T %s", *t, str)
+	}
+	return nil
+}
diff --git a/internal/pkg/afrouter/method-details.go b/internal/pkg/afrouter/method-details.go
new file mode 100644
index 0000000..ad05121
--- /dev/null
+++ b/internal/pkg/afrouter/method-details.go
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "github.com/opencord/voltha-go/common/log"
+ "regexp"
+)
+
+// methodDetails holds the pieces of a gRPC full method name as split by
+// newMethodDetails.
+type methodDetails struct {
+ all string // the whole portion of the name matched by mthdSlicer
+ pkg string // protobuf package
+ service string // service name
+ method string // method name
+}
+
+// The compiled regex to extract the package/service/method from a name of the
+// form "/<package>.<service>/<method>". It is anchored at the start only. Each
+// of the three names must be a letter followed by at least one letter or
+// digit, so single-character names and packages containing dots do not match.
+var mthdSlicer = regexp.MustCompile(`^/([a-zA-Z][a-zA-Z0-9]+)\.([a-zA-Z][a-zA-Z0-9]+)/([a-zA-Z][a-zA-Z0-9]+)`)
+
+// newMethodDetails splits a gRPC full method name into its package, service
+// and method parts. The full method name is structured as follows:
+//
+//	/<package name>.<service>/<method>
+//
+// When the name does not have that shape the failure is logged and a zero
+// methodDetails (all fields empty) is returned instead of panicking on the
+// missing match.
+func newMethodDetails(fullMethodName string) methodDetails {
+	mthdSlice := mthdSlicer.FindStringSubmatch(fullMethodName)
+	if mthdSlice == nil {
+		log.Errorf("Failed to slice full method %s, result: %v", fullMethodName, mthdSlice)
+		return methodDetails{}
+	}
+	log.Debugf("Sliced full method %s: %v", fullMethodName, mthdSlice)
+	return methodDetails{
+		all:     mthdSlice[0],
+		pkg:     mthdSlice[1],
+		service: mthdSlice[2],
+		method:  mthdSlice[3],
+	}
+}
diff --git a/internal/pkg/afrouter/method-router.go b/internal/pkg/afrouter/method-router.go
new file mode 100644
index 0000000..2916edf
--- /dev/null
+++ b/internal/pkg/afrouter/method-router.go
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "errors"
+ "fmt"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+ "io/ioutil"
+)
+
+// NoMeta is the first-level key of MethodRouter.methodRouter under which routes
+// that do not depend on request metadata are stored.
+const NoMeta = "nometa"
+
+// MethodRouter dispatches each call to a sub-router selected by metadata key
+// and gRPC method name.
+type MethodRouter struct {
+ name string // router name from the configuration
+ service string // protobuf service name from the configuration
+ methodRouter map[string]map[string]Router // map of [metadata][method]
+ methodStreaming map[string]streamingDirections // only methods that stream in at least one direction
+}
+
+// streamingDirections records in which directions a gRPC method streams.
+type streamingDirections struct {
+ request bool // the method is client-streaming
+ response bool // the method is server-streaming
+}
+
+// newMethodRouter builds the router described by config.
+//
+// It loads the serialized protobuf descriptor set named by config.ProtoFile
+// (stored in config.protoDescriptor), records which methods of the configured
+// package/service are streaming, and creates one sub-router per route. Each
+// sub-router is registered for the route's methods under the NoMeta key, or
+// under the binding field for routes of type RouteTypeBinding.
+//
+// A route whose only method is "*" short-circuits the construction: its
+// sub-router is returned directly instead of a MethodRouter.
+//
+// An error is returned when the descriptor cannot be read or decoded, when
+// there are no routes, when a route has no methods, when a sub-router cannot
+// be created, or when a method is claimed by two routes under the same key.
+func newMethodRouter(config *RouterConfig) (Router, error) {
+	// Load the protobuf descriptor file
+	fb, err := ioutil.ReadFile(config.ProtoFile)
+	if err != nil {
+		log.Errorf("Could not open proto file '%s'", config.ProtoFile)
+		return nil, err
+	}
+	if err := proto.Unmarshal(fb, &config.protoDescriptor); err != nil {
+		log.Errorf("Could not unmarshal %s, %v", config.ProtoFile, err)
+		return nil, err
+	}
+
+	mr := MethodRouter{
+		name:    config.Name,
+		service: config.ProtoService,
+		methodRouter: map[string]map[string]Router{
+			NoMeta: make(map[string]Router), // For routes not needing metadata (all except binding at this time)
+		},
+		methodStreaming: make(map[string]streamingDirections),
+	}
+	log.Debugf("Processing MethodRouter config %v", *config)
+
+	// The generated getters are nil-safe, unlike dereferencing the optional
+	// descriptor fields directly.
+	for _, file := range config.protoDescriptor.File {
+		if file.GetPackage() != config.ProtoPackage {
+			continue
+		}
+		for _, service := range file.Service {
+			if service.GetName() != config.ProtoService {
+				continue
+			}
+			for _, method := range service.Method {
+				clientStreaming, serverStreaming := method.GetClientStreaming(), method.GetServerStreaming()
+				if clientStreaming || serverStreaming {
+					mr.methodStreaming[method.GetName()] = streamingDirections{
+						request:  clientStreaming,
+						response: serverStreaming,
+					}
+				}
+			}
+		}
+	}
+
+	if len(config.Routes) == 0 {
+		return nil, fmt.Errorf("Router %s must have at least one route", config.Name)
+	}
+	for _, rtv := range config.Routes {
+		// Give every sub-router its own copy: the address of the shared range
+		// variable would alias all routes if a sub-router kept the pointer.
+		rtv := rtv
+		r, err := newSubRouter(config, &rtv)
+		if err != nil {
+			return nil, err
+		}
+
+		idx1 := NoMeta
+		if rtv.Type == RouteTypeBinding {
+			idx1 = rtv.Binding.Field
+			if _, ok := mr.methodRouter[idx1]; !ok { // First attempt on this key
+				mr.methodRouter[idx1] = make(map[string]Router)
+			}
+		}
+
+		switch len(rtv.Methods) {
+		case 0:
+			return nil, fmt.Errorf("Route for router %s must have at least one method", config.Name)
+		case 1:
+			m := rtv.Methods[0]
+			if m == "*" {
+				return r, nil
+			}
+			log.Debugf("Setting router '%s' for single method '%s'", r.Name(), m)
+			if existing, ok := mr.methodRouter[idx1][m]; ok {
+				err := fmt.Errorf("Attempt to define method %s for 2 routes: %s & %s", m,
+					r.Name(), existing.Name())
+				log.Error(err)
+				return mr, err
+			}
+			mr.methodRouter[idx1][m] = r
+		default:
+			for _, m := range rtv.Methods {
+				log.Debugf("Processing Method %s", m)
+				if existing, ok := mr.methodRouter[idx1][m]; ok {
+					err := fmt.Errorf("Attempt to define method %s for 2 routes: %s & %s", m, r.Name(), existing.Name())
+					log.Error(err)
+					return mr, err
+				}
+				log.Debugf("Setting router '%s' for method '%s'", r.Name(), m)
+				mr.methodRouter[idx1][m] = r
+			}
+		}
+	}
+
+	return mr, nil
+}
+
+// Name returns the router name taken from the configuration.
+func (mr MethodRouter) Name() string {
+ return mr.name
+}
+
+// Service returns the protobuf service name taken from the configuration.
+func (mr MethodRouter) Service() string {
+ return mr.service
+}
+
+// GetMetaKeyVal looks in the incoming metadata of serverStream for one of the
+// metadata keys this router has routes for and returns that key together with
+// its first value. When none is present it returns NoMeta and an empty value.
+// An error is returned, with the same defaults, when the stream context
+// carries no metadata at all.
+//
+// Keys present with an empty value list are skipped instead of being indexed.
+// If several routing keys are present, which one wins is unspecified because
+// Go map iteration order is random.
+func (mr MethodRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string, string, error) {
+	// Get the metadata from the server stream
+	md, ok := metadata.FromIncomingContext(serverStream.Context())
+	if !ok {
+		return NoMeta, "", errors.New("Could not get a server stream metadata")
+	}
+
+	// Determine if one of the method routing keys exists in the metadata
+	for k := range mr.methodRouter {
+		if vals, ok := md[k]; ok && len(vals) > 0 {
+			return k, vals[0], nil
+		}
+	}
+	return NoMeta, "", nil
+}
+
+// ReplyHandler forwards a response frame to the reply handler of the
+// sub-router registered for the frame's method under the NoMeta key. It fails
+// when sel is not a *responseFrame or when no such sub-router exists.
+func (mr MethodRouter) ReplyHandler(sel interface{}) error {
+	frame, isResponse := sel.(*responseFrame)
+	if !isResponse {
+		return errors.New("MethodRouter.ReplyHandler called with non-reponseFrame")
+	}
+	sub, known := mr.methodRouter[NoMeta][frame.method]
+	if !known {
+		return errors.New("MethodRouter.ReplyHandler called with unknown meta or method")
+	}
+	return sub.ReplyHandler(sel)
+}
+
+// Route delegates a request frame to the sub-router registered for the frame's
+// metadata key and method. When none is registered the failure is stored in
+// the frame's err field and logged, and (nil, nil) is returned. A sel that is
+// not a *requestFrame is logged and also yields (nil, nil).
+func (mr MethodRouter) Route(sel interface{}) (*backend, *connection) {
+	frame, isRequest := sel.(*requestFrame)
+	if !isRequest {
+		log.Errorf("Internal: invalid data type in Route call %v", sel)
+		return nil, nil
+	}
+
+	sub, known := mr.methodRouter[frame.metaKey][frame.methodInfo.method]
+	if !known {
+		frame.err = fmt.Errorf("MethodRouter.Route unable to resolve meta %s, method %s", frame.metaKey, frame.methodInfo.method)
+		log.Error(frame.err)
+		return nil, nil
+	}
+	return sub.Route(sel)
+}
+
+// IsStreaming reports whether method streams its requests and whether it
+// streams its responses, in that order. Methods that were not recorded as
+// streaming yield (false, false).
+func (mr MethodRouter) IsStreaming(method string) (bool, bool) {
+	dirs := mr.methodStreaming[method]
+	return dirs.request, dirs.response
+}
+
+// BackendCluster returns the backend cluster of the sub-router registered for
+// mthd under metaKey, or a logged error when there is no such sub-router.
+func (mr MethodRouter) BackendCluster(mthd string, metaKey string) (*cluster, error) {
+	sub, known := mr.methodRouter[metaKey][mthd]
+	if !known {
+		err := errors.New(fmt.Sprintf("No backend cluster exists for method '%s' using meta key '%s'", mthd, metaKey))
+		log.Error(err)
+		return nil, err
+	}
+	return sub.BackendCluster(mthd, metaKey)
+}
+
+// FindBackendCluster asks every registered sub-router for the cluster named
+// beName and returns the first non-nil answer, or nil when none knows it.
+func (mr MethodRouter) FindBackendCluster(beName string) *cluster {
+	for _, byMethod := range mr.methodRouter {
+		for _, sub := range byMethod {
+			found := sub.FindBackendCluster(beName)
+			if found == nil {
+				continue
+			}
+			return found
+		}
+	}
+	return nil
+}
diff --git a/internal/pkg/afrouter/method-router_test.go b/internal/pkg/afrouter/method-router_test.go
new file mode 100644
index 0000000..2e72f1c
--- /dev/null
+++ b/internal/pkg/afrouter/method-router_test.go
@@ -0,0 +1,396 @@
+/*
+ * Portions copyright 2019-present Open Networking Foundation
+ * Original copyright 2019-present Ciena Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package afrouter
+
+import (
+ "fmt"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-go/common/log"
+ common_pb "github.com/opencord/voltha-protos/go/common"
+ voltha_pb "github.com/opencord/voltha-protos/go/voltha"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/grpc"
+ "testing"
+)
+
+const (
+ // Path, relative to this package directory, of the vendored voltha protobuf
+ // descriptor that the tests pass to newMethodRouter as ProtoFile.
+ METHOD_ROUTER_PROTOFILE = "../../../vendor/github.com/opencord/voltha-protos/go/voltha.pb"
+)
+
+// Unit test initialization: configures the logger once for every test of the
+// package, with JSON output, debug level as the default and warn level for
+// this package.
+func init() {
+ // Logger must be configured or bad things happen
+ log.SetDefaultLogger(log.JSON, log.DebugLevel, nil)
+ log.AddPackage(log.JSON, log.WarnLevel, nil)
+}
+
+// MakeMethodTestConfig builds a method router configuration that targets a
+// single "vcore" cluster made of numBackends backends with numConnections
+// connections each. It returns the route configuration and the router
+// configuration that holds a copy of that route.
+func MakeMethodTestConfig(numBackends int, numConnections int) (*RouteConfig, *RouterConfig) {
+	var backends []BackendConfig
+	for b := 0; b < numBackends; b++ {
+		var conns []ConnectionConfig
+		for c := 1; c <= numConnections; c++ {
+			conns = append(conns, ConnectionConfig{
+				Name: fmt.Sprintf("rw_vcore%d%d", b, c),
+				Addr: "foo",
+				Port: "123",
+			})
+		}
+
+		backends = append(backends, BackendConfig{
+			Name:        fmt.Sprintf("rw_vcore%d", b),
+			Type:        BackendSingleServer,
+			Connections: conns,
+		})
+	}
+
+	clusterCfg := BackendClusterConfig{Name: "vcore", Backends: backends}
+
+	routeCfg := RouteConfig{
+		Name:             "dev_manager",
+		Type:             RouteTypeRpcAffinityMessage,
+		Association:      AssociationRoundRobin,
+		BackendCluster:   "vcore",
+		backendCluster:   &clusterCfg,
+		RouteField:       "id",
+		Methods:          []string{"CreateDevice", "EnableDevice"},
+		NbBindingMethods: []string{"CreateDevice"},
+	}
+
+	routerCfg := RouterConfig{
+		Name:         "vcore",
+		ProtoService: "VolthaService",
+		ProtoPackage: "voltha",
+		Routes:       []RouteConfig{routeCfg},
+		ProtoFile:    METHOD_ROUTER_PROTOFILE,
+	}
+	return &routeCfg, &routerCfg
+}
+
+// PretendMethodOpenConnection marks the named connection of the given backend
+// as open by registering an empty client connection for it, because Route()
+// only hands out backends that have an open connection.
+func PretendMethodOpenConnection(router Router, clusterName string, backendIndex int, connectionName string) {
+	be := router.FindBackendCluster(clusterName).backends[backendIndex]
+
+	// Route Method expects an open connection
+	be.openConns[be.connections[connectionName]] = &grpc.ClientConn{}
+}
+
+// Common setup to run before each unit test: replaces the package-level
+// clusters and allRouters maps with empty ones so that state from a previous
+// test cannot leak into the next.
+func MethodTestSetup() {
+ // reset globals that need to be clean for each unit test
+
+ clusters = make(map[string]*cluster)
+ allRouters = make(map[string]Router)
+}
+
+// Test creation of a new MethodRouter, and its Service(), Name(),
+// BackendCluster() and FindBackendCluster() methods.
+func TestMethodRouterInit(t *testing.T) {
+	MethodTestSetup()
+
+	_, routerConfig := MakeMethodTestConfig(1, 1)
+
+	router, err := newMethodRouter(routerConfig)
+
+	assert.NotNil(t, router)
+	assert.Nil(t, err)
+
+	// assert.Equal takes (expected, actual); keeping that order makes the
+	// failure messages label the two values correctly.
+	assert.Equal(t, "VolthaService", router.Service())
+	assert.Equal(t, "vcore", router.Name())
+
+	cluster, err := router.BackendCluster("EnableDevice", NoMeta)
+	assert.Equal(t, clusters["vcore"], cluster)
+	assert.Nil(t, err)
+
+	assert.Equal(t, clusters["vcore"], router.FindBackendCluster("vcore"))
+}
+
+// BackendCluster must fail, and return no cluster, for an unknown meta key.
+func TestMethodRouterBackendClusterInvalidMeta(t *testing.T) {
+	MethodTestSetup()
+	_, cfg := MakeMethodTestConfig(1, 1)
+
+	mr, createErr := newMethodRouter(cfg)
+	assert.NotNil(t, mr)
+	assert.Nil(t, createErr)
+
+	found, lookupErr := mr.BackendCluster("EnableDevice", "wrongmeta")
+	assert.EqualError(t, lookupErr, "No backend cluster exists for method 'EnableDevice' using meta key 'wrongmeta'")
+	assert.Nil(t, found)
+}
+
+// BackendCluster must fail, and return no cluster, for an unknown method name.
+func TestMethodRouterBackendClusterInvalidMethod(t *testing.T) {
+	MethodTestSetup()
+	_, cfg := MakeMethodTestConfig(1, 1)
+
+	mr, createErr := newMethodRouter(cfg)
+	assert.NotNil(t, mr)
+	assert.Nil(t, createErr)
+
+	found, lookupErr := mr.BackendCluster("WrongMethod", NoMeta)
+	assert.EqualError(t, lookupErr, "No backend cluster exists for method 'WrongMethod' using meta key 'nometa'")
+	assert.Nil(t, found)
+}
+
+// FindBackendCluster must return nil for a cluster name nobody registered.
+func TestMethodRouterFindBackendClusterNoExist(t *testing.T) {
+	MethodTestSetup()
+	_, cfg := MakeMethodTestConfig(1, 1)
+
+	mr, createErr := newMethodRouter(cfg)
+	assert.NotNil(t, mr)
+	assert.Nil(t, createErr)
+
+	missing := mr.FindBackendCluster("wrong")
+	assert.Nil(t, missing)
+}
+
+// Routing through the MethodRouter must reach the sub-router of the method (an
+// AffinityRouter here) and return its backend.
+func TestMethodRoute(t *testing.T) {
+	MethodTestSetup()
+	_, cfg := MakeMethodTestConfig(1, 1)
+
+	mr, createErr := newMethodRouter(cfg)
+	assert.Nil(t, createErr)
+
+	PretendMethodOpenConnection(mr, "vcore", 0, "rw_vcore01")
+
+	payload, marshalErr := proto.Marshal(&common_pb.ID{Id: "1234"})
+	assert.Nil(t, marshalErr)
+
+	frame := &requestFrame{
+		payload:    payload,
+		err:        nil,
+		metaKey:    NoMeta,
+		methodInfo: newMethodDetails("/voltha.VolthaService/EnableDevice"),
+	}
+
+	// With a single backend, a second Route call must return the same
+	// backend as the first one.
+	for attempt := 0; attempt < 2; attempt++ {
+		be, conn := mr.Route(frame)
+
+		assert.Nil(t, frame.err)
+		assert.NotNil(t, be)
+		assert.Equal(t, "rw_vcore0", be.name)
+		assert.Nil(t, conn)
+	}
+}
+
+// Try to route to a nonexistent method
+func TestMethodRouteNonexistent(t *testing.T) {
+ MethodTestSetup()
+
+ _, routerConfig := MakeMethodTestConfig(1, 1)
+
+ router, err := newMethodRouter(routerConfig)
+ assert.Nil(t, err)
+
+ idMessage := &common_pb.ID{Id: "1234"}
+
+ idData, err := proto.Marshal(idMessage)
+ assert.Nil(t, err)
+
+ sel := &requestFrame{payload: idData,
+ err: nil,
+ metaKey: NoMeta,
+ methodInfo: newMethodDetails("/voltha.VolthaService/NonexistentMethod")}
+
+ backend, connection := router.Route(sel)
+
+ assert.Nil(t, backend)
+ assert.Nil(t, connection)
+
+ assert.EqualError(t, sel.err, "MethodRouter.Route unable to resolve meta nometa, method NonexistentMethod")
+}
+
+// Try to route to a nonexistent meta key
+func TestMethodRouteWrongMeta(t *testing.T) {
+ MethodTestSetup()
+
+ _, routerConfig := MakeMethodTestConfig(1, 1)
+
+ router, err := newMethodRouter(routerConfig)
+ assert.Nil(t, err)
+
+ idMessage := &common_pb.ID{Id: "1234"}
+
+ idData, err := proto.Marshal(idMessage)
+ assert.Nil(t, err)
+
+ sel := &requestFrame{payload: idData,
+ err: nil,
+ metaKey: "wrongkey",
+ methodInfo: newMethodDetails("/voltha.VolthaService/EnableDevice")}
+
+ backend, connection := router.Route(sel)
+
+ assert.Nil(t, backend)
+ assert.Nil(t, connection)
+
+ assert.EqualError(t, sel.err, "MethodRouter.Route unable to resolve meta wrongkey, method EnableDevice")
+}
+
+// Try to route using the wrong type of frame (a responseFrame instead of a requestFrame)
+func TestMethodRouteWrongFrame(t *testing.T) {
+ MethodTestSetup()
+
+ _, routerConfig := MakeMethodTestConfig(1, 1)
+
+ router, err := newMethodRouter(routerConfig)
+ assert.Nil(t, err)
+
+ idMessage := &voltha_pb.Device{Id: "1234"}
+ idData, err := proto.Marshal(idMessage)
+ assert.Nil(t, err)
+
+ // Note that sel.backend must be set. As this is a response, it must
+ // have come from a backend and that backend must be known.
+
+ sel := &responseFrame{payload: idData,
+ metaKey: NoMeta,
+ backend: router.FindBackendCluster("vcore").backends[0],
+ method: "CreateDevice",
+ }
+
+ // Note: Does not return any error, but does print an error message. Returns nil.
+
+ backend, connection := router.Route(sel)
+
+ assert.Nil(t, backend)
+ assert.Nil(t, connection)
+}
+
+// MethodRouter calls another Router's ReplyHandler, in this case AffinityRouter
+func TestMethodRouteReply(t *testing.T) {
+ MethodTestSetup()
+
+ _, routerConfig := MakeMethodTestConfig(1, 1)
+
+ router, err := newRouter(routerConfig)
+ assert.Nil(t, err)
+
+ aRouter := allRouters["vcoredev_manager"].(AffinityRouter)
+
+ PretendMethodOpenConnection(router, "vcore", 0, "rw_vcore01")
+
+ idMessage := &voltha_pb.Device{Id: "1234"}
+ idData, err := proto.Marshal(idMessage)
+ assert.Nil(t, err)
+
+ // Note that sel.backend must be set. As this is a response, it must
+ // have come from a backend and that backend must be known.
+
+ sel := &responseFrame{payload: idData,
+ metaKey: NoMeta,
+ backend: router.FindBackendCluster("vcore").backends[0],
+ method: "CreateDevice",
+ }
+
+ // affinity should be unset as we have not routed yet
+ assert.Empty(t, aRouter.affinity)
+
+ err = router.ReplyHandler(sel)
+ assert.Nil(t, err)
+
+ // affinity should now be set
+ assert.NotEmpty(t, aRouter.affinity)
+ assert.Equal(t, router.FindBackendCluster("vcore").backends[0], aRouter.affinity["1234"])
+}
+
+// Call ReplyHandler with the wrong type of frame
+func TestMethodRouteReplyWrongFrame(t *testing.T) {
+ MethodTestSetup()
+
+ _, routerConfig := MakeMethodTestConfig(1, 1)
+
+ router, err := newRouter(routerConfig)
+ assert.Nil(t, err)
+
+ PretendMethodOpenConnection(router, "vcore", 0, "rw_vcore01")
+
+ idMessage := &common_pb.ID{Id: "1234"}
+
+ idData, err := proto.Marshal(idMessage)
+ assert.Nil(t, err)
+
+ sel := &requestFrame{payload: idData,
+ err: nil,
+ metaKey: "wrongkey",
+ methodInfo: newMethodDetails("/voltha.VolthaService/EnableDevice")}
+
+ err = router.ReplyHandler(sel)
+ assert.EqualError(t, err, "MethodRouter.ReplyHandler called with non-reponseFrame")
+}
+
+// Call ReplyHandler with a requestFrame naming an invalid method; it is rejected as a non-response frame
+func TestMethodRouteReplyWrongMethod(t *testing.T) {
+ MethodTestSetup()
+
+ _, routerConfig := MakeMethodTestConfig(1, 1)
+
+ router, err := newRouter(routerConfig)
+ assert.Nil(t, err)
+
+ PretendMethodOpenConnection(router, "vcore", 0, "rw_vcore01")
+
+ idMessage := &common_pb.ID{Id: "1234"}
+
+ idData, err := proto.Marshal(idMessage)
+ assert.Nil(t, err)
+
+ sel := &requestFrame{payload: idData,
+ err: nil,
+ metaKey: "wrongkey",
+ methodInfo: newMethodDetails("/voltha.VolthaService/WrongMethod")}
+
+ err = router.ReplyHandler(sel)
+ assert.EqualError(t, err, "MethodRouter.ReplyHandler called with non-reponseFrame")
+}
+
+func TestMethodIsMethodStreaming(t *testing.T) {
+ MethodTestSetup()
+
+ _, routerConfig := MakeMethodTestConfig(1, 1)
+
+ router, err := newRouter(routerConfig)
+ assert.Nil(t, err)
+
+ request, response := router.IsStreaming("EnableDevice")
+ assert.False(t, request)
+ assert.False(t, response)
+}
diff --git a/internal/pkg/afrouter/request.go b/internal/pkg/afrouter/request.go
new file mode 100644
index 0000000..8fc15aa
--- /dev/null
+++ b/internal/pkg/afrouter/request.go
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "context"
+ "encoding/hex"
+ "errors"
+ "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "io"
+ "sync"
+)
+
+type request struct {
+ mutex sync.Mutex
+ activeResponseStreamOnce sync.Once
+ setResponseHeaderOnce sync.Once
+ responseStreamMutex sync.Mutex
+
+ streams map[string]grpc.ClientStream
+
+ requestFrameBacklog [][]byte
+ responseErrChan chan error
+ sendClosed bool
+
+ backend *backend
+ ctx context.Context
+ serverStream grpc.ServerStream
+ methodInfo methodDetails
+ requestFrame *requestFrame
+ responseFrame *responseFrame
+ isStreamingRequest bool
+ isStreamingResponse bool
+}
+
+var transactionNotAcquiredErrorString = status.Error(codes.Canceled, "transaction-not-acquired").Error()
+
+// catchupRequestStreamThenForwardResponseStream must be called with request.mutex pre-locked
+func (r *request) catchupRequestStreamThenForwardResponseStream(connName string, stream grpc.ClientStream) {
+ r.streams[connName] = stream
+
+ // prime new streams with any traffic they might have missed (non-streaming requests only)
+ frame := *r.requestFrame // local copy of frame
+ for _, payload := range r.requestFrameBacklog {
+ frame.payload = payload
+ if err := stream.SendMsg(&frame); err != nil {
+ log.Debugf("Error on SendMsg: %s", err.Error())
+ break
+ }
+ }
+ if r.sendClosed {
+ stream.CloseSend()
+ }
+
+ r.mutex.Unlock()
+
+ r.forwardResponseStream(connName, stream)
+}
+
+// forwardResponseStream relays frames from stream to the server stream and, once the request completes, reports the result on responseErrChan
+func (r *request) forwardResponseStream(connName string, stream grpc.ClientStream) {
+ var queuedFrames [][]byte
+ frame := *r.responseFrame
+ var err error
+ activeStream := false
+ for {
+ err = stream.RecvMsg(&frame)
+ // if this is an inactive responder, ignore everything it sends
+ if err != nil && err.Error() == transactionNotAcquiredErrorString {
+ break
+ }
+ // the first thread to reach this point (first to receive a response frame) will become the active stream
+ r.activeResponseStreamOnce.Do(func() { activeStream = true })
+ if err != nil {
+ // this can be io.EOF which is the success case
+ break
+ }
+
+ if r.isStreamingResponse {
+ // streaming response - send immediately
+ if err = r.sendResponseFrame(stream, frame); err != nil {
+ break
+ }
+ } else { // !r.isStreamingResponse
+
+ if r.isStreamingRequest { // && !r.isStreamingResponse
+ // queue the frame (only send response when the last stream closes)
+ queuedFrames = append(queuedFrames, frame.payload)
+ } else { // !r.isStreamingRequest && !r.isStreamingResponse
+
+ // only the active stream will respond
+ if activeStream { // && !r.isStreamingRequest && !r.isStreamingResponse
+ // send the response immediately
+ if err = r.sendResponseFrame(stream, frame); err != nil {
+ break
+ }
+ } else { // !activeStream && !r.isStreamingRequest && !r.isStreamingResponse
+ // just read & discard until the stream dies
+ }
+ }
+ }
+ }
+
+ log.Debugf("Closing stream to %s", connName)
+
+ // io.EOF is the success case
+ if err == io.EOF {
+ err = nil
+ }
+
+ // NOTE(review): takes backend.mutex and then request.mutex; other code paths must use the same order to avoid deadlock
+ r.backend.mutex.Lock()
+ r.mutex.Lock()
+ delete(r.streams, connName)
+ streamsLeft := len(r.streams)
+
+ // handle the case where no cores are the active responder. Should never happen, but just in case...
+ if streamsLeft == 0 {
+ r.activeResponseStreamOnce.Do(func() { activeStream = true })
+ }
+
+ // if this the active stream (for non-streaming requests), or this is the last stream (for streaming requests)
+ if (activeStream && !r.isStreamingRequest && !r.isStreamingResponse) || (streamsLeft == 0 && (r.isStreamingRequest || r.isStreamingResponse)) {
+ // request is complete, cleanup
+ delete(r.backend.activeRequests, r)
+ r.mutex.Unlock()
+ r.backend.mutex.Unlock()
+
+ // send any queued frames we have (streaming request & !streaming response only, but no harm trying in other cases)
+ for _, payload := range queuedFrames {
+ if err != nil {
+ // if there's been an error, don't try to send anymore
+ break
+ }
+ frame.payload = payload
+ err = r.sendResponseFrame(stream, frame)
+ }
+
+ // We may have received Trailers as part of the call.
+ r.serverStream.SetTrailer(stream.Trailer())
+
+ // response stream complete
+ r.responseErrChan <- err
+ } else {
+ r.mutex.Unlock()
+ r.backend.mutex.Unlock()
+ }
+}
+
+func (r *request) sendResponseFrame(stream grpc.ClientStream, f responseFrame) error {
+ r.responseStreamMutex.Lock()
+ defer r.responseStreamMutex.Unlock()
+
+ // the header should only be set once, even if multiple streams can respond.
+ setHeader := false
+ r.setResponseHeaderOnce.Do(func() { setHeader = true })
+ if setHeader {
+ // This is a bit of a hack, but client to server headers are only readable after first client msg is
+ // received but must be written to server stream before the first msg is flushed.
+ // This is the only place to do it nicely.
+ md, err := stream.Header()
+ if err != nil {
+ return err
+ }
+ // Update the metadata for the response.
+ if f.metaKey != NoMeta {
+ if f.metaVal == "" {
+ // We could also always just do this
+ md.Set(f.metaKey, f.backend.name)
+ } else {
+ md.Set(f.metaKey, f.metaVal)
+ }
+ }
+ if err := r.serverStream.SendHeader(md); err != nil {
+ return err
+ }
+ }
+
+ log.Debugf("Response frame %s", hex.EncodeToString(f.payload))
+
+ return r.serverStream.SendMsg(&f)
+}
+
+// sendAll delivers frame to every client stream currently attached to the
+// request. It succeeds if at least one stream accepts the frame; when all
+// sends fail, one of the send errors is returned, and when there are no
+// streams at all an error is logged and returned.
+func (r *request) sendAll(frame *requestFrame) error {
+	// Snapshot the stream set under the lock so the sends happen outside it.
+	r.mutex.Lock()
+	if !r.isStreamingRequest {
+		// save frames of non-streaming requests, so we can catchup new streams
+		r.requestFrameBacklog = append(r.requestFrameBacklog, frame.payload)
+	}
+	targets := make(map[string]grpc.ClientStream, len(r.streams))
+	for name, cs := range r.streams {
+		targets[name] = cs
+	}
+	r.mutex.Unlock()
+
+	if len(targets) == 0 {
+		err := errors.New("unable to send, all streams have closed")
+		log.Error(err)
+		return err
+	}
+
+	var lastErr error
+	delivered := false
+	for _, cs := range targets {
+		if err := cs.SendMsg(frame); err != nil {
+			log.Debugf("Error on SendMsg: %s", err.Error())
+			lastErr = err
+			continue
+		}
+		delivered = true
+	}
+
+	// One success is enough; otherwise report one of the send errors.
+	if delivered {
+		return nil
+	}
+	return lastErr
+}
+
+func (r *request) forwardRequestStream(src grpc.ServerStream) error {
+ // The frame buffer already has the results of a first
+ // RecvMsg in it so the first thing to do is to
+ // send it to the list of client streams and only
+ // then read some more.
+ frame := *r.requestFrame // local copy of frame
+ var rtrn error
+ for {
+ // Send the message to each of the backend streams
+ if err := r.sendAll(&frame); err != nil {
+ log.Debugf("SendAll failed %s", err.Error())
+ rtrn = err
+ break
+ }
+ log.Debugf("Request frame %s", hex.EncodeToString(frame.payload))
+ if err := src.RecvMsg(&frame); err != nil {
+ rtrn = err // this can be io.EOF which is happy case
+ break
+ }
+ }
+
+ r.mutex.Lock()
+ log.Debug("Closing southbound streams")
+ r.sendClosed = true
+ for _, stream := range r.streams {
+ stream.CloseSend()
+ }
+ r.mutex.Unlock()
+
+ if rtrn != io.EOF {
+ log.Debugf("s2cErr reporting %v", rtrn)
+ return rtrn
+ }
+ log.Debug("s2cErr reporting EOF")
+ // this is the successful case where the sender has encountered io.EOF, and won't be sending anymore.
+ // the clientStream>serverStream may continue sending though.
+ return nil
+}
diff --git a/internal/pkg/afrouter/round-robin-router.go b/internal/pkg/afrouter/round-robin-router.go
new file mode 100644
index 0000000..b5f031e
--- /dev/null
+++ b/internal/pkg/afrouter/round-robin-router.go
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "errors"
+ "fmt"
+ "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
+)
+
+type RoundRobinRouter struct {
+ name string
+ grpcService string
+ cluster *cluster
+ currentBackend **backend
+}
+
+// newRoundRobinRouter validates the configuration and builds a RoundRobinRouter,
+// linking it to an existing backend cluster or creating a new one. Problems are
+// logged individually and reported through a single returned error.
+func newRoundRobinRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
+	failed := false
+	log.Debug("Creating a new round robin router")
+
+	// Validate the configuration: name, package and service are mandatory.
+	if config.Name == "" {
+		log.Error("A router 'name' must be specified")
+		failed = true
+	}
+	if rconf.ProtoPackage == "" {
+		log.Error("A 'package' must be specified")
+		failed = true
+	}
+	if rconf.ProtoService == "" {
+		log.Error("A 'service' must be specified")
+		failed = true
+	}
+
+	// currentBackend points at this variable, so value copies of the router share one cursor.
+	var cursor *backend
+	rr := RoundRobinRouter{
+		name:           config.Name,
+		grpcService:    rconf.ProtoService,
+		currentBackend: &cursor,
+	}
+
+	// Create the backend cluster or link to an existing one
+	if existing, found := clusters[config.backendCluster.Name]; found {
+		rr.cluster = existing
+	} else {
+		var err error
+		if rr.cluster, err = newBackendCluster(config.backendCluster); err != nil {
+			log.Errorf("Could not create a backend for router %s", config.Name)
+			failed = true
+		}
+	}
+
+	if failed {
+		return rr, errors.New(fmt.Sprintf("Failed to create a new router '%s'", rr.name))
+	}
+	return rr, nil
+}
+
+func (rr RoundRobinRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string, string, error) {
+ return "", "", nil
+}
+
+func (rr RoundRobinRouter) IsStreaming(_ string) (bool, bool) {
+ panic("not implemented")
+}
+
+func (rr RoundRobinRouter) BackendCluster(s string, mk string) (*cluster, error) {
+ return rr.cluster, nil
+}
+
+func (rr RoundRobinRouter) Name() string {
+ return rr.name
+}
+
+// Route returns the cluster's next backend in round-robin order; connection is always nil.
+func (rr RoundRobinRouter) Route(sel interface{}) (*backend, *connection) {
+	frame, isRequest := sel.(*requestFrame)
+	if !isRequest {
+		log.Errorf("Internal: invalid data type in Route call %v", sel)
+		return nil, nil
+	}
+	// Advance the shared cursor; it is overwritten even when the lookup fails.
+	next, err := rr.cluster.nextBackend(*rr.currentBackend, BackendSequenceRoundRobin)
+	*rr.currentBackend = next
+	if err != nil {
+		frame.err = err
+		return nil, nil
+	}
+	return next, nil
+}
+
+func (rr RoundRobinRouter) Service() string {
+ return rr.grpcService
+}
+
+func (rr RoundRobinRouter) FindBackendCluster(becName string) *cluster {
+ if becName == rr.cluster.name {
+ return rr.cluster
+ }
+ return nil
+}
+
+func (rr RoundRobinRouter) ReplyHandler(sel interface{}) error { // This is a no-op
+ return nil
+}
diff --git a/internal/pkg/afrouter/round-robin-router_test.go b/internal/pkg/afrouter/round-robin-router_test.go
new file mode 100644
index 0000000..f9bed07
--- /dev/null
+++ b/internal/pkg/afrouter/round-robin-router_test.go
@@ -0,0 +1,289 @@
+/*
+ * Portions copyright 2019-present Open Networking Foundation
+ * Original copyright 2019-present Ciena Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package afrouter
+
+import (
+ "fmt"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-go/common/log"
+ common_pb "github.com/opencord/voltha-protos/go/common"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/grpc"
+ "testing"
+)
+
+const (
+ ROUND_ROBIN_ROUTER_PROTOFILE = "../../../vendor/github.com/opencord/voltha-protos/go/voltha.pb"
+)
+
+func init() {
+ log.SetDefaultLogger(log.JSON, log.DebugLevel, nil)
+ log.AddPackage(log.JSON, log.WarnLevel, nil)
+}
+
+func MakeRoundRobinTestConfig(numBackends int, numConnections int) (*RouteConfig, *RouterConfig) {
+
+ var backends []BackendConfig
+ for backendIndex := 0; backendIndex < numBackends; backendIndex++ {
+ var connections []ConnectionConfig
+ for connectionIndex := 0; connectionIndex < numConnections; connectionIndex++ {
+ connectionConfig := ConnectionConfig{
+ Name: fmt.Sprintf("ro_vcore%d%d", backendIndex, connectionIndex+1),
+ Addr: "foo",
+ Port: "123",
+ }
+ connections = append(connections, connectionConfig)
+ }
+
+ backendConfig := BackendConfig{
+ Name: fmt.Sprintf("ro_vcore%d", backendIndex),
+ Type: BackendSingleServer,
+ Connections: connections,
+ }
+
+ backends = append(backends, backendConfig)
+ }
+
+ backendClusterConfig := BackendClusterConfig{
+ Name: "ro_vcore",
+ Backends: backends,
+ }
+
+ routeConfig := RouteConfig{
+ Name: "read_only",
+ Type: RouteTypeRoundRobin,
+ Association: AssociationRoundRobin,
+ BackendCluster: "ro_vcore",
+ backendCluster: &backendClusterConfig,
+ Methods: []string{"ListDevicePorts"},
+ }
+
+ routerConfig := RouterConfig{
+ Name: "vcore",
+ ProtoService: "VolthaService",
+ ProtoPackage: "voltha",
+ Routes: []RouteConfig{routeConfig},
+ ProtoFile: ROUND_ROBIN_ROUTER_PROTOFILE,
+ }
+ return &routeConfig, &routerConfig
+}
+
+// Route() requires an open connection, so pretend we have one.
+func PretendRoundRobinOpenConnection(router Router, clusterName string, backendIndex int, connectionName string) {
+ cluster := router.FindBackendCluster(clusterName)
+
+ // Route Method expects an open connection
+ conn := cluster.backends[backendIndex].connections[connectionName]
+ cluster.backends[backendIndex].openConns[conn] = &grpc.ClientConn{}
+}
+
+// Common setup to run before each unit test
+func RoundRobinTestSetup() {
+ // reset globals that need to be clean for each unit test
+
+ clusters = make(map[string]*cluster)
+}
+
+// Test creation of a new RoundRobinRouter, and the Service(), Name(), FindBackendCluster(), and
+// ReplyHandler() methods.
+func TestRoundRobinRouterInit(t *testing.T) {
+ RoundRobinTestSetup()
+
+ routeConfig, routerConfig := MakeRoundRobinTestConfig(1, 1)
+
+ router, err := newRoundRobinRouter(routerConfig, routeConfig)
+
+ assert.NotNil(t, router)
+ assert.Nil(t, err)
+
+ assert.Equal(t, router.Service(), "VolthaService")
+ assert.Equal(t, router.Name(), "read_only")
+
+ cluster, err := router.BackendCluster("foo", "bar")
+ assert.Equal(t, cluster, clusters["ro_vcore"])
+ assert.Nil(t, err)
+
+ assert.Equal(t, router.FindBackendCluster("ro_vcore"), clusters["ro_vcore"])
+ assert.Nil(t, router.ReplyHandler("foo"))
+}
+
+func TestRoundRobinRouterInitNoName(t *testing.T) {
+ RoundRobinTestSetup()
+
+ routeConfig, routerConfig := MakeRoundRobinTestConfig(1, 1)
+ routeConfig.Name = ""
+
+ _, err := newRoundRobinRouter(routerConfig, routeConfig)
+
+ assert.EqualError(t, err, "Failed to create a new router ''")
+}
+
+func TestRoundRobinRouterInitNoProtoPackage(t *testing.T) {
+ RoundRobinTestSetup()
+
+ routeConfig, routerConfig := MakeRoundRobinTestConfig(1, 1)
+ routerConfig.ProtoPackage = ""
+
+ _, err := newRoundRobinRouter(routerConfig, routeConfig)
+
+ assert.EqualError(t, err, "Failed to create a new router 'read_only'")
+}
+
+func TestRoundRobinRouterInitNoProtoService(t *testing.T) {
+ RoundRobinTestSetup()
+
+ routeConfig, routerConfig := MakeRoundRobinTestConfig(1, 1)
+ routerConfig.ProtoService = ""
+
+ _, err := newRoundRobinRouter(routerConfig, routeConfig)
+
+ assert.EqualError(t, err, "Failed to create a new router 'read_only'")
+}
+
+// Tests a cluster whose only backend has no open connections
+func TestRoundRobinRouteZero(t *testing.T) {
+ RoundRobinTestSetup()
+
+ routeConfig, routerConfig := MakeRoundRobinTestConfig(1, 1)
+
+ router, err := newRoundRobinRouter(routerConfig, routeConfig)
+ assert.Nil(t, err)
+
+ idMessage := &common_pb.ID{Id: "1234"}
+
+ idData, err := proto.Marshal(idMessage)
+ assert.Nil(t, err)
+
+ sel := &requestFrame{payload: idData,
+ err: nil,
+ methodInfo: newMethodDetails("/voltha.VolthaService/ListDevicePorts")}
+
+ backend, connection := router.Route(sel)
+
+ assert.EqualError(t, sel.err, "No backend with open connections found")
+ assert.Nil(t, backend)
+ assert.Nil(t, connection)
+}
+
+// Tests a cluster with only one Backend
+func TestRoundRobinRouteOne(t *testing.T) {
+ RoundRobinTestSetup()
+
+ routeConfig, routerConfig := MakeRoundRobinTestConfig(1, 1)
+
+ router, err := newRoundRobinRouter(routerConfig, routeConfig)
+ assert.Nil(t, err)
+
+ PretendRoundRobinOpenConnection(router, "ro_vcore", 0, "ro_vcore01")
+
+ idMessage := &common_pb.ID{Id: "1234"}
+
+ idData, err := proto.Marshal(idMessage)
+ assert.Nil(t, err)
+
+ sel := &requestFrame{payload: idData,
+ err: nil,
+ methodInfo: newMethodDetails("/voltha.VolthaService/ListDevicePorts")}
+
+ backend, connection := router.Route(sel)
+
+ assert.Nil(t, sel.err)
+ assert.NotNil(t, backend)
+ assert.Equal(t, "ro_vcore0", backend.name)
+ assert.Nil(t, connection)
+
+ // Since we only have one backend, calling Route a second time should return the same one
+
+ backend, connection = router.Route(sel)
+
+ assert.Nil(t, sel.err)
+ assert.NotNil(t, backend)
+ assert.Equal(t, "ro_vcore0", backend.name)
+ assert.Nil(t, connection)
+}
+
+// Tests a cluster with two Backends
+func TestRoundRobinRouteTwo(t *testing.T) {
+ RoundRobinTestSetup()
+
+ routeConfig, routerConfig := MakeRoundRobinTestConfig(2, 1)
+
+ router, err := newRoundRobinRouter(routerConfig, routeConfig)
+ assert.Nil(t, err)
+
+ PretendRoundRobinOpenConnection(router, "ro_vcore", 0, "ro_vcore01")
+ PretendRoundRobinOpenConnection(router, "ro_vcore", 1, "ro_vcore11")
+
+ idMessage := &common_pb.ID{Id: "1234"}
+
+ idData, err := proto.Marshal(idMessage)
+ assert.Nil(t, err)
+
+ sel := &requestFrame{payload: idData,
+ err: nil,
+ methodInfo: newMethodDetails("/voltha.VolthaService/ListDevicePorts")}
+
+ backend, connection := router.Route(sel)
+
+ assert.Nil(t, sel.err)
+ assert.NotNil(t, backend)
+ assert.Equal(t, "ro_vcore0", backend.name)
+ assert.Nil(t, connection)
+
+ // Since we have two backends, calling Route a second time should return the second
+
+ backend, connection = router.Route(sel)
+
+ assert.Nil(t, sel.err)
+ assert.NotNil(t, backend)
+ assert.Equal(t, "ro_vcore1", backend.name)
+ assert.Nil(t, connection)
+
+ // Calling Route a third time should return the first again
+
+ backend, connection = router.Route(sel)
+
+ assert.Nil(t, sel.err)
+ assert.NotNil(t, backend)
+ assert.Equal(t, "ro_vcore0", backend.name)
+ assert.Nil(t, connection)
+}
+
+// Tests a cluster with one backend but no open connections
+func TestRoundRobinRouteOneNoOpenConnection(t *testing.T) {
+ RoundRobinTestSetup()
+
+ routeConfig, routerConfig := MakeRoundRobinTestConfig(1, 1)
+
+ router, err := newRoundRobinRouter(routerConfig, routeConfig)
+ assert.Nil(t, err)
+
+ idMessage := &common_pb.ID{Id: "1234"}
+
+ idData, err := proto.Marshal(idMessage)
+ assert.Nil(t, err)
+
+ sel := &requestFrame{payload: idData,
+ err: nil,
+ methodInfo: newMethodDetails("/voltha.VolthaService/ListDevicePorts")}
+
+ backend, connection := router.Route(sel)
+
+ assert.EqualError(t, sel.err, "No backend with open connections found")
+ assert.Nil(t, backend)
+ assert.Nil(t, connection)
+}
diff --git a/internal/pkg/afrouter/router.go b/internal/pkg/afrouter/router.go
new file mode 100644
index 0000000..7abbceb
--- /dev/null
+++ b/internal/pkg/afrouter/router.go
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "errors"
+ "fmt"
+ "google.golang.org/grpc"
+)
+
+var allRouters = make(map[string]Router)
+
+// The router interface
+type Router interface {
+ Name() string
+
+ // Route() returns a backend and a connection. The connection is optional and if unspecified, then any
+ // connection on the backend may be used.
+ Route(interface{}) (*backend, *connection)
+
+ Service() string
+ IsStreaming(string) (bool, bool)
+ BackendCluster(string, string) (*cluster, error)
+ FindBackendCluster(string) *cluster
+ ReplyHandler(interface{}) error
+ GetMetaKeyVal(serverStream grpc.ServerStream) (string, string, error)
+}
+
+func newRouter(config *RouterConfig) (Router, error) {
+ r, err := newMethodRouter(config)
+ if err == nil {
+ allRouters[r.Name()] = r
+ }
+ return r, err
+}
+
+// newSubRouter builds the router selected by config.Type and, on success,
+// registers it in allRouters under the key rconf.Name+config.Name.
+// An unknown route type yields a nil Router and an error.
+func newSubRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
+	var (
+		sub Router
+		err error
+	)
+	// Pick the constructor matching the configured route type.
+	switch config.Type {
+	case RouteTypeRpcAffinityMessage:
+		sub, err = newAffinityRouter(rconf, config)
+	case RouteTypeBinding:
+		sub, err = newBindingRouter(rconf, config)
+	case RouteTypeRoundRobin:
+		sub, err = newRoundRobinRouter(rconf, config)
+	case RouteTypeSource:
+		sub, err = newSourceRouter(rconf, config)
+	default:
+		msg := fmt.Sprintf("Internal error, undefined router type: %s", config.Type)
+		return nil, errors.New(msg)
+	}
+
+	// Only successfully created routers are published in the global table.
+	if err == nil {
+		allRouters[rconf.Name+config.Name] = sub
+	}
+
+	return sub, err
+}
diff --git a/internal/pkg/afrouter/server.go b/internal/pkg/afrouter/server.go
new file mode 100644
index 0000000..82269d2
--- /dev/null
+++ b/internal/pkg/afrouter/server.go
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+import (
+ "errors"
+ "fmt"
+ "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "net"
+ "net/url"
+ "strconv"
+)
+
+var (
+ clientStreamDescForProxying = &grpc.StreamDesc{
+ ServerStreams: true,
+ ClientStreams: true,
+ }
+)
+
+type server struct {
+ running bool
+ name string
+ proxyListener net.Listener
+ routers map[string]map[string]Router
+ proxyServer *grpc.Server
+}
+
+// newServer validates config, binds the proxy listener and creates the routers
+// for a server. Every configuration problem is logged before a single error is
+// returned; the listener is closed again if a router cannot be created.
+func newServer(config *ServerConfig) (*server, error) {
+	var err error
+	var rtrn_err = false
+	// Validate the configuration
+	// There should be a name
+	if config.Name == "" {
+		log.Error("A server has been defined with no name")
+		rtrn_err = true
+	}
+	// Validate that there's a port specified
+	if config.Port == 0 {
+		log.Errorf("Server %s does not have a valid port assigned", config.Name)
+		rtrn_err = true
+	}
+	// Validate the ip address if one is provided
+	if _, err := url.Parse(config.Addr); err != nil {
+		log.Errorf("Invalid address '%s' provided for server '%s'", config.Addr, config.Name)
+		rtrn_err = true
+	}
+	if config.Type != "grpc" && config.Type != "streaming_grpc" {
+		if config.Type == "" {
+			log.Errorf("A server 'type' must be defined for server %s", config.Name)
+		} else {
+			log.Errorf("The server type must be either 'grpc' or 'streaming_grpc' "+
+				"but '%s' was found for server '%s'", config.Type, config.Name)
+		}
+		rtrn_err = true
+	}
+	if len(config.Routers) == 0 {
+		log.Errorf("At least one router must be specified for server '%s'", config.Name)
+		rtrn_err = true
+	}
+
+	if rtrn_err {
+		return nil, errors.New("Server configuration failed")
+	}
+
+	// The configuration is valid, create a server and configure it.
+	s := &server{name: config.Name, routers: make(map[string]map[string]Router)}
+	// The listener
+	if s.proxyListener, err =
+		net.Listen("tcp", config.Addr+":"+
+			strconv.Itoa(int(config.Port))); err != nil {
+		log.Error(err)
+		return nil, err
+	}
+	// Create the routers
+	log.Debugf("Configuring the routers for server %s", s.name)
+	for p, r := range config.routers {
+		log.Debugf("Processing router %s for package %s", r.Name, p)
+		dr, err := newRouter(r)
+		if err != nil {
+			log.Error(err)
+			// Release the port; the caller never receives s and could not close it.
+			s.proxyListener.Close()
+			return nil, err
+		}
+		log.Debugf("Adding router %s to the server %s for package %s and service %s",
+			dr.Name(), s.name, p, dr.Service())
+		if _, ok := s.routers[p]; !ok {
+			s.routers[p] = make(map[string]Router)
+		}
+		s.routers[p][dr.Service()] = dr
+	}
+	// Configure the grpc handler
+	s.proxyServer = grpc.NewServer(
+		grpc.CustomCodec(Codec()),
+		grpc.UnknownServiceHandler(s.TransparentHandler()),
+	)
+	return s, nil
+}
+
+func (s *server) Name() string {
+ return s.name
+}
+
+func (s *server) TransparentHandler() grpc.StreamHandler {
+ return s.handler
+}
+
+// getRouter looks up the router for pkg/service, trying the exact pair first,
+// then a wildcard ("*") package, a wildcard service and finally both wildcards.
+func (s *server) getRouter(pkg string, service string) (Router, bool) {
+	lookups := [][2]string{{pkg, service}, {"*", service}, {pkg, "*"}, {"*", "*"}}
+	for _, key := range lookups {
+		// Indexing a missing (nil) inner map is safe and simply reports not found.
+		if rtr, found := s.routers[key[0]][key[1]]; found {
+			return rtr, true
+		}
+	}
+	// No router registered for this package/service combination.
+	return nil, false
+}
+
+func (s *server) handler(srv interface{}, serverStream grpc.ServerStream) error {
+ // Determine what router is intended to handle this request
+ fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
+ if !ok {
+ return grpc.Errorf(codes.Internal, "lowLevelServerStream doesn't exist in context")
+ }
+ log.Debugf("\n\nProcessing grpc request %s on server %s", fullMethodName, s.name)
+ methodInfo := newMethodDetails(fullMethodName)
+ r, ok := s.getRouter(methodInfo.pkg, methodInfo.service)
+ //fn, ok := s.routers[methodInfo.pkg][methodInfo.service]
+ if !ok {
+ // TODO: Should this be punted to a default transparent router??
+ // Probably not, if one is defined yes otherwise just crap out.
+
+ err := fmt.Errorf("Unable to dispatch! Service '%s' for package '%s' not found.", methodInfo.service, methodInfo.pkg)
+ log.Error(err)
+ return err
+ }
+ log.Debugf("Selected router %s", r.Name())
+
+ mk, mv, err := r.GetMetaKeyVal(serverStream)
+ if err != nil {
+ log.Error(err)
+ return err
+ }
+
+ //nbR := &nbRequest(srv:srv,serverStream:serverStream,r:r,methodInfo:methodInfo,metaKey:mk,metaVal:mv)
+
+ // Extract the cluster from the selected router and use it to manage the transfer
+ if cluster, err := r.BackendCluster(methodInfo.method, mk); err != nil {
+ return err
+ } else {
+ //return beCluster.handler(nbR)
+ return cluster.handler(srv, serverStream, r, methodInfo, mk, mv)
+ }
+}
diff --git a/internal/pkg/afrouter/signals.go b/internal/pkg/afrouter/signals.go
new file mode 100644
index 0000000..37b154b
--- /dev/null
+++ b/internal/pkg/afrouter/signals.go
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file implements an exit handler that tries to shut down all the
+// running servers before finally exiting. There are 2 triggers to this
+// clean exit thread: signals and an exit channel.
+
+package afrouter
+
+import (
+ "github.com/opencord/voltha-go/common/log"
+ "os"
+ "os/signal"
+ "syscall"
+)
+
+var errChan = make(chan error)
+var doneChan = make(chan error)
+
+// InitExitHandler launches the two goroutines that can trigger a clean exit:
+// signalHandler waits for a termination signal, errHandler waits for an error
+// on errChan. It always returns nil.
+func InitExitHandler() error {
+	// Whichever goroutine fires first ends up calling cleanExit.
+	go signalHandler()
+	go errHandler()
+	return nil
+}
+
+// signalHandler blocks until the process receives an interrupt or SIGTERM and
+// then runs cleanExit with a nil error.
+func signalHandler() {
+	// Buffer one signal: the signal package does not block when delivering to
+	// the channel, so an unbuffered channel could miss it.
+	sigchan := make(chan os.Signal, 1)
+	// SIGKILL can never be caught, so only the trappable signals are registered.
+	signal.Notify(sigchan, os.Interrupt, syscall.SIGTERM)
+
+	// Block until we receive a signal on the channel
+	<-sigchan
+	log.Info("shutting down on signal as requested")
+	cleanExit(nil)
+}
+
+// errHandler waits for the first error sent on errChan and hands it to
+// cleanExit. Only that one error is consumed; the function returns once
+// cleanExit has finished.
+func errHandler() {
+	cleanExit(<-errChan)
+}
+
+// cleanExit stops every running proxy server, closes every backend connection
+// of every cluster, then sends err on doneChan (blocking until it is received).
+func cleanExit(err error) {
+	if arProxy != nil {
+		for _, srv := range arProxy.servers {
+			if srv.running {
+				log.With(log.Fields{"server": srv.name}).Debug("Closing server")
+				srv.proxyServer.GracefulStop()
+				srv.proxyListener.Close()
+			}
+		}
+	}
+	for _, cl := range clusters {
+		for _, be := range cl.backends {
+			log.Debugf("Closing backend %s", be.name)
+			for _, cn := range be.connections {
+				log.Debugf("Closing connection %s", cn.name)
+				cn.close()
+			}
+		}
+	}
+	doneChan <- err
+}
diff --git a/internal/pkg/afrouter/source-router.go b/internal/pkg/afrouter/source-router.go
new file mode 100644
index 0000000..7841554
--- /dev/null
+++ b/internal/pkg/afrouter/source-router.go
@@ -0,0 +1,303 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package afrouter
+
+/* Source-Router
+
+ This router implements source routing where the caller identifies the
+ component the message should be routed to. The `RouteField` should be
+ configured with the gRPC field name to inspect to determine the
+ destination. This field is assumed to be a string. That string will
+ then be used to identify a particular connection on a particular
+ backend.
+
+ The source-router must be configured with a backend cluster, as all routers
+ must identify a backend cluster. However, that backend cluster
+ is merely a placeholder and is not used by the source-router. The
+ source-router's Route() function will return whatever backend (and, if named,
+ connection) is specified by the value of the `RouteField`.
+*/
+
+import (
+ "errors"
+ "fmt"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-go/common/log"
+ "google.golang.org/grpc"
+ "regexp"
+ "strconv"
+)
+
+// SourceRouter is the Router that sends a request to the backend named inside the request itself.
+type SourceRouter struct {
+ name string // router name, copied from RouteConfig.Name
+ routingField string // NOTE(review): not set by newSourceRouter — confirm it is still needed
+ grpcService string // gRPC service name, copied from RouterConfig.ProtoService
+ methodMap map[string]byte // method name -> field number of the routing field in its input message
+ cluster *cluster // cluster returned by BackendCluster
+}
+
+// newSourceRouter validates the configuration and builds a SourceRouter whose
+// methodMap holds, for each service method accepted by needMethod, the field
+// number of config.RouteField in that method's input message. Every problem is
+// logged; if any occurred, the partially built router is returned together
+// with a summary error.
+func newSourceRouter(rconf *RouterConfig, config *RouteConfig) (Router, error) {
+	var err error = nil
+	var rtrn_err = false
+	var pkg_re = regexp.MustCompile(`^(\.[^.]+\.)(.+)$`)
+	// Validate the configuration; keep going so that every problem gets logged.
+	if config.Name == "" {
+		log.Error("A router 'name' must be specified")
+		rtrn_err = true
+	}
+	if rconf.ProtoPackage == "" {
+		log.Error("A 'package' must be specified")
+		rtrn_err = true
+	}
+	if rconf.ProtoService == "" {
+		log.Error("A 'service' must be specified")
+		rtrn_err = true
+	}
+	if config.RouteField == "" {
+		log.Error("A 'routing_field' must be specified")
+		rtrn_err = true
+	}
+
+	// TODO The overrides section is currently not being used
+	// so the router will route all methods based on the
+	// routing_field. This needs to be added so that methods
+	// can have different routing fields.
+	dr := SourceRouter{
+		name:        config.Name,
+		grpcService: rconf.ProtoService,
+		methodMap:   make(map[string]byte),
+	}
+
+	// Build the routing structure based on the loaded protobuf
+	// descriptor file and the config information.
+	type key struct {
+		method string
+		field  string
+	}
+	// Field numbers are kept in a byte, so numbers above 255 are truncated.
+	var fieldNumberLookup = make(map[key]byte)
+	for _, f := range rconf.protoDescriptor.File {
+		// Build a temporary map of message types by name.
+		for _, m := range f.MessageType {
+			for _, fld := range m.Field {
+				log.Debugf("Processing message '%s', field '%s'", *m.Name, *fld.Name)
+				fieldNumberLookup[key{*m.Name, *fld.Name}] = byte(*fld.Number)
+			}
+		}
+	}
+	for _, f := range rconf.protoDescriptor.File {
+		if *f.Package != rconf.ProtoPackage {
+			continue
+		}
+		for _, s := range f.Service {
+			if *s.Name != rconf.ProtoService {
+				continue
+			}
+			log.Debugf("Loading package data '%s' for service '%s' for router '%s'", *f.Package, *s.Name, dr.name)
+			// Map each enabled method name to the field number of its route selector.
+			for _, m := range s.Method {
+				log.Debugf("Processing method '%s'", *m.Name)
+				// Determine if this is a method we're supposed to be processing.
+				if !needMethod(*m.Name, config) {
+					continue
+				}
+				log.Debugf("Enabling method '%s'", *m.Name)
+				pkg_methd := pkg_re.FindStringSubmatch(*m.InputType)
+				if pkg_methd == nil {
+					log.Errorf("Regular expression didn't match input type '%s'", *m.InputType)
+					rtrn_err = true
+					// Nothing was captured, so indexing pkg_methd below would panic.
+					continue
+				}
+				// The input type has the package name prepended to it. Remove it.
+				in := pkg_methd[PKG_MTHD_MTHD]
+				fieldNum, ok := fieldNumberLookup[key{in, config.RouteField}]
+				if !ok {
+					log.Errorf("Method '%s' has no field named '%s' in it's parameter message '%s'", *m.Name, config.RouteField, in)
+					rtrn_err = true
+				}
+				dr.methodMap[*m.Name] = fieldNum
+			}
+		}
+	}
+
+	// We need to pick a cluster, because server will call cluster.handler. The choice we make doesn't
+	// matter, as we can return a different cluster from Route().
+	ok := true
+	if dr.cluster, ok = clusters[config.backendCluster.Name]; !ok {
+		if dr.cluster, err = newBackendCluster(config.backendCluster); err != nil {
+			log.Errorf("Could not create a backend for router %s", config.Name)
+			rtrn_err = true
+		}
+	}
+
+	if rtrn_err {
+		return dr, errors.New(fmt.Sprintf("Failed to create a new router '%s'", dr.name))
+	}
+	return dr, nil
+}
+
+// Service returns the name of the gRPC service this router was created for
+// (taken from RouterConfig.ProtoService).
+func (ar SourceRouter) Service() string { return ar.grpcService }
+
+// Name returns the name the router was configured with (taken from
+// RouteConfig.Name).
+func (ar SourceRouter) Name() string { return ar.name }
+
+// skipField advances *idx past the field starting at (*data)[*idx], i.e. past
+// its tag and the value implied by the tag's wire type. It returns an error and
+// leaves *idx unchanged if the field is truncated or uses a group/unknown type.
+func (ar SourceRouter) skipField(data *[]byte, idx *int) error {
+	if *idx < 0 || *idx >= len(*data) {
+		return fmt.Errorf("field offset %d is outside the %d byte message", *idx, len(*data))
+	}
+	// The tag is itself a varint; its low three bits hold the wire type.
+	tag, tagLen := proto.DecodeVarint((*data)[*idx:])
+	rest := (*data)[*idx+tagLen:]
+	need := 0 // value bytes after the tag; stays 0 when they cannot be determined
+	switch tag & 7 {
+	case 0: // Varint
+		_, need = proto.DecodeVarint(rest)
+	case 1: // 64 bit
+		need = 8
+	case 2: // Length delimited: a varint length followed by that many bytes
+		if length, n := proto.DecodeVarint(rest); n > 0 && length <= uint64(len(rest)-n) {
+			need = n + int(length)
+		}
+	case 5: // 32 bit
+		need = 4
+	}
+	if tagLen == 0 || need == 0 || need > len(rest) {
+		return fmt.Errorf("cannot skip field at offset %d (wire type %d): truncated or unsupported", *idx, tag&7)
+	}
+	*idx += tagLen + need
+	return nil
+}
+
+func (ar SourceRouter) decodeProtoField(payload []byte, fieldId byte) (string, error) {
+ idx := 0
+ b := proto.NewBuffer([]byte{})
+ //b.DebugPrint("The Buffer", payload)
+ for { // Find the route selector field
+ log.Debugf("Decoding source value attributeNumber: %d from %v at index %d", fieldId, payload, idx)
+ log.Debugf("Attempting match with payload: %d, methodTable: %d", payload[idx], fieldId)
+ if payload[idx]>>3 == fieldId {
+ log.Debugf("Method match with payload: %d, methodTable: %d", payload[idx], fieldId)
+ // TODO: Consider supporting other selector types.... Way, way in the future
+ // ok, the future is now, support strings as well... ugh.
+ var selector string
+ switch payload[idx] & 3 {
+ case 0: // Integer
+ b.SetBuf(payload[idx+1:])
+ v, e := b.DecodeVarint()
+ if e == nil {
+ log.Debugf("Decoded the ing field: %v", v)
+ selector = strconv.Itoa(int(v))
+ } else {
+ log.Errorf("Failed to decode varint %v", e)
+ return "", e
+ }
+ case 2: // Length delimited AKA string
+ b.SetBuf(payload[idx+1:])
+ v, e := b.DecodeStringBytes()
+ if e == nil {
+ log.Debugf("Decoded the string field: %v", v)
+ selector = v
+ } else {
+ log.Errorf("Failed to decode string %v", e)
+ return "", e
+ }
+ default:
+ err := errors.New(fmt.Sprintf("Only integer and string route selectors are permitted"))
+ log.Error(err)
+ return "", err
+ }
+ return selector, nil
+ } else if err := ar.skipField(&payload, &idx); err != nil {
+ log.Errorf("Parsing message failed %v", err)
+ return "", err
+ }
+ }
+}
+
+func (ar SourceRouter) Route(sel interface{}) (*backend, *connection) {
+ log.Debugf("SourceRouter sel %v", sel)
+ switch sl := sel.(type) {
+ case *requestFrame:
+ log.Debugf("Route called for nbFrame with method %s", sl.methodInfo.method)
+ // Not a south affinity binding method, proceed with north affinity binding.
+ if selector, err := ar.decodeProtoField(sl.payload, ar.methodMap[sl.methodInfo.method]); err == nil {
+ // selector is
+
+ for _, cluster := range clusters {
+ for _, backend := range cluster.backends {
+ log.Debugf("Checking backend %s", backend.name)
+ for _, connection := range backend.connections {
+ log.Debugf("Checking connection %s", connection.name)
+ // caller specified a backend and a connection
+ if backend.name+"."+connection.name == selector {
+ return backend, connection
+ }
+ }
+ // caller specified just a backend
+ if backend.name == selector {
+ return backend, nil
+ }
+ }
+ }
+ sl.err = fmt.Errorf("Backend %s not found", selector)
+ return nil, nil
+ }
+ default:
+ log.Errorf("Internal: invalid data type in Route call %v", sel)
+ return nil, nil
+ }
+ log.Errorf("Bad routing in SourceRouter:Route")
+ return nil, nil
+}
+
+// GetMetaKeyVal always returns an empty key, an empty value and a nil error;
+// the stream argument is not inspected.
+func (ar SourceRouter) GetMetaKeyVal(serverStream grpc.ServerStream) (string, string, error) { return "", "", nil }
+
+// IsStreaming is not implemented for the source router: calling it panics
+// with "not implemented".
+func (ar SourceRouter) IsStreaming(_ string) (bool, bool) { panic("not implemented") }
+
+// BackendCluster returns the cluster stored in the router and a nil error.
+// Both arguments are ignored; which backend actually serves a request is
+// decided by Route.
+func (ar SourceRouter) BackendCluster(mthd string, metaKey string) (*cluster, error) { return ar.cluster, nil }
+
+// FindBackendCluster returns the router's own cluster when beName equals that
+// cluster's name, and nil for any other name.
+func (ar SourceRouter) FindBackendCluster(beName string) *cluster {
+	if ar.cluster.name != beName {
+		return nil
+	}
+	return ar.cluster
+}
+
+// ReplyHandler is a no-op for the source router: sel is ignored and nil is
+// always returned.
+func (rr SourceRouter) ReplyHandler(sel interface{}) error { return nil }
diff --git a/internal/pkg/afrouter/source-router_test.go b/internal/pkg/afrouter/source-router_test.go
new file mode 100644
index 0000000..a89c034
--- /dev/null
+++ b/internal/pkg/afrouter/source-router_test.go
@@ -0,0 +1,140 @@
+/*
+ * Portions copyright 2019-present Open Networking Foundation
+ * Original copyright 2019-present Ciena Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package afrouter
+
+import (
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-go/common/log"
+ common_pb "github.com/opencord/voltha-protos/go/common"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+const (
+ SOURCE_ROUTER_PROTOFILE = "../../../vendor/github.com/opencord/voltha-protos/go/voltha.pb"
+)
+
+func init() { // logging setup for the tests in this file
+ log.SetDefaultLogger(log.JSON, log.DebugLevel, nil) // default logger: JSON format, debug level
+ log.AddPackage(log.JSON, log.WarnLevel, nil) // presumably registers this package's logger at warn level — confirm
+}
+
+// MakeSourceRouterTestConfig builds the fixture shared by the source-router
+// tests: a "logger" source route keyed on component_name whose cluster
+// "ro_vcore" holds backend "ro_vcore0" with the single connection "ro_vcore01",
+// plus the "vcore" RouterConfig that contains a copy of that route.
+func MakeSourceRouterTestConfig() (*RouteConfig, *RouterConfig) {
+	clusterCfg := &BackendClusterConfig{
+		Name: "ro_vcore",
+		Backends: []BackendConfig{{
+			Name: "ro_vcore0",
+			Type: BackendSingleServer,
+			Connections: []ConnectionConfig{{
+				Name: "ro_vcore01",
+				Addr: "foo",
+				Port: "123",
+			}},
+		}},
+	}
+
+	route := RouteConfig{
+		Name:           "logger",
+		Type:           RouteTypeSource,
+		RouteField:     "component_name",
+		BackendCluster: "ro_vcore",
+		backendCluster: clusterCfg,
+		Methods:        []string{"UpdateLogLevel", "GetLogLevel"},
+	}
+
+	router := RouterConfig{
+		Name:         "vcore",
+		ProtoService: "VolthaService",
+		ProtoPackage: "voltha",
+		Routes:       []RouteConfig{route},
+		ProtoFile:    SOURCE_ROUTER_PROTOFILE,
+	}
+	return &route, &router
+}
+
+// TestSourceRouterInit checks that newSourceRouter accepts the fixture
+// configuration and that the resulting router reports the configured service,
+// name and backend cluster.
+func TestSourceRouterInit(t *testing.T) {
+	routeConfig, routerConfig := MakeSourceRouterTestConfig()
+	router, err := newSourceRouter(routerConfig, routeConfig)
+	assert.NotNil(t, router)
+	assert.Nil(t, err)
+
+	// testify expects the expected value first, then the actual one.
+	assert.Equal(t, "VolthaService", router.Service())
+	assert.Equal(t, "logger", router.Name())
+	cluster, err := router.BackendCluster("foo", "bar")
+	assert.Equal(t, clusters["ro_vcore"], cluster)
+	assert.Nil(t, err)
+	assert.Equal(t, clusters["ro_vcore"], router.FindBackendCluster("ro_vcore"))
+	assert.Nil(t, router.ReplyHandler("foo"))
+}
+
+// TestSourceRouterDecodeProtoField marshals a Logging message and checks that
+// decodeProtoField extracts its two string fields by field number.
+func TestSourceRouterDecodeProtoField(t *testing.T) {
+	_, routerConfig := MakeSourceRouterTestConfig()
+	_, err := newRouter(routerConfig)
+	assert.Nil(t, err)
+	// Get the created SourceRouter so we can call its decoder directly
+	sourceRouter := allRouters["vcorelogger"].(SourceRouter)
+	loggingData, err := proto.Marshal(&common_pb.Logging{Level: 1,
+		PackageName:   "default",
+		ComponentName: "ro_vcore0.ro_vcore01"})
+	assert.Nil(t, err)
+
+	s, err := sourceRouter.decodeProtoField(loggingData, 2) // field 2 is package_name
+	assert.Nil(t, err)
+	assert.Equal(t, "default", s)
+
+	s, err = sourceRouter.decodeProtoField(loggingData, 3) // field 3 is component_name
+	assert.Nil(t, err)
+	assert.Equal(t, "ro_vcore0.ro_vcore01", s)
+}
+
+// TestSourceRouterRoute checks that a request whose component_name is
+// "ro_vcore0.ro_vcore01" is routed to backend ro_vcore0, connection ro_vcore01.
+func TestSourceRouterRoute(t *testing.T) {
+	_, routerConfig := MakeSourceRouterTestConfig()
+	_, err := newRouter(routerConfig)
+	assert.Nil(t, err)
+
+	// Get the created SourceRouter so we can call Route on it directly
+	sourceRouter := allRouters["vcorelogger"].(SourceRouter)
+
+	loggingData, err := proto.Marshal(&common_pb.Logging{Level: 1,
+		PackageName:   "default",
+		ComponentName: "ro_vcore0.ro_vcore01"})
+	assert.Nil(t, err)
+
+	sel := &requestFrame{payload: loggingData,
+		err:        nil,
+		methodInfo: newMethodDetails("/voltha.VolthaService/UpdateLogLevel")}
+
+	backend, connection := sourceRouter.Route(sel)
+	assert.Nil(t, sel.err)
+	// Only look at the names when both results exist; a nil would panic here.
+	if assert.NotNil(t, backend) && assert.NotNil(t, connection) {
+		assert.Equal(t, "ro_vcore0", backend.name)
+		assert.Equal(t, "ro_vcore01", connection.name)
+	}
+}