Changes to add the read only cores and some fixes to bugs
for processing the config file.
Change-Id: I1393c05d4cbce215e97d1f17b13e044eda7ae472
diff --git a/afrouter/afrouter/backend.go b/afrouter/afrouter/backend.go
index 9262d43..8ec286e 100644
--- a/afrouter/afrouter/backend.go
+++ b/afrouter/afrouter/backend.go
@@ -78,6 +78,7 @@
strategy int
location int
field string // Used only if location is protobuf
+ key string
}
type beConnection struct {
@@ -125,7 +126,7 @@
var err error = nil
var rtrn_err bool = false
var be *backend
- log.Debug("Creating a backend cluster with %v", conf)
+ log.Debugf("Creating a backend cluster with %v", conf)
// Validate the configuration
if conf.Name == "" {
log.Error("A backend cluster must have a name")
@@ -598,6 +599,14 @@
rtrn_err = true
}
be.activeAssoc.field = conf.Association.Field
+
+ if conf.Association.Key == "" && be.activeAssoc.location == AL_HEADER {
+ log.Errorf("An association key must be provided if the backend "+
+ "type is active/active and the location is set to header "+
+ "for backend %s in cluster %s", conf.Name, clusterName)
+ rtrn_err = true
+ }
+ be.activeAssoc.key = conf.Association.Key
if rtrn_err {
return nil, errors.New("Backend configuration failed")
}
@@ -605,6 +614,15 @@
// Connections can consist of just a name. This allows for dynamic configuration
// at a later time.
// TODO: validate that there is one connection for all but active/active backends
+ if len(conf.Connections) > 1 && be.activeAssoc.strategy != BE_ACTIVE_ACTIVE {
+ log.Errorf("Only one connection must be specified if the association "+
+ "strategy is not set to 'active_active'")
+ rtrn_err = true
+ }
+ if len(conf.Connections) == 0 {
+ log.Errorf("At least one connection must be specified")
+ rtrn_err = true
+ }
for _,cnConf := range conf.Connections {
if cnConf.Name == "" {
log.Errorf("A connection must have a name for backend %s in cluster %s",
diff --git a/afrouter/afrouter/config.go b/afrouter/afrouter/config.go
index 528a3f7..a9a01eb 100644
--- a/afrouter/afrouter/config.go
+++ b/afrouter/afrouter/config.go
@@ -91,7 +91,7 @@
type OverrideConfig struct {
Methods []string `json:"methods"`
- Method []string `json:"method"`
+ Method string `json:"method"`
RouteField string `json:"routing_field"`
}
@@ -113,6 +113,7 @@
Strategy string `json:"strategy"`
Location string `json:"location"`
Field string `json:"field"`
+ Key string `json:"key"`
}
type ConnectionConfig struct {
@@ -183,7 +184,10 @@
return err
}
- json.Unmarshal(configBytes, conf)
+ if err := json.Unmarshal(configBytes, conf); err != nil {
+ log.Errorf("Unmarshaling of the configuratino file failed: %v", err)
+ return err
+ }
// Now resolve references to different config objects in the
// config file. Currently there are 2 possible references
diff --git a/afrouter/afrouter/helpers.go b/afrouter/afrouter/helpers.go
index 6235337..4d7362b 100644
--- a/afrouter/afrouter/helpers.go
+++ b/afrouter/afrouter/helpers.go
@@ -17,9 +17,11 @@
package afrouter
+//import "github.com/opencord/voltha-go/common/log"
+
func strIndex(ar []string, match string) int {
- for idx := range ar {
- if ar[idx] == match {
+ for idx,v := range ar {
+ if v == match {
return idx
}
}
diff --git a/afrouter/arouter.json b/afrouter/arouter.json
index 50e1d31..7b075ce 100644
--- a/afrouter/arouter.json
+++ b/afrouter/arouter.json
@@ -62,7 +62,7 @@
"name":"read_only",
"type":"round_robin",
"association":"round_robin",
- "backend_cluster":"vcore",
+ "backend_cluster":"ro_vcore",
"methods":[ "ListDevicePorts",
"ListDevicePmConfigs",
"GetImages",
@@ -75,6 +75,8 @@
"ListLogicalDeviceFlowGroups",
"ListDevices",
"GetDevice",
+ "ListLogicalDevices",
+ "GetLogicalDevices",
"GetDeviceType",
"GetDeviceGroup",
"GetLogicalDevice",
@@ -95,8 +97,11 @@
},
"backend_cluster":"vcore",
"methods":["StreamPacketsOut",
+ "ReceivePacketsIn",
+ "ReceiveChangeEvents",
"Subscribe",
"ListLogicalDevices",
+ "GetLogicalDevice",
"ListDeviceFlowGroups",
"ListLogicalDeviceFlowGroups",
"ListDeviceFlows",
@@ -143,7 +148,9 @@
"type":"active_active",
"association": {
"strategy":"serial_number",
- "location":"header"
+ "location":"header",
+ "_TODO":"The key below needs to be implemented, currently hard coded",
+ "key":"voltha_serial_number"
},
"connections": [ {
"name":"vcore21",
@@ -161,7 +168,9 @@
"type":"active_active",
"association": {
"strategy":"serial_number",
- "location":"header"
+ "location":"header",
+ "_TODO":"The key below needs to be implemented, currently hard coded",
+ "key":"voltha_serial_number"
},
"connections": [ {
"name":"vcore31",
@@ -174,6 +183,36 @@
"port":""
}]
}]
+ },
+ {
+ "name":"ro_vcore",
+ "backends":[ {
+ "name":"ro_vcore1",
+ "type":"server",
+ "connections": [ {
+ "name":"ro_vcore11",
+ "addr":"",
+ "port":""
+ }]
+ },
+ {
+ "name":"ro_vcore2",
+ "type":"server",
+ "connections": [ {
+ "name":"ro_vcore21",
+ "addr":"",
+ "port":""
+ }]
+ },
+ {
+ "name":"ro_vcore3",
+ "type":"server",
+ "connections": [ {
+ "name":"ro_vcore31",
+ "addr":"",
+ "port":""
+ }]
+ }]
}
],
"api": {
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index bd2787a..0bcff08 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -50,17 +50,18 @@
Port uint64 `json:"Port"`
}
-type rwPod struct {
+type volthaPod struct {
name string
ipAddr string
node string
devIds map[string]struct{}
+ cluster string
backend string
connection string
}
type podTrack struct {
- pod *rwPod
+ pod *volthaPod
dn bool
}
@@ -124,8 +125,8 @@
return nil,errors.New("Timeout attempting to conect")
}
-func getRwPods(cs *kubernetes.Clientset, coreFilter * regexp.Regexp) []*rwPod {
- var rtrn []*rwPod
+func getVolthaPods(cs *kubernetes.Clientset, coreFilter * regexp.Regexp) []*volthaPod {
+ var rtrn []*volthaPod
pods, err := cs.CoreV1().Pods("").List(metav1.ListOptions{})
if err != nil {
@@ -140,7 +141,7 @@
// Only add the pod if it has an IP address. If it doesn't then it likely crashed and
// and is still in the process of getting re-started.
if v.Status.PodIP != "" {
- rtrn = append(rtrn, &rwPod{name:v.Name,ipAddr:v.Status.PodIP,node:v.Spec.NodeName,
+ rtrn = append(rtrn, &volthaPod{name:v.Name,ipAddr:v.Status.PodIP,node:v.Spec.NodeName,
devIds:make(map[string]struct{}), backend:"", connection:""})
}
}
@@ -148,7 +149,7 @@
return rtrn
}
-func reconcilePodDeviceIds(pod * rwPod, ids map[string]struct{}) bool {
+func reconcilePodDeviceIds(pod * volthaPod, ids map[string]struct{}) bool {
var idList cmn.IDs
for k,_ := range ids {
idList.Items = append(idList.Items, &cmn.ID{Id:k})
@@ -169,7 +170,7 @@
return true
}
-func queryPodDeviceIds(pod * rwPod) map[string]struct{} {
+func queryPodDeviceIds(pod * volthaPod) map[string]struct{} {
var rtrn map[string]struct{} = make(map[string]struct{})
// Open a connection to the pod
// port 50057
@@ -192,7 +193,7 @@
return rtrn
}
-func queryDeviceIds(pods []*rwPod) {
+func queryDeviceIds(pods []*volthaPod) {
for pk,_ := range pods {
// Keep the old Id list if a new list is not returned
if idList := queryPodDeviceIds(pods[pk]); len(idList) != 0 {
@@ -201,7 +202,7 @@
}
}
-func allEmpty(pods []*rwPod) bool {
+func allEmpty(pods []*volthaPod) bool {
for k,_ := range pods {
if len(pods[k].devIds) != 0 {
return false
@@ -210,13 +211,13 @@
return true
}
-func rmPod(pods []*rwPod, idx int) []*rwPod {
+func rmPod(pods []*volthaPod, idx int) []*volthaPod {
return append(pods[:idx],pods[idx+1:]...)
}
-func groupIntersectingPods1(pods []*rwPod, podCt int) ([][]*rwPod,[]*rwPod) {
- var rtrn [][]*rwPod
- var out []*rwPod
+func groupIntersectingPods1(pods []*volthaPod, podCt int) ([][]*volthaPod,[]*volthaPod) {
+ var rtrn [][]*volthaPod
+ var out []*volthaPod
for {
if len(pods) == 0 {
@@ -229,7 +230,7 @@
continue
}
// Start a pod group with this pod
- var grp []*rwPod
+ var grp []*volthaPod
grp = append(grp, pods[0])
pods = rmPod(pods,0)
//log.Debugf("Creating new group %s", pd[k].pod.name)
@@ -277,7 +278,7 @@
}
-func sameNode(pod *rwPod, grps [][]*rwPod) bool {
+func sameNode(pod *volthaPod, grps [][]*volthaPod) bool {
for _,v := range grps {
if v[0].node == pod.node {
return true
@@ -289,14 +290,14 @@
return false
}
-func startRemainingGroups1(grps [][]*rwPod, pods []*rwPod, podCt int) ([][]*rwPod, []*rwPod) {
- var grp []*rwPod
+func startRemainingGroups1(grps [][]*volthaPod, pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
+ var grp []*volthaPod
for k,_ := range pods {
if sameNode(pods[k], grps) {
continue
}
- grp = []*rwPod{}
+ grp = []*volthaPod{}
grp = append(grp, pods[k])
pods = rmPod(pods, k)
grps = append(grps, grp)
@@ -307,7 +308,7 @@
return grps, pods
}
-func hasSingleSecondNode(grp []*rwPod) bool {
+func hasSingleSecondNode(grp []*volthaPod) bool {
var srvrs map[string]struct{} = make(map[string]struct{})
for k,_ := range grp {
if k == 0 {
@@ -321,7 +322,7 @@
return false
}
-func addNode(grps [][]*rwPod, idx *rwPod, item *rwPod) [][]*rwPod {
+func addNode(grps [][]*volthaPod, idx *volthaPod, item *volthaPod) [][]*volthaPod {
for k,_ := range grps {
if grps[k][0].name == idx.name {
grps[k] = append(grps[k], item)
@@ -332,7 +333,7 @@
return grps
}
-func removeNode(grps [][]*rwPod, item *rwPod) [][]*rwPod {
+func removeNode(grps [][]*volthaPod, item *volthaPod) [][]*volthaPod {
for k,_ := range grps {
for k1,_ := range grps[k] {
if grps[k][k1].name == item.name {
@@ -344,8 +345,8 @@
return grps
}
-func groupRemainingPods1(grps [][]*rwPod, pods []*rwPod) [][]*rwPod {
- var lgrps [][]*rwPod
+func groupRemainingPods1(grps [][]*volthaPod, pods []*volthaPod) [][]*volthaPod {
+ var lgrps [][]*volthaPod
// All groups must be started when this function is called.
// Copy incomplete groups
for k,_ := range grps {
@@ -402,8 +403,8 @@
return grps
}
-func groupPods1(pods []*rwPod) [][]*rwPod {
- var rtrn [][]*rwPod
+func groupPods1(pods []*volthaPod) [][]*volthaPod {
+ var rtrn [][]*volthaPod
var podCt int = len(pods)
rtrn,pods = groupIntersectingPods1(pods, podCt)
@@ -435,9 +436,9 @@
return false
}
-func setConnection(client pb.ConfigurationClient, backend string, connection string, addr string, port uint64) {
+func setConnection(client pb.ConfigurationClient, cluster string, backend string, connection string, addr string, port uint64) {
log.Debugf("Configuring backend %s : connection %s\n\n", backend, connection)
- cnf := &pb.Conn{Server:"grpc_command",Cluster:"vcore",Backend:backend,
+ cnf := &pb.Conn{Server:"grpc_command",Cluster:cluster, Backend:backend,
Connection:connection,Addr:addr,
Port:port}
if res, err := client.SetConnection(context.Background(), cnf); err != nil {
@@ -461,7 +462,7 @@
}
}
-func getBackendForCore(coreId string, coreGroups [][]*rwPod) string {
+func getBackendForCore(coreId string, coreGroups [][]*volthaPod) string {
for _,v := range coreGroups {
for _,v2 := range v {
if v2.name == coreId {
@@ -475,7 +476,7 @@
func monitorDiscovery(client pb.ConfigurationClient,
ch <-chan *ic.InterContainerMessage,
- coreGroups [][]*rwPod) {
+ coreGroups [][]*volthaPod) {
var id map[string]struct{} = make(map[string]struct{})
select {
@@ -498,7 +499,7 @@
}
func startDiscoveryMonitor(client pb.ConfigurationClient,
- coreGroups [][]*rwPod) error {
+ coreGroups [][]*volthaPod) error {
var ch <-chan *ic.InterContainerMessage
// Connect to kafka for discovery events
topic := &kafka.Topic{Name: "AffinityRouter"}
@@ -517,16 +518,16 @@
// have changed based on the list provided
// and returns a coreGroup with only the changed
// items and a pod list with the new items
-func getAddrDiffs(coreGroups [][]*rwPod, rwPods []*rwPod) ([][]*rwPod, []*rwPod) {
- var nList []*rwPod
- var rtrn [][]*rwPod = make([][]*rwPod, nPods>>1)
+func getAddrDiffs(coreGroups [][]*volthaPod, rwPods []*volthaPod) ([][]*volthaPod, []*volthaPod) {
+ var nList []*volthaPod
+ var rtrn [][]*volthaPod = make([][]*volthaPod, nPods>>1)
var ipAddrs map[string]struct{} = make(map[string]struct{})
log.Debug("Get addr diffs")
// Start with an empty array
for k,_ := range rtrn {
- rtrn[k] = make([]*rwPod, 2)
+ rtrn[k] = make([]*volthaPod, 2)
}
// Build a list with only the new items
@@ -553,8 +554,8 @@
// pods being replaced. The criteria is that
// the new pod be on the same server as the
// old pod was.
-func reconcileAddrDiffs(coreGroupDiffs [][]*rwPod, rwPodDiffs []*rwPod) ([][]*rwPod) {
- var srvrs map[string][]*rwPod = make(map[string][]*rwPod)
+func reconcileAddrDiffs(coreGroupDiffs [][]*volthaPod, rwPodDiffs []*volthaPod) ([][]*volthaPod) {
+ var srvrs map[string][]*volthaPod = make(map[string][]*volthaPod)
log.Debug("Reconciling diffs")
log.Debug("Building server list")
@@ -587,41 +588,82 @@
return coreGroupDiffs
}
-func applyAddrDiffs(client pb.ConfigurationClient, coreGroups [][]*rwPod, rwPods []*rwPod) {
- var newEntries [][]*rwPod
+func applyAddrDiffs(client pb.ConfigurationClient, coreList interface{}, nPods []*volthaPod) {
+ var newEntries [][]*volthaPod
log.Debug("Applying diffs")
- newEntries = reconcileAddrDiffs(getAddrDiffs(coreGroups, rwPods))
+ switch cores := coreList.(type) {
+ case [][]*volthaPod:
+ newEntries = reconcileAddrDiffs(getAddrDiffs(cores, nPods))
- // Now replace the information in coreGropus with the new
- // entries and then reconcile the device ids on the core
- // that's in the new entry with the device ids of it's
- // active-active peer.
- for k1,v1 := range coreGroups {
- for k2,v2 := range v1 {
- if newEntries[k1][k2] != nil {
- // TODO: Missing is the case where bothe the primary
- // and the secondary core crash and come back.
- // Pull the device ids from the active-active peer
- ids := queryPodDeviceIds(coreGroups[k1][k2^1])
- if len(ids) != 0 {
- if reconcilePodDeviceIds(newEntries[k1][k2], ids) == false {
- log.Errorf("Attempt to reconcile ids on pod %v failed",newEntries[k1][k2])
+ // Now replace the information in coreGropus with the new
+ // entries and then reconcile the device ids on the core
+ // that's in the new entry with the device ids of it's
+ // active-active peer.
+ for k1,v1 := range cores {
+ for k2,v2 := range v1 {
+ if newEntries[k1][k2] != nil {
+ // TODO: Missing is the case where bothe the primary
+ // and the secondary core crash and come back.
+ // Pull the device ids from the active-active peer
+ ids := queryPodDeviceIds(cores[k1][k2^1])
+ if len(ids) != 0 {
+ if reconcilePodDeviceIds(newEntries[k1][k2], ids) == false {
+ log.Errorf("Attempt to reconcile ids on pod %v failed",newEntries[k1][k2])
+ }
}
+ // Send the affininty router new connection information
+ setConnection(client, "vcore", v2.backend, v2.connection, newEntries[k1][k2].ipAddr, 50057)
+ // Copy the new entry information over
+ cores[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
+ cores[k1][k2].name = newEntries[k1][k2].name
+ cores[k1][k2].devIds = ids
}
- // Send the affininty router new connection information
- setConnection(client, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, 50057)
- // Copy the new entry information over
- coreGroups[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
- coreGroups[k1][k2].name = newEntries[k1][k2].name
- coreGroups[k1][k2].devIds = ids
}
}
+ case []*volthaPod:
+ var mia []*volthaPod
+ var found bool
+ // TODO: Break this using functions to simplify
+ // reading of the code.
+ // Find the core(s) that have changed addresses
+ for k1,v1 := range cores {
+ found = false
+ for _, v2 := range nPods {
+ if v1.ipAddr == v2.ipAddr {
+ found = true
+ break
+ }
+ }
+ if found == false {
+ mia = append(mia, cores[k1])
+ }
+ }
+ // Now plug in the new addresses and set the connection
+ for _,v1 := range nPods {
+ found = false
+ for _,v2 := range cores {
+ if v1.ipAddr == v2.ipAddr {
+ found = true
+ break
+ }
+ }
+ if found == true {
+ continue
+ }
+ mia[0].ipAddr = v1.ipAddr
+ mia[0].name = v1.name
+ setConnection(client, "ro_vcore", mia[0].backend, mia[0].connection, v1.ipAddr, 50057)
+ // Now get rid of the mia entry just processed
+ mia = append(mia[:0],mia[1:]...)
+ }
+ default:
+ log.Error("Internal: Unexpected type in call to applyAddrDiffs");
}
}
-func updateDeviceIds(coreGroups [][]*rwPod, rwPods []*rwPod) {
- var byName map[string]*rwPod = make(map[string]*rwPod)
+func updateDeviceIds(coreGroups [][]*volthaPod, rwPods []*volthaPod) {
+ var byName map[string]*volthaPod = make(map[string]*volthaPod)
// Convinience
for _,v := range rwPods {
@@ -637,8 +679,10 @@
func startCoreMonitor(client pb.ConfigurationClient,
clientset *kubernetes.Clientset,
- coreFltr *regexp.Regexp,
- coreGroups [][]*rwPod) error {
+ rwCoreFltr *regexp.Regexp,
+ roCoreFltr *regexp.Regexp,
+ coreGroups [][]*volthaPod,
+ oRoPods []*volthaPod) error {
// Now that initial allocation has been completed, monitor the pods
// for IP changes
// The main loop needs to do the following:
@@ -657,7 +701,7 @@
for {
time.Sleep(10 * time.Second) // Wait a while
// Get the rw core list from k8s
- rwPods := getRwPods(clientset, coreFltr)
+ rwPods := getVolthaPods(clientset, rwCoreFltr)
queryDeviceIds(rwPods)
updateDeviceIds(coreGroups, rwPods)
// If we didn't get 2n+1 pods then wait since
@@ -673,19 +717,43 @@
if hasIpAddr(coreGroups, v.ipAddr) == false {
log.Debug("Address has changed...")
applyAddrDiffs(client, coreGroups, rwPods)
-
+ break
}
}
+
+ roPods := getVolthaPods(clientset, roCoreFltr)
+
+ if len(roPods) != 3 {
+ continue
+ }
+ for _,v := range roPods {
+ if hasIpAddr(oRoPods, v.ipAddr) == false {
+ applyAddrDiffs(client, oRoPods, roPods)
+ break
+ }
+ }
+
}
}
-func hasIpAddr(coreGroups [][]*rwPod, ipAddr string) bool {
- for _,v1 := range coreGroups {
- for _,v2 := range v1 {
- if v2.ipAddr == ipAddr {
+func hasIpAddr(coreList interface{}, ipAddr string) bool {
+ switch cores := coreList.(type) {
+ case []*volthaPod:
+ for _,v := range cores {
+ if v.ipAddr == ipAddr {
return true
}
}
+ case [][]*volthaPod:
+ for _,v1 := range cores {
+ for _,v2 := range v1 {
+ if v2.ipAddr == ipAddr {
+ return true
+ }
+ }
+ }
+ default:
+ log.Error("Internal: Unexpected type in call to hasIpAddr")
}
return false
}
@@ -700,7 +768,8 @@
// Set up the regular expression to identify the voltha cores
- coreFltr := regexp.MustCompile(`rw-core[0-9]-`)
+ rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
+ roCoreFltr := regexp.MustCompile(`ro-core-`)
// Set up logging
if _, err := log.SetDefaultLogger(log.JSON, 0, nil); err != nil {
@@ -719,7 +788,7 @@
client := pb.NewConfigurationClient(conn)
// Get the voltha rw-core podes
- rwPods := getRwPods(clientset, coreFltr)
+ rwPods := getVolthaPods(clientset, rwCoreFltr)
// Fetch the devices held by each running core
queryDeviceIds(rwPods)
@@ -734,6 +803,7 @@
// Assign the groupings to the the backends and connections
for k,_ := range coreGroups {
for k1,_ := range coreGroups[k] {
+ coreGroups[k][k1].cluster = "vcore"
coreGroups[k][k1].backend = "vcore"+strconv.Itoa(k+1)
coreGroups[k][k1].connection = "vcore"+strconv.Itoa(k+1)+strconv.Itoa(k1+1)
}
@@ -755,15 +825,28 @@
log.Info("Setting connections")
// Configure the backeds based on the calculated core groups
for _,v := range coreGroups {
- setConnection(client, v[0].backend, v[0].connection, v[0].ipAddr, 50057)
- setConnection(client, v[1].backend, v[1].connection, v[1].ipAddr, 50057)
+ setConnection(client, "vcore", v[0].backend, v[0].connection, v[0].ipAddr, 50057)
+ setConnection(client, "vcore", v[1].backend, v[1].connection, v[1].ipAddr, 50057)
+ }
+
+ // Process the read only pods
+ roPods := getVolthaPods(clientset, roCoreFltr)
+ for k,v := range roPods {
+ log.Debugf("Processing ro_pod %v", v)
+ vN := "ro_vcore"+strconv.Itoa(k+1)
+ log.Debugf("Setting connection %s, %s, %s", vN, vN+"1", v.ipAddr)
+ roPods[k].cluster = "ro_core"
+ roPods[k].backend = vN
+ roPods[k].connection = vN+"1"
+ setConnection(client, "ro_vcore", v.backend, v.connection, v.ipAddr, 50057)
}
log.Info("Starting discovery monitoring")
startDiscoveryMonitor(client, coreGroups)
log.Info("Starting core monitoring")
- startCoreMonitor(client, clientset, coreFltr, coreGroups) // Never returns
+ startCoreMonitor(client, clientset, rwCoreFltr,
+ roCoreFltr, coreGroups, roPods) // Never returns
return
}