Changed algorithm to wait for all containers to be running.
- Extra wait loop on startup, to wait for rw/ro pods to come online.
- Removed crashed pods from consideration (more stringent checks so this actually works).
- Now passing context to southbound requests where it was missed before.
Change-Id: I667e17c324282e0af87e8282195eb6632c3424ab
diff --git a/Gopkg.lock b/Gopkg.lock
index 407375c..3105662 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -814,6 +814,7 @@
"google.golang.org/grpc/metadata",
"google.golang.org/grpc/status",
"gopkg.in/Shopify/sarama.v1",
+ "k8s.io/api/core/v1",
"k8s.io/apimachinery/pkg/apis/meta/v1",
"k8s.io/client-go/kubernetes",
"k8s.io/client-go/rest",
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index bfd7ba9..3516261 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -22,6 +22,7 @@
"fmt"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/keepalive"
+ "k8s.io/api/core/v1"
"math"
"os"
"path"
@@ -169,32 +170,42 @@
return conn, err
}
-func getVolthaPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) ([]*volthaPod, error) {
+func getVolthaPods(cs *kubernetes.Clientset) ([]*volthaPod, []*volthaPod, error) {
pods, err := cs.CoreV1().Pods(podNamespace).List(metav1.ListOptions{})
if err != nil {
- return nil, err
+ return nil, nil, err
}
- //log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))
- var rtrn []*volthaPod
+ // Set up the regular expression to identify the voltha cores
+ rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
+ roCoreFltr := regexp.MustCompile(`ro-core-`)
+
+ var rwPods, roPods []*volthaPod
+items:
for _, v := range pods.Items {
- if coreFilter.MatchString(v.Name) {
- log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
- v.Status.PodIP, v.Spec.NodeName)
- // Only add the pod if it has an IP address. If it doesn't then it likely crashed and
- // and is still in the process of getting re-started.
- if v.Status.PodIP != "" {
- rtrn = append(rtrn, &volthaPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName,
- devIds: make(map[string]struct{}), backend: "", connection: ""})
+ // only pods that are actually running should be considered
+ if v.Status.Phase == v1.PodRunning {
+ for _, condition := range v.Status.Conditions {
+ if condition.Status != v1.ConditionTrue {
+ continue items
+ }
+ }
+
+ if rwCoreFltr.MatchString(v.Name) {
+ log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name, v.Status.PodIP, v.Spec.NodeName)
+ rwPods = append(rwPods, &volthaPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName, devIds: make(map[string]struct{}), backend: "", connection: ""})
+ } else if roCoreFltr.MatchString(v.Name) {
+ log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name, v.Status.PodIP, v.Spec.NodeName)
+ roPods = append(roPods, &volthaPod{name: v.Name, ipAddr: v.Status.PodIP, node: v.Spec.NodeName, devIds: make(map[string]struct{}), backend: "", connection: ""})
}
}
}
- return rtrn, nil
+ return rwPods, roPods, nil
}
-func reconcilePodDeviceIds(pod *volthaPod, ids map[string]struct{}) bool {
- ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
- conn, err := connect(ctx, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
+func reconcilePodDeviceIds(ctx context.Context, pod *volthaPod, ids map[string]struct{}) bool {
+ ctxTimeout, _ := context.WithTimeout(ctx, time.Second*5)
+ conn, err := connect(ctxTimeout, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
if err != nil {
log.Debugf("Could not query devices from %s, could not connect", pod.name)
return false
@@ -207,7 +218,7 @@
}
client := vpb.NewVolthaServiceClient(conn)
- _, err = client.ReconcileDevices(context.Background(), &idList)
+ _, err = client.ReconcileDevices(ctx, &idList)
if err != nil {
log.Error(err)
return false
@@ -216,18 +227,18 @@
return true
}
-func queryPodDeviceIds(pod *volthaPod) map[string]struct{} {
+func queryPodDeviceIds(ctx context.Context, pod *volthaPod) map[string]struct{} {
var rtrn = make(map[string]struct{})
// Open a connection to the pod
- ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
- conn, err := connect(ctx, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
+ ctxTimeout, _ := context.WithTimeout(ctx, time.Second*5)
+ conn, err := connect(ctxTimeout, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
if err != nil {
log.Debugf("Could not query devices from %s, could not connect", pod.name)
return rtrn
}
defer conn.Close()
client := vpb.NewVolthaServiceClient(conn)
- devs, err := client.ListDeviceIds(context.Background(), &empty.Empty{})
+ devs, err := client.ListDeviceIds(ctx, &empty.Empty{})
if err != nil {
log.Error(err)
return rtrn
@@ -239,10 +250,10 @@
return rtrn
}
-func queryDeviceIds(pods []*volthaPod) {
+func queryDeviceIds(ctx context.Context, pods []*volthaPod) {
for pk := range pods {
// Keep the old Id list if a new list is not returned
- if idList := queryPodDeviceIds(pods[pk]); len(idList) != 0 {
+ if idList := queryPodDeviceIds(ctx, pods[pk]); len(idList) != 0 {
pods[pk].devIds = idList
}
}
@@ -645,12 +656,10 @@
}
func applyAddrDiffs(ctx context.Context, client pb.ConfigurationClient, coreList interface{}, nPods []*volthaPod) {
- var newEntries [][]*volthaPod
-
log.Debug("Applying diffs")
switch cores := coreList.(type) {
case [][]*volthaPod:
- newEntries = reconcileAddrDiffs(getAddrDiffs(cores, nPods))
+ newEntries := reconcileAddrDiffs(getAddrDiffs(cores, nPods))
// Now replace the information in coreGropus with the new
// entries and then reconcile the device ids on the core
@@ -662,9 +671,9 @@
// TODO: Missing is the case where bothe the primary
// and the secondary core crash and come back.
// Pull the device ids from the active-active peer
- ids := queryPodDeviceIds(cores[k1][k2^1])
+ ids := queryPodDeviceIds(ctx, cores[k1][k2^1])
if len(ids) != 0 {
- if !reconcilePodDeviceIds(newEntries[k1][k2], ids) {
+ if !reconcilePodDeviceIds(ctx, newEntries[k1][k2], ids) {
log.Errorf("Attempt to reconcile ids on pod %v failed", newEntries[k1][k2])
}
}
@@ -719,16 +728,17 @@
}
func updateDeviceIds(coreGroups [][]*volthaPod, rwPods []*volthaPod) {
- var byName = make(map[string]*volthaPod)
-
// Convenience
+ var byName = make(map[string]*volthaPod)
for _, v := range rwPods {
byName[v.name] = v
}
for k1, v1 := range coreGroups {
- for k2 := range v1 {
- coreGroups[k1][k2].devIds = byName[v1[k2].name].devIds
+ for k2, v2 := range v1 {
+ if pod, have := byName[v2.name]; have {
+ coreGroups[k1][k2].devIds = pod.devIds
+ }
}
}
}
@@ -736,8 +746,6 @@
func startCoreMonitor(ctx context.Context,
client pb.ConfigurationClient,
clientset *kubernetes.Clientset,
- rwCoreFltr *regexp.Regexp,
- roCoreFltr *regexp.Regexp,
coreGroups [][]*volthaPod,
oRoPods []*volthaPod) {
// Now that initial allocation has been completed, monitor the pods
@@ -765,20 +773,23 @@
}
// Get the rw core list from k8s
- rwPods, err := getVolthaPods(clientset, rwCoreFltr)
+ rwPods, roPods, err := getVolthaPods(clientset)
if err != nil {
log.Error(err)
continue
}
- queryDeviceIds(rwPods)
- updateDeviceIds(coreGroups, rwPods)
// If we didn't get 2n+1 pods then wait since
// something is down and will hopefully come
// back up at some point.
if len(rwPods) != numRWPods {
+ log.Debug("One or more RW pod(s) are offline, will wait and retry")
continue
}
+
+ queryDeviceIds(ctx, rwPods)
+ updateDeviceIds(coreGroups, rwPods)
+
// We have all pods, check if any IP addresses
// have changed.
for _, v := range rwPods {
@@ -789,13 +800,8 @@
}
}
- roPods, err := getVolthaPods(clientset, roCoreFltr)
- if err != nil {
- log.Error(err)
- continue
- }
-
if len(roPods) != numROPods {
+ log.Debug("One or more RO pod(s) are offline, will wait and retry")
continue
}
for _, v := range roPods {
@@ -891,18 +897,30 @@
// generateAndMaintainConfiguration does the pod-reconciliation work,
// it only returns once all sub-processes have completed
func generateAndMaintainConfiguration(ctx context.Context, client pb.ConfigurationClient, clientset *kubernetes.Clientset) {
- // Set up the regular expression to identify the voltha cores
- rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
- roCoreFltr := regexp.MustCompile(`ro-core-`)
+ // Get the voltha rw-/ro-core pods
+ var rwPods, roPods []*volthaPod
+ for {
+ var err error
+ if rwPods, roPods, err = getVolthaPods(clientset); err != nil {
+ log.Error(err)
+ return
+ }
- // Get the voltha rw-core podes
- rwPods, err := getVolthaPods(clientset, rwCoreFltr)
- if err != nil {
- panic(err)
+ if len(rwPods) == numRWPods && len(roPods) == numROPods {
+ break
+ }
+
+ log.Debug("One or more RW/RO pod(s) are offline, will wait and retry")
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(time.Second * 5):
+ // retry
+ }
}
// Fetch the devices held by each running core
- queryDeviceIds(rwPods)
+ queryDeviceIds(ctx, rwPods)
// For debugging... comment out l8r
for _, v := range rwPods {
@@ -912,8 +930,8 @@
coreGroups := groupPods1(rwPods)
// Assign the groupings to the the backends and connections
- for k := range coreGroups {
- for k1 := range coreGroups[k] {
+ for k, coresInGroup := range coreGroups {
+ for k1 := range coresInGroup {
coreGroups[k][k1].cluster = afrouterRWClusterName
coreGroups[k][k1].backend = afrouterRWClusterName + strconv.Itoa(k+1)
coreGroups[k][k1].connection = afrouterRWClusterName + strconv.Itoa(k+1) + strconv.Itoa(k1+1)
@@ -941,10 +959,6 @@
}
// Process the read only pods
- roPods, err := getVolthaPods(clientset, roCoreFltr)
- if err != nil {
- panic(err)
- }
for k, v := range roPods {
log.Debugf("Processing ro_pod %v", v)
vN := afrouterROClusterName + strconv.Itoa(k+1)
@@ -959,7 +973,7 @@
doneCh, _ := startDiscoveryMonitor(ctx, client, coreGroups)
log.Info("Starting core monitoring")
- startCoreMonitor(ctx, client, clientset, rwCoreFltr, roCoreFltr, coreGroups, roPods)
+ startCoreMonitor(ctx, client, clientset, coreGroups, roPods)
//ensure the discovery monitor to quit
<-doneCh