[VOL-1416]
This update fixes the problem described in the Jira ticket above.
The affinity router's control plane for voltha now correctly
detects dynamic state changes: when an rw core pod restarts with
a new IP address, the monitor reconciles the pod's device ids
with those of its active-active peer and pushes the updated
connection information to the router.
Change-Id: I302ea65eb4f3618ae3cbcca7cd813d0b6cf4de50
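
The recovery logic relies on the rw cores running as active-active
pairs: coreGroups holds two pods per group, and a pod's peer sits at
the neighbouring slot, selected by flipping the low bit of the index
(the k2^1 expression in applyAddrDiffs). A minimal sketch of that
pairing, using illustrative names rather than the real arouterd types:

    // Sketch only: demonstrates the k2^1 peer-slot trick used in
    // applyAddrDiffs. The pod type and names below are illustrative.
    package main

    import "fmt"

    type pod struct{ name string }

    func main() {
            // Two cores per group, nPods>>1 groups in total.
            coreGroups := [][]*pod{
                    {{name: "rw-core-0"}, {name: "rw-core-1"}},
                    {{name: "rw-core-2"}, {name: "rw-core-3"}},
            }
            for _, grp := range coreGroups {
                    for k2, p := range grp {
                            // k2^1 flips 0<->1, selecting the active-active peer.
                            fmt.Printf("%s <-> %s\n", p.name, grp[k2^1].name)
                    }
            }
    }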
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index 06f4628..fe9a352 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -37,6 +37,7 @@
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
empty "github.com/golang/protobuf/ptypes/empty"
vpb "github.com/opencord/voltha-go/protos/voltha"
+ cmn "github.com/opencord/voltha-go/protos/common"
pb "github.com/opencord/voltha-go/protos/afrouter"
ic "github.com/opencord/voltha-go/protos/inter_container"
)
@@ -68,6 +69,8 @@
dn bool
}
+var nPods int = 6 // Total number of rw core pods; hard coded for now, yielding nPods>>1 two-pod core groups
+
// Topic is affinityRouter
// port: 9092
@@ -112,7 +115,7 @@
func connect(addr string) (*grpc.ClientConn, error) {
for ctr :=0 ; ctr < 100; ctr++ {
- log.Debug("Trying to connect to %s", addr)
+ log.Debugf("Trying to connect to %s", addr)
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
log.Debugf("Attempt to connect failed, retrying %v:", err)
@@ -133,42 +136,73 @@
if err != nil {
panic(err.Error())
}
- log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))
+ //log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))
- for k,v := range pods.Items {
+ for _,v := range pods.Items {
if v.Namespace == "voltha" && coreFilter.MatchString(v.Name) {
- fmt.Printf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
+ log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
v.Status.PodIP, v.Spec.NodeName)
- //fmt.Printf("Pod %v,%v\n\n\n",k,v)
- _ = k
- // Add this pod to the core structure.
- rtrn = append(rtrn, &rwPod{name:v.Name,ipAddr:v.Status.PodIP,node:v.Spec.NodeName,
- devIds:make(map[string]struct{}), backend:"", connection:""})
+ // Only add the pod if it has an IP address. If it doesn't, it likely
+ // crashed and is still in the process of being restarted.
+ if v.Status.PodIP != "" {
+ rtrn = append(rtrn, &rwPod{name:v.Name,ipAddr:v.Status.PodIP,node:v.Spec.NodeName,
+ devIds:make(map[string]struct{}), backend:"", connection:""})
+ }
}
}
return rtrn
}
-func queryDevices(pods []*rwPod) {
- for pk,pv := range pods {
- // Open a connection to the pod
- // port 50057
- conn, err := connect(pv.ipAddr+":50057")
- if (err != nil) {
- log.Debugf("Could not query devices from %s, could not connect", pv.name)
- continue
+func reconcilePodDeviceIds(pod *rwPod, ids map[string]struct{}) bool {
+ var idList cmn.IDs
+ for k := range ids {
+ idList.Items = append(idList.Items, &cmn.ID{Id:k})
+ }
+ conn,err := connect(pod.ipAddr+":50057")
+ if err != nil {
+ log.Debugf("Could not reconcile devices with %s, could not connect", pod.name)
+ return false
+ }
+ defer conn.Close()
+ client := vpb.NewVolthaServiceClient(conn)
+ _,err = client.ReconcileDevices(context.Background(), &idList)
+ if err != nil {
+ log.Error(err)
+ return false
+ }
+
+ return true
+}
+
+func queryPodDeviceIds(pod *rwPod) map[string]struct{} {
+ var rtrn map[string]struct{} = make(map[string]struct{})
+ // Open a connection to the pod
+ // port 50057
+ conn, err := connect(pod.ipAddr+":50057")
+ if err != nil {
+ log.Debugf("Could not query devices from %s, could not connect", pod.name)
+ return rtrn
+ }
+ defer conn.Close()
+ client := vpb.NewVolthaServiceClient(conn)
+ devs,err := client.ListDeviceIds(context.Background(), &empty.Empty{})
+ if err != nil {
+ log.Error(err)
+ return rtrn
+ }
+ for _,dv := range devs.Items {
+ rtrn[dv.Id]=struct{}{}
+ }
+
+ return rtrn
+}
+
+func queryDeviceIds(pods []*rwPod) {
+ for pk := range pods {
+ // Keep the old Id list if a new list is not returned
+ if idList := queryPodDeviceIds(pods[pk]); len(idList) != 0 {
+ pods[pk].devIds = idList
}
- client := vpb.NewVolthaServiceClient(conn)
- devs,err := client.ListDevices(context.Background(), &empty.Empty{})
- if err != nil {
- log.Error(err)
- conn.Close()
- continue
- }
- for _,dv := range devs.Items {
- pods[pk].devIds[dv.Id]=struct{}{}
- }
- conn.Close()
}
}
@@ -723,6 +757,119 @@
return nil
}
+// TODO: this is a stub; it returns nil rather than a copy of coreGroups.
+func deepCopyCoreGroups(coreGroups [][]*rwPod) ([][]*rwPod) {
+ var rtrn [][]*rwPod
+ return rtrn
+}
+
+// Determine which entries in coreGroups have changed
+// based on the pod list provided. Returns a coreGroups
+// structure containing only the changed entries, along
+// with a list of the new pods.
+func getAddrDiffs(coreGroups [][]*rwPod, rwPods []*rwPod) ([][]*rwPod, []*rwPod) {
+ var nList []*rwPod
+ var rtrn [][]*rwPod = make([][]*rwPod, nPods>>1) // one entry per two-pod core group
+ var ipAddrs map[string]struct{} = make(map[string]struct{})
+
+ log.Debug("Get addr diffs")
+
+ // Start with an empty array
+ for k := range rtrn {
+ rtrn[k] = make([]*rwPod, 2)
+ }
+
+ // Build a list with only the new items
+ for _,v := range(rwPods) {
+ if !hasIpAddr(coreGroups, v.ipAddr) {
+ nList = append(nList, v)
+ }
+ ipAddrs[v.ipAddr] = struct{}{} // for the search below
+ }
+
+ // Now build the coreGroups with only the changed items
+ for k1,v1 := range(coreGroups) {
+ for k2,v2 := range(v1) {
+ if _,ok := ipAddrs[v2.ipAddr]; !ok {
+ rtrn[k1][k2] = v2
+ }
+ }
+ }
+ return rtrn, nList
+}
+
+// Figure out where best to put the new pods
+// in the coreGroup array based on the old
+// pods being replaced. The criterion is that
+// a new pod must be on the same server as
+// the old pod it replaces.
+func reconcileAddrDiffs(coreGroupDiffs [][]*rwPod, rwPodDiffs []*rwPod) ([][]*rwPod) {
+ var srvrs map[string][]*rwPod = make(map[string][]*rwPod)
+
+ log.Debug("Reconciling diffs")
+ log.Debug("Building server list")
+ for _,v := range(rwPodDiffs) {
+ log.Debugf("Adding %v to the server list", *v)
+ srvrs[v.node] = append(srvrs[v.node], v)
+ }
+
+ for k1,v1 := range(coreGroupDiffs) {
+ log.Debugf("k1:%v, v1:%v", k1,v1)
+ for k2,v2 := range(v1) {
+ log.Debugf("k2:%v, v2:%v", k2,v2)
+ if v2 == nil { // Nothing to do here
+ continue
+ }
+ if _,ok := srvrs[v2.node]; ok {
+ coreGroupDiffs[k1][k2] = srvrs[v2.node][0]
+ if len(srvrs[v2.node]) > 1 { // remove one entry from the list
+ srvrs[v2.node] = append(srvrs[v2.node][:0], srvrs[v2.node][1:]...)
+ } else { // Delete the entry from the map
+ delete(srvrs, v2.node)
+ }
+ } else {
+ log.Error("This should never happen, node appears to have changed names")
+ // attempt to limp along by keeping this old entry
+ }
+ }
+ }
+
+ return coreGroupDiffs
+}
+
+func applyAddrDiffs(client pb.ConfigurationClient, coreGroups [][]*rwPod, rwPods []*rwPod) {
+ var newEntries [][]*rwPod
+
+ log.Debug("Applying diffs")
+ newEntries = reconcileAddrDiffs(getAddrDiffs(coreGroups, rwPods))
+
+ // Now replace the information in coreGroups with the new
+ // entries and then reconcile the device ids on the core
+ // that's in the new entry with the device ids of its
+ // active-active peer.
+ for k1,v1 := range(coreGroups) {
+ for k2,v2 := range(v1) {
+ if newEntries[k1][k2] != nil {
+ // TODO: Missing is the case where both the primary
+ // and the secondary core crash and come back.
+ // Pull the device ids from the active-active peer
+ ids := queryPodDeviceIds(coreGroups[k1][k2^1])
+ if len(ids) != 0 {
+ if !reconcilePodDeviceIds(newEntries[k1][k2], ids) {
+ log.Errorf("Attempt to reconcile ids on pod %v failed", newEntries[k1][k2])
+ }
+ }
+ // Send the affinity router the new connection information
+ setConnection(client, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, 50057)
+ // Copy the new entry information over
+ coreGroups[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
+ coreGroups[k1][k2].name = newEntries[k1][k2].name
+ coreGroups[k1][k2].devIds = ids
+ }
+ }
+ }
+}
+
func startCoreMonitor(client pb.ConfigurationClient,
clientset *kubernetes.Clientset,
coreFltr *regexp.Regexp,
@@ -746,6 +893,7 @@
time.Sleep(10 * time.Second) // Wait a while
// Get the rw core list from k8s
rwPods := getRwPods(clientset, coreFltr)
+ queryDeviceIds(rwPods)
// If we didn't get 2n+1 pods then wait since
// something is down and will hopefully come
// back up at some point.
@@ -756,16 +904,27 @@
// We have all pods, check if any IP addresses
// have changed.
for _,v := range rwPods {
- //if hasIpAddr(coreGroups, v.ipAddr) == false {
- //log.Debug("Address has changed...")
- //applyAddrDiffs(coreGroups, rwPods)
- //}
- _ = v
+ if !hasIpAddr(coreGroups, v.ipAddr) {
+ log.Debug("Address has changed...")
+ applyAddrDiffs(client, coreGroups, rwPods)
+ }
+
}
}
-
}
+func hasIpAddr(coreGroups [][]*rwPod, ipAddr string) bool {
+ for _,v1 := range(coreGroups) {
+ for _,v2 := range(v1) {
+ if v2.ipAddr == ipAddr {
+ return true
+ }
+ }
+ }
+ return false
+}
+
+
func main() {
// This is currently hard coded to a cluster with 3 servers
//var connections map[string]configConn = make(map[string]configConn)
@@ -798,7 +957,7 @@
rwPods := getRwPods(clientset, coreFltr)
// Fetch the devices held by each running core
- queryDevices(rwPods)
+ queryDeviceIds(rwPods)
// For debugging... comment out l8r
for _,v := range rwPods {
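
For reference, deepCopyCoreGroups is left as a stub in this change. A
sketch of one possible implementation, assuming the rwPod fields shown
in getRwPods and that the devIds map must be cloned so the copy can be
mutated independently:

    // Sketch only, not part of this change. rwPod is redeclared with
    // the fields used in arouterd so the example stands alone.
    package main

    type rwPod struct {
            name, ipAddr, node  string
            backend, connection string
            devIds              map[string]struct{}
    }

    func deepCopyCoreGroups(coreGroups [][]*rwPod) [][]*rwPod {
            rtrn := make([][]*rwPod, len(coreGroups))
            for k1, grp := range coreGroups {
                    rtrn[k1] = make([]*rwPod, len(grp))
                    for k2, p := range grp {
                            cp := *p // copies the string fields
                            // Maps are reference types; clone devIds explicitly.
                            cp.devIds = make(map[string]struct{}, len(p.devIds))
                            for id := range p.devIds {
                                    cp.devIds[id] = struct{}{}
                            }
                            rtrn[k1][k2] = &cp
                    }
            }
            return rtrn
    }

    func main() {}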