VOL-1690: Close the etcd watcher channels after we are done watching for etcd events.
Also ensure that the etcd watcher context is canceled after we are done using the etcd Watcher interface.

Change-Id: I237d7e7f2c2d05c5998d26560ff9abea653e04a2
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index f19f365..1730173 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -308,7 +308,8 @@
 // listen to receive Events.
 func (c *EtcdClient) Watch(key string) chan *Event {
 	w := v3Client.NewWatcher(c.ectdAPI)
-	channel := w.Watch(context.Background(), key, v3Client.WithPrefix())
+	ctx, cancel := context.WithCancel(context.Background())
+	channel := w.Watch(ctx, key, v3Client.WithPrefix())
 
 	// Create a new channel
 	ch := make(chan *Event, maxClientChannelBufferSize)
@@ -325,7 +326,7 @@
 	// json format.
 	log.Debugw("watched-channels", log.Fields{"len": len(channelMaps)})
 	// Launch a go routine to listen for updates
-	go c.listenForKeyChange(channel, ch)
+	go c.listenForKeyChange(channel, ch, cancel)
 
 	return ch
 
@@ -406,8 +407,9 @@
 	log.Infow("watcher-channel-exiting", log.Fields{"key": key, "channel": channelMaps})
 }
 
-func (c *EtcdClient) listenForKeyChange(channel v3Client.WatchChan, ch chan<- *Event) {
+func (c *EtcdClient) listenForKeyChange(channel v3Client.WatchChan, ch chan<- *Event, cancel context.CancelFunc) {
 	log.Debug("start-listening-on-channel ...")
+	defer cancel()
 	for resp := range channel {
 		for _, ev := range resp.Events {
 			//log.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index b2d01a4..11f3d4e 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -247,6 +247,7 @@
 	var res int
 
 	events := ctx.kvClient.Watch(c.txnKey)
+	defer ctx.kvClient.CloseWatch(c.txnKey, events)
 
 	for {
 		select {