VOL-1775 VOL-1779 VOL-1780 : Fix several issues with overall stability
- Applied fixes for the data races reported by the Go race detector
- Added a Version attribute to the KVPair and Event objects
- Added a context object to the db/model API
- Carried timestamp info through the context to inform the
decisions made when applying a revision change
- Replaced the proxy access control mechanism with an etcd reservation mechanism
Change-Id: If3d142a73b1da0d64fa6a819530f297dbfada2d3
diff --git a/db/kvstore/client.go b/db/kvstore/client.go
index 34ab711..937eefe 100644
--- a/db/kvstore/client.go
+++ b/db/kvstore/client.go
@@ -38,6 +38,7 @@
type KVPair struct {
Key string
Value interface{}
+ Version int64
Session string
Lease int64
}
@@ -47,12 +48,13 @@
}
// NewKVPair creates a new KVPair object
-func NewKVPair(key string, value interface{}, session string, lease int64) *KVPair {
+func NewKVPair(key string, value interface{}, session string, lease int64, version int64) *KVPair {
kv := new(KVPair)
kv.Key = key
kv.Value = value
kv.Session = session
kv.Lease = lease
+ kv.Version = version
return kv
}
@@ -61,14 +63,16 @@
EventType int
Key interface{}
Value interface{}
+ Version int64
}
// NewEvent creates a new Event object
-func NewEvent(eventType int, key interface{}, value interface{}) *Event {
+func NewEvent(eventType int, key interface{}, value interface{}, version int64) *Event {
evnt := new(Event)
evnt.EventType = eventType
evnt.Key = key
evnt.Value = value
+ evnt.Version = version
return evnt
}
diff --git a/db/kvstore/consulclient.go b/db/kvstore/consulclient.go
index 2d02342..4b25b5f 100644
--- a/db/kvstore/consulclient.go
+++ b/db/kvstore/consulclient.go
@@ -79,7 +79,7 @@
}
m := make(map[string]*KVPair)
for _, kvp := range kvps {
- m[string(kvp.Key)] = NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0)
+ m[string(kvp.Key)] = NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0, -1)
}
return m, nil
}
@@ -100,7 +100,7 @@
return nil, err
}
if kvp != nil {
- return NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0), nil
+ return NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0, -1), nil
}
return nil, nil
@@ -455,7 +455,7 @@
default:
if err != nil {
log.Warnw("error-from-watch", log.Fields{"error": err})
- ch <- NewEvent(CONNECTIONDOWN, key, []byte(""))
+ ch <- NewEvent(CONNECTIONDOWN, key, []byte(""), -1)
} else {
log.Debugw("index-state", log.Fields{"lastindex": lastIndex, "newindex": meta.LastIndex, "key": key})
}
@@ -469,12 +469,12 @@
} else {
log.Debugw("update-received", log.Fields{"pair": pair})
if pair == nil {
- ch <- NewEvent(DELETE, key, []byte(""))
+ ch <- NewEvent(DELETE, key, []byte(""), -1)
} else if !c.isKVEqual(pair, previousKVPair) {
// Push the change onto the channel if the data has changed
// For now just assume it's a PUT change
log.Debugw("pair-details", log.Fields{"session": pair.Session, "key": pair.Key, "value": pair.Value})
- ch <- NewEvent(PUT, pair.Key, pair.Value)
+ ch <- NewEvent(PUT, pair.Key, pair.Value, -1)
}
previousKVPair = pair
lastIndex = meta.LastIndex
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 6935296..4f6f90b 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -74,7 +74,7 @@
}
m := make(map[string]*KVPair)
for _, ev := range resp.Kvs {
- m[string(ev.Key)] = NewKVPair(string(ev.Key), ev.Value, "", ev.Lease)
+ m[string(ev.Key)] = NewKVPair(string(ev.Key), ev.Value, "", ev.Lease, ev.Version)
}
return m, nil
}
@@ -94,7 +94,7 @@
}
for _, ev := range resp.Kvs {
// Only one value is returned
- return NewKVPair(string(ev.Key), ev.Value, "", ev.Lease), nil
+ return NewKVPair(string(ev.Key), ev.Value, "", ev.Lease, ev.Version), nil
}
return nil, nil
}
@@ -399,7 +399,7 @@
for resp := range channel {
for _, ev := range resp.Events {
//log.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
- ch <- NewEvent(getEventType(ev), ev.Kv.Key, ev.Kv.Value)
+ ch <- NewEvent(getEventType(ev), ev.Kv.Key, ev.Kv.Value, ev.Kv.Version)
}
}
log.Debug("stop-listening-on-channel ...")