Afrouterd contextualization.
- Removed the manual connect retry loop in favour of gRPC's built-in backoff.
- Added a context that is passed to all subprocesses and canceled when the connection to the afrouter is lost.
- On afrouter re-connect, everything is stopped and the system starts over from a clean slate (see the lifecycle sketch below).
Resolves VOL-1681.
Possibly resolves VOL-1655 & VOL-1661; please retest.
Change-Id: I92e16ac02b2ba209570d25ac407515d2df1c7b22
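
A minimal sketch of the new lifecycle, with a hypothetical worker() standing in for the discovery and core monitors (illustrative only, not code from this patch): gRPC's own backoff does the redialing, a state watcher cancels the shared context when the connection degrades, and main loops back around to a clean slate.

```go
package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)

// worker stands in for the discovery/core monitors: everything derives
// from ctx and returns once it is canceled.
func worker(ctx context.Context) {
	<-ctx.Done()
}

func main() {
	for {
		// Blocking dial; grpc retries internally with capped backoff.
		conn, err := grpc.DialContext(context.Background(), "localhost:55554",
			grpc.WithInsecure(),
			grpc.WithBlock(),
			grpc.WithBackoffMaxDelay(5*time.Second))
		if err != nil {
			log.Fatal(err)
		}

		// Cancel ctx as soon as the connection leaves the healthy states.
		ctx, disconnected := context.WithCancel(context.Background())
		go func() {
			for s := conn.GetState(); s != connectivity.TransientFailure && s != connectivity.Shutdown; s = conn.GetState() {
				if !conn.WaitForStateChange(context.Background(), s) {
					break
				}
			}
			disconnected()
		}()

		worker(ctx) // blocks until the connection is lost
		conn.Close()
		// loop around: reconnect and rebuild all state from scratch
	}
}
```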
diff --git a/Gopkg.lock b/Gopkg.lock
index eaa2bb4..0749a6c 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -810,6 +810,7 @@
"google.golang.org/grpc/connectivity",
"google.golang.org/grpc/credentials",
"google.golang.org/grpc/grpclog",
+ "google.golang.org/grpc/keepalive",
"google.golang.org/grpc/metadata",
"google.golang.org/grpc/status",
"gopkg.in/Shopify/sarama.v1",
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index 9f1215c..bfd7ba9 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -20,6 +20,8 @@
"errors"
"flag"
"fmt"
+ "google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/keepalive"
"math"
"os"
"path"
@@ -74,6 +76,8 @@
numRWPods = getIntEnv("NUM_RW_PODS", 1, math.MaxInt32, 6)
numROPods = getIntEnv("NUM_RO_PODS", 1, math.MaxInt32, 3)
+ afrouterApiAddress = getStrEnv("AFROUTER_API_ADDRESS", "localhost:55554")
+
afrouterRouterName = getStrEnv("AFROUTER_ROUTER_NAME", "vcore")
afrouterRWClusterName = getStrEnv("AFROUTER_RW_CLUSTER_NAME", "vcore")
afrouterROClusterName = getStrEnv("AFROUTER_RO_CLUSTER_NAME", "ro_vcore")
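
The afrouter address is no longer hard-coded to localhost:55554; it now comes from AFROUTER_API_ADDRESS. getStrEnv already exists in this file alongside getIntEnv; its assumed contract, sketched for reference:

```go
// Assumed shape of the existing getStrEnv helper: prefer the environment
// value when the variable is set, otherwise fall back to the default.
func getStrEnv(key, defaultValue string) string {
	if val, ok := os.LookupEnv(key); ok {
		return val
	}
	return defaultValue
}
```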
@@ -152,34 +156,27 @@
return clientset
}
-func connect(addr string) (*grpc.ClientConn, error) {
- for ctr := 0; ctr < 100; ctr++ {
- startTime := time.Now()
- log.Debugf("Trying to connect to %s", addr)
- ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
- conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithBlock())
- if err != nil {
- log.Debugf("Attempt to connect failed, retrying. (%v)", err)
- } else {
- log.Debugf("Connection succeeded")
- return conn, err
- }
- // 5s between attempts, whether or not if the connection fails immediately
- time.Sleep(startTime.Add(time.Second * 5).Sub(time.Now()))
+func connect(ctx context.Context, addr string) (*grpc.ClientConn, error) {
+ log.Debugf("Trying to connect to %s", addr)
+ conn, err := grpc.DialContext(ctx, addr,
+ grpc.WithInsecure(),
+ grpc.WithBlock(),
+ grpc.WithBackoffMaxDelay(time.Second*5),
+ grpc.WithKeepaliveParams(keepalive.ClientParameters{Time: time.Second * 10, Timeout: time.Second * 5}))
+ if err == nil {
+ log.Debugf("Connection succeeded")
}
- log.Debugf("Too many connection attempts, giving up!")
- return nil, errors.New("Timeout attempting to conect")
+ return conn, err
}
-func getVolthaPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) []*volthaPod {
- var rtrn []*volthaPod
-
+func getVolthaPods(cs *kubernetes.Clientset, coreFilter *regexp.Regexp) ([]*volthaPod, error) {
pods, err := cs.CoreV1().Pods(podNamespace).List(metav1.ListOptions{})
if err != nil {
- panic(err)
+ return nil, err
}
//log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))
+ var rtrn []*volthaPod
for _, v := range pods.Items {
if coreFilter.MatchString(v.Name) {
log.Debugf("Namespace: %s, PodName: %s, PodIP: %s, Host: %s\n", v.Namespace, v.Name,
@@ -192,16 +189,17 @@
}
}
}
- return rtrn
+ return rtrn, nil
}
func reconcilePodDeviceIds(pod *volthaPod, ids map[string]struct{}) bool {
- conn, err := connect(fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
- defer conn.Close()
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+ defer cancel()
+ conn, err := connect(ctx, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
if err != nil {
log.Debugf("Could not query devices from %s, could not connect", pod.name)
return false
}
+ defer conn.Close()
var idList cmn.IDs
for k := range ids {
@@ -221,7 +219,8 @@
func queryPodDeviceIds(pod *volthaPod) map[string]struct{} {
var rtrn = make(map[string]struct{})
// Open a connection to the pod
- conn, err := connect(fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+ defer cancel()
+ conn, err := connect(ctx, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
if err != nil {
log.Debugf("Could not query devices from %s, could not connect", pod.name)
return rtrn
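
Both pod queries now follow the same per-call pattern: a 5s context bounds the dial and any RPCs made on the resulting connection, and the deferred cancel releases the timeout's timer on every return path. Sketched as a hypothetical helper (withPodConn is not in this patch; volthaPod, podGrpcPort, and connect are from the file above):

```go
// withPodConn shows the idiom: one short-lived context bounds the dial and
// every RPC, and cancel is deferred so the timer is freed on all paths.
func withPodConn(pod *volthaPod, f func(ctx context.Context, conn *grpc.ClientConn) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	conn, err := connect(ctx, fmt.Sprintf("%s:%d", pod.ipAddr, podGrpcPort))
	if err != nil {
		return err
	}
	defer conn.Close()
	return f(ctx, conn) // RPCs issued here inherit the same 5s deadline
}
```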
@@ -482,26 +481,26 @@
return false
}
-func setConnection(client pb.ConfigurationClient, cluster string, backend string, connection string, addr string, port uint64) {
+func setConnection(ctx context.Context, client pb.ConfigurationClient, cluster string, backend string, connection string, addr string, port uint64) {
log.Debugf("Configuring backend %s : connection %s in cluster %s\n\n",
backend, connection, cluster)
cnf := &pb.Conn{Server: "grpc_command", Cluster: cluster, Backend: backend,
Connection: connection, Addr: addr,
Port: port}
- if res, err := client.SetConnection(context.Background(), cnf); err != nil {
+ if res, err := client.SetConnection(ctx, cnf); err != nil {
log.Debugf("failed SetConnection RPC call: %s", err)
} else {
log.Debugf("Result: %v", res)
}
}
-func setAffinity(client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
+func setAffinity(ctx context.Context, client pb.ConfigurationClient, ids map[string]struct{}, backend string) {
log.Debugf("Configuring backend %s : affinities \n", backend)
aff := &pb.Affinity{Router: afrouterRouterName, Route: "dev_manager", Cluster: afrouterRWClusterName, Backend: backend}
for k := range ids {
log.Debugf("Setting affinity for id %s", k)
aff.Id = k
- if res, err := client.SetAffinity(context.Background(), aff); err != nil {
+ if res, err := client.SetAffinity(ctx, aff); err != nil {
log.Debugf("failed affinity RPC call: %s", err)
} else {
log.Debugf("Result: %v", res)
@@ -521,12 +520,17 @@
return ""
}
-func monitorDiscovery(client pb.ConfigurationClient,
+func monitorDiscovery(ctx context.Context,
+ client pb.ConfigurationClient,
ch <-chan *ic.InterContainerMessage,
- coreGroups [][]*volthaPod) {
+ coreGroups [][]*volthaPod,
+ doneCh chan<- struct{}) {
+ defer close(doneCh)
+
var id = make(map[string]struct{})
select {
+ case <-ctx.Done():
case msg := <-ch:
log.Debugf("Received a device discovery notification")
device := &ic.DeviceDiscovered{}
@@ -536,7 +540,7 @@
// Set the affinity of the discovered device.
if be := getBackendForCore(device.Id, coreGroups); be != "" {
id[device.Id] = struct{}{}
- setAffinity(client, id, be)
+ setAffinity(ctx, client, id, be)
} else {
log.Error("Cant use an empty string as a backend name")
}
@@ -545,20 +549,25 @@
}
}
-func startDiscoveryMonitor(client pb.ConfigurationClient,
- coreGroups [][]*volthaPod) error {
+func startDiscoveryMonitor(ctx context.Context,
+ client pb.ConfigurationClient,
+ coreGroups [][]*volthaPod) (<-chan struct{}, error) {
+ doneCh := make(chan struct{})
var ch <-chan *ic.InterContainerMessage
// Connect to kafka for discovery events
topic := &kafka.Topic{Name: kafkaTopic}
kc, err := newKafkaClient(kafkaClientType, kafkaHost, kafkaPort, kafkaInstanceID)
kc.Start()
+ defer kc.Stop()
if ch, err = kc.Subscribe(topic); err != nil {
log.Errorf("Could not subscribe to the '%s' channel, discovery disabled", kafkaTopic)
- return err
+ close(doneCh)
+ return doneCh, err
}
- go monitorDiscovery(client, ch, coreGroups)
- return nil
+
+ go monitorDiscovery(ctx, client, ch, coreGroups, doneCh)
+ return doneCh, nil
}
// Determines which items in core groups
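
startDiscoveryMonitor now returns a completion channel: monitorDiscovery closes it via defer when it exits, and the subscription-failure path closes it immediately before returning, so the caller's <-doneCh always observes termination. The handshake in isolation, as a generic sketch (not patch code):

```go
// startWorker sketches the done-channel handshake: the goroutine closes
// doneCh via defer on every return path, so a caller can block on <-doneCh
// to know the goroutine is fully gone before tearing down shared state.
func startWorker(ctx context.Context) <-chan struct{} {
	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)
		for {
			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Second):
				// periodic work goes here
			}
		}
	}()
	return doneCh
}
```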
@@ -635,7 +644,7 @@
return coreGroupDiffs
}
-func applyAddrDiffs(client pb.ConfigurationClient, coreList interface{}, nPods []*volthaPod) {
+func applyAddrDiffs(ctx context.Context, client pb.ConfigurationClient, coreList interface{}, nPods []*volthaPod) {
var newEntries [][]*volthaPod
log.Debug("Applying diffs")
@@ -660,7 +669,7 @@
}
}
// Send the affinity router new connection information
- setConnection(client, afrouterRWClusterName, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, podGrpcPort)
+ setConnection(ctx, client, afrouterRWClusterName, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, podGrpcPort)
// Copy the new entry information over
cores[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
cores[k1][k2].name = newEntries[k1][k2].name
@@ -700,7 +709,7 @@
}
mia[0].ipAddr = v1.ipAddr
mia[0].name = v1.name
- setConnection(client, afrouterROClusterName, mia[0].backend, mia[0].connection, v1.ipAddr, podGrpcPort)
+ setConnection(ctx, client, afrouterROClusterName, mia[0].backend, mia[0].connection, v1.ipAddr, podGrpcPort)
// Now get rid of the mia entry just processed
mia = append(mia[:0], mia[1:]...)
}
@@ -724,12 +733,13 @@
}
}
-func startCoreMonitor(client pb.ConfigurationClient,
+func startCoreMonitor(ctx context.Context,
+ client pb.ConfigurationClient,
clientset *kubernetes.Clientset,
rwCoreFltr *regexp.Regexp,
roCoreFltr *regexp.Regexp,
coreGroups [][]*volthaPod,
- oRoPods []*volthaPod) error {
+ oRoPods []*volthaPod) {
// Now that initial allocation has been completed, monitor the pods
// for IP changes
// The main loop needs to do the following:
@@ -745,10 +755,22 @@
// If an IP address has changed (which shouldn't
// happen unless a pod is re-started) it should get
// caught by the pod name change.
+loop:
for {
- time.Sleep(10 * time.Second) // Wait a while
+ select {
+ case <-ctx.Done():
+ // if we're done, exit
+ break loop
+ case <-time.After(10 * time.Second): // wait a while
+ }
+
// Get the rw core list from k8s
- rwPods := getVolthaPods(clientset, rwCoreFltr)
+ rwPods, err := getVolthaPods(clientset, rwCoreFltr)
+ if err != nil {
+ log.Error(err)
+ continue
+ }
+
queryDeviceIds(rwPods)
updateDeviceIds(coreGroups, rwPods)
// If we didn't get 2n+1 pods then wait since
@@ -762,23 +784,26 @@
for _, v := range rwPods {
if !hasIpAddr(coreGroups, v.ipAddr) {
log.Debug("Address has changed...")
- applyAddrDiffs(client, coreGroups, rwPods)
+ applyAddrDiffs(ctx, client, coreGroups, rwPods)
break
}
}
- roPods := getVolthaPods(clientset, roCoreFltr)
+ roPods, err := getVolthaPods(clientset, roCoreFltr)
+ if err != nil {
+ log.Error(err)
+ continue
+ }
if len(roPods) != numROPods {
continue
}
for _, v := range roPods {
if !hasIpAddr(oRoPods, v.ipAddr) {
- applyAddrDiffs(client, oRoPods, roPods)
+ applyAddrDiffs(ctx, client, oRoPods, roPods)
break
}
}
-
}
}
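
Two behavioural changes in the monitor loop: the fixed time.Sleep becomes a select that also watches ctx.Done(), so the loop exits within one poll interval of a disconnect, and getVolthaPods errors are now logged and retried rather than panicking the daemon. The wait pattern reduced to its essentials (a sketch, not patch code):

```go
// poll runs step every interval until ctx is canceled; cancellation is
// observed even mid-wait, so shutdown does not stall for a full interval.
func poll(ctx context.Context, every time.Duration, step func()) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(every):
			step()
		}
	}
}
```

time.After arms a fresh timer each pass; a time.Ticker would avoid that, but at a 10-second period the difference is negligible.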
@@ -804,19 +829,27 @@
return false
}
-func main() {
- // This is currently hard coded to a cluster with 3 servers
- //var connections map[string]configConn = make(map[string]configConn)
- //var rwCorePodsPrev map[string]rwPod = make(map[string]rwPod)
- var err error
- var conn *grpc.ClientConn
+// connectionActiveContext returns a context that is canceled once the connection to the afrouter is lost
+func connectionActiveContext(conn *grpc.ClientConn) context.Context {
+ ctx, disconnected := context.WithCancel(context.Background())
+ go func() {
+ for state := conn.GetState(); state != connectivity.TransientFailure && state != connectivity.Shutdown; state = conn.GetState() {
+ if !conn.WaitForStateChange(context.Background(), state) {
+ break
+ }
+ }
+ log.Infof("Connection to afrouter lost")
+ disconnected()
+ }()
+ return ctx
+}
+func main() {
config := &Configuration{}
cmdParse := flag.NewFlagSet(path.Base(os.Args[0]), flag.ContinueOnError)
config.DisplayVersionOnly = cmdParse.Bool("version", false, "Print version information and exit")
- err = cmdParse.Parse(os.Args[1:])
- if err != nil {
+ if err := cmdParse.Parse(os.Args[1:]); err != nil {
fmt.Printf("Error: %v\n", err)
os.Exit(1)
}
@@ -827,10 +860,6 @@
return
}
- // Set up the regular expression to identify the voltha cores
- rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
- roCoreFltr := regexp.MustCompile(`ro-core-`)
-
// Set up logging
if _, err := log.SetDefaultLogger(log.JSON, 0, nil); err != nil {
log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
@@ -839,16 +868,38 @@
// Set up kubernetes api
clientset := k8sClientSet()
- // Connect to the affinity router and set up the client
- conn, err = connect("localhost:55554") // This is a sidecar container so communicating over localhost
+ for {
+ // Connect to the affinity router
+ conn, err := connect(context.Background(), afrouterApiAddress) // afrouter normally runs as a sidecar, so the default address is localhost
+ if err != nil {
+ panic(err)
+ }
+
+ // monitor the connection status, end context if connection is lost
+ ctx := connectionActiveContext(conn)
+
+ // set up the client
+ client := pb.NewConfigurationClient(conn)
+
+ // determine config & repopulate the afrouter
+ generateAndMaintainConfiguration(ctx, client, clientset)
+
+ conn.Close()
+ }
+}
+
+// generateAndMaintainConfiguration does the pod-reconciliation work;
+// it returns only once all of its sub-processes have completed
+func generateAndMaintainConfiguration(ctx context.Context, client pb.ConfigurationClient, clientset *kubernetes.Clientset) {
+ // Set up the regular expression to identify the voltha cores
+ rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
+ roCoreFltr := regexp.MustCompile(`ro-core-`)
+
+ // Get the voltha rw-core pods
+ rwPods, err := getVolthaPods(clientset, rwCoreFltr)
if err != nil {
panic(err)
}
- defer conn.Close()
- client := pb.NewConfigurationClient(conn)
-
- // Get the voltha rw-core podes
- rwPods := getVolthaPods(clientset, rwCoreFltr)
// Fetch the devices held by each running core
queryDeviceIds(rwPods)
@@ -879,18 +930,21 @@
log.Info("Setting affinities")
// Now set the affinities for existing devices in the cores
for _, v := range coreGroups {
- setAffinity(client, v[0].devIds, v[0].backend)
- setAffinity(client, v[1].devIds, v[1].backend)
+ setAffinity(ctx, client, v[0].devIds, v[0].backend)
+ setAffinity(ctx, client, v[1].devIds, v[1].backend)
}
log.Info("Setting connections")
// Configure the backends based on the calculated core groups
for _, v := range coreGroups {
- setConnection(client, afrouterRWClusterName, v[0].backend, v[0].connection, v[0].ipAddr, podGrpcPort)
- setConnection(client, afrouterRWClusterName, v[1].backend, v[1].connection, v[1].ipAddr, podGrpcPort)
+ setConnection(ctx, client, afrouterRWClusterName, v[0].backend, v[0].connection, v[0].ipAddr, podGrpcPort)
+ setConnection(ctx, client, afrouterRWClusterName, v[1].backend, v[1].connection, v[1].ipAddr, podGrpcPort)
}
// Process the read only pods
- roPods := getVolthaPods(clientset, roCoreFltr)
+ roPods, err := getVolthaPods(clientset, roCoreFltr)
+ if err != nil {
+ panic(err)
+ }
for k, v := range roPods {
log.Debugf("Processing ro_pod %v", v)
vN := afrouterROClusterName + strconv.Itoa(k+1)
@@ -898,14 +952,15 @@
roPods[k].cluster = afrouterROClusterName
roPods[k].backend = vN
roPods[k].connection = vN + "1"
- setConnection(client, afrouterROClusterName, v.backend, v.connection, v.ipAddr, podGrpcPort)
+ setConnection(ctx, client, afrouterROClusterName, v.backend, v.connection, v.ipAddr, podGrpcPort)
}
log.Info("Starting discovery monitoring")
- startDiscoveryMonitor(client, coreGroups)
+ doneCh, _ := startDiscoveryMonitor(ctx, client, coreGroups)
log.Info("Starting core monitoring")
- startCoreMonitor(client, clientset, rwCoreFltr,
- roCoreFltr, coreGroups, roPods) // Never returns
- return
+ startCoreMonitor(ctx, client, clientset, rwCoreFltr, roCoreFltr, coreGroups, roPods)
+
+ // wait for the discovery monitor to quit
+ <-doneCh
}