Improvements to afrouterd.
- Added the ability to run afrouterd outside the cluster, using an API server URL and/or a local kubeconfig file (K8S_API_SERVER, K8S_KUBE_CONFIG_PATH) instead of the in-cluster config.
- Fixed a bug where afrouterd did not wait for a connection to the afrouter before starting its main loop.
(grpc.Dial is non-blocking by default; the connection is now made with grpc.WithBlock and a 5s timeout per attempt.)
This may resolve VOL-1661 (not yet confirmed).
Change-Id: I58ba2ef52edb7f0eddcf7d7f2735f3b9d460237a
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index 47445ac..9f1215c 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -41,6 +41,7 @@
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/clientcmd"
)
type volthaPod struct {
@@ -63,6 +64,10 @@
}
var (
+ // if both K8S_* variables below are empty, the in-cluster config is used
+ k8sApiServer = getStrEnv("K8S_API_SERVER", "")
+ k8sKubeConfigPath = getStrEnv("K8S_KUBE_CONFIG_PATH", "")
+
podNamespace = getStrEnv("POD_NAMESPACE", "voltha")
podGrpcPort = uint64(getIntEnv("POD_GRPC_PORT", 0, math.MaxUint16, 50057))
@@ -121,15 +126,27 @@
}
func k8sClientSet() *kubernetes.Clientset {
- // creates the in-cluster config
- config, err := rest.InClusterConfig()
- if err != nil {
- panic(err.Error())
+ var config *rest.Config
+ if k8sApiServer != "" || k8sKubeConfigPath != "" {
+ // use combination of URL & local kube-config file
+ c, err := clientcmd.BuildConfigFromFlags(k8sApiServer, k8sKubeConfigPath)
+ if err != nil {
+ panic(err)
+ }
+ config = c
+ } else {
+ // use in-cluster config
+ c, err := rest.InClusterConfig()
+ if err != nil {
+ log.Errorf("Unable to load in-cluster config. Try setting K8S_API_SERVER and K8S_KUBE_CONFIG_PATH?")
+ panic(err)
+ }
+ config = c
}
// creates the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
- panic(err.Error())
+ panic(err)
}
return clientset
@@ -137,15 +154,18 @@
func connect(addr string) (*grpc.ClientConn, error) {
for ctr := 0; ctr < 100; ctr++ {
+ startTime := time.Now()
log.Debugf("Trying to connect to %s", addr)
- conn, err := grpc.Dial(addr, grpc.WithInsecure())
+ ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
+ conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
- log.Debugf("Attempt to connect failed, retrying %v:", err)
+ log.Debugf("Attempt to connect failed, retrying. (%v)", err)
} else {
log.Debugf("Connection succeeded")
return conn, err
}
- time.Sleep(10 * time.Second)
+ // space attempts 5s apart (measured from the start of each attempt), whether or not the connection fails immediately
+ time.Sleep(startTime.Add(time.Second * 5).Sub(time.Now()))
}
log.Debugf("Too many connection attempts, giving up!")
return nil, errors.New("Timeout attempting to conect")
@@ -156,7 +176,7 @@
pods, err := cs.CoreV1().Pods(podNamespace).List(metav1.ListOptions{})
if err != nil {
- panic(err.Error())
+ panic(err)
}
//log.Debugf("There are a total of %d pods in the cluster\n", len(pods.Items))
@@ -821,10 +841,10 @@
// Connect to the affinity router and set up the client
conn, err = connect("localhost:55554") // This is a sidecar container so communicating over localhost
- defer conn.Close()
if err != nil {
- panic(err.Error())
+ panic(err)
}
+ defer conn.Close()
client := pb.NewConfigurationClient(conn)
// Get the voltha rw-core podes