VOL-806: Envoy polls etcd for changes in core assignments
* Replaced polling loop with an etcd watch channel.
* Fixed a coordinator log message that sometimes caused
an exception.
Change-Id: I6e523b30724d2f1e953928b705fb847cb43dca4f
diff --git a/envoy/go/envoyd/envoyd.go b/envoy/go/envoyd/envoyd.go
index b1a4067..d4b8b05 100644
--- a/envoy/go/envoyd/envoyd.go
+++ b/envoy/go/envoyd/envoyd.go
@@ -526,35 +526,28 @@
func (ec * EnvoyControl) monitorEtcdKey() {
var err error
- // TODO: Check into the use of any data consistency options
- //
// Get the initial values of the assignment key which contains individual
// voltha core IP addresses. This may be empty until voltha populates it
// so it must be checked
- log.Printf("Monitoring etcd key")
- val, index, err := ec.readEtcdKey(ec.assignmentKey)
- lastIndex := index
- log.Printf("Starting Envoy, initial index = %d", lastIndex)
- ec.runEnvoy(val)
- for {
- for {
- time.Sleep(60 * time.Second)
- val, index, err = ec.readEtcdKey(ec.assignmentKey)
- if err != nil {
- log.Fatal("Unable to read assignment etcd key: %s\n", err.Error())
- } else if index != lastIndex {
- break
- } else {
- log.Println(string(val))
- log.Printf("Last index = %d", index)
- }
- }
- // Fell through, the index has changed thus the key has changed
- log.Printf("Starting Envoy")
+ log.Printf("Monitoring etcd key %s", ec.assignmentKey)
+ val, index, err := ec.readEtcdKey(ec.assignmentKey)
+ if err == nil {
+ lastIndex := index
+ log.Printf("Starting Envoy, initial index = %d", lastIndex)
ec.runEnvoy(val)
- log.Printf("Last index = %d", index)
- lastIndex = index
+ }
+
+ rch := ec.etcd.Watch(context.Background(), ec.assignmentKey)
+ for resp := range rch {
+ for _, ev := range resp.Events {
+ val = ev.Kv.Value
+ log.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
+ if ev.Type == etcdapi.EventTypePut {
+ log.Printf("Starting Envoy")
+ ec.runEnvoy(val)
+ }
+ }
}
}