VOL-1497 : Add more control to kv/memory access
- Added a KV locking mechanism (etcd only)
- (watch) Control path access whenever possible
- (watch) Use a transaction for updates and merge with memory
- Cleaned up vendoring
- Miscellaneous changes to fix exceptions found along the way
Amendments:
- Copyright header got removed in auto-generated file
- Changed default locking to false for the KV list operation
- Updated the backend API to allow passing of the locking parameter
Change-Id: Ie1a55d3ca8b9d92ae71a85ce42bb22fcf1419e2c
diff --git a/vendor/google.golang.org/grpc/internal/channelz/funcs.go b/vendor/google.golang.org/grpc/internal/channelz/funcs.go
index 3021a31..041520d 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/funcs.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/funcs.go
@@ -40,7 +40,7 @@
db dbWrapper
idGen idGenerator
// EntryPerPage defines the number of channelz entries to be shown on a web page.
- EntryPerPage = 50
+ EntryPerPage = int64(50)
curState int32
maxTraceEntry = defaultMaxTraceEntry
)
@@ -113,20 +113,20 @@
// boolean indicating whether there's more top channels to be queried for.
//
// The arg id specifies that only top channel with id at or above it will be included
-// in the result. The returned slice is up to a length of EntryPerPage, and is
-// sorted in ascending id order.
-func GetTopChannels(id int64) ([]*ChannelMetric, bool) {
- return db.get().GetTopChannels(id)
+// in the result. The returned slice is up to a length of the arg maxResults, or
+// EntryPerPage if maxResults is zero or negative, and is sorted in ascending id order.
+func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
+ return db.get().GetTopChannels(id, maxResults)
}
// GetServers returns a slice of server's ServerMetric, along with a
// boolean indicating whether there's more servers to be queried for.
//
// The arg id specifies that only server with id at or above it will be included
-// in the result. The returned slice is up to a length of EntryPerPage, and is
-// sorted in ascending id order.
-func GetServers(id int64) ([]*ServerMetric, bool) {
- return db.get().GetServers(id)
+// in the result. The returned slice is up to a length of the arg maxResults, or
+// EntryPerPage if maxResults is zero or negative, and is sorted in ascending id order.
+func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) {
+ return db.get().GetServers(id, maxResults)
}
// GetServerSockets returns a slice of server's (identified by id) normal socket's
@@ -134,10 +134,10 @@
// be queried for.
//
// The arg startID specifies that only sockets with id at or above it will be
-// included in the result. The returned slice is up to a length of EntryPerPage,
-// and is sorted in ascending id order.
-func GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
- return db.get().GetServerSockets(id, startID)
+// included in the result. The returned slice is up to a length of the arg maxResults,
+// or EntryPerPage if maxResults is zero or negative, and is sorted in ascending id order.
+func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
+ return db.get().GetServerSockets(id, startID, maxResults)
}
// GetChannel returns the ChannelMetric for the channel (identified by id).
@@ -155,6 +155,11 @@
return db.get().GetSocket(id)
}
+// GetServer returns the ServerMetric for the server (identified by id).
+func GetServer(id int64) *ServerMetric {
+ return db.get().GetServer(id)
+}
+
// RegisterChannel registers the given channel c in channelz database with ref
// as its reference name, and add it to the child list of its parent (identified
// by pid). pid = 0 means no parent. It returns the unique channelz tracking id
@@ -447,29 +452,32 @@
return n
}
-func min(a, b int) int {
+func min(a, b int64) int64 {
if a < b {
return a
}
return b
}
-func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) {
+func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
+ if maxResults <= 0 {
+ maxResults = EntryPerPage
+ }
c.mu.RLock()
- l := len(c.topLevelChannels)
+ l := int64(len(c.topLevelChannels))
ids := make([]int64, 0, l)
- cns := make([]*channel, 0, min(l, EntryPerPage))
+ cns := make([]*channel, 0, min(l, maxResults))
for k := range c.topLevelChannels {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
- count := 0
+ count := int64(0)
var end bool
var t []*ChannelMetric
for i, v := range ids[idx:] {
- if count == EntryPerPage {
+ if count == maxResults {
break
}
if cn, ok := c.channels[v]; ok {
@@ -499,21 +507,24 @@
return t, end
}
-func (c *channelMap) GetServers(id int64) ([]*ServerMetric, bool) {
+func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) {
+ if maxResults <= 0 {
+ maxResults = EntryPerPage
+ }
c.mu.RLock()
- l := len(c.servers)
+ l := int64(len(c.servers))
ids := make([]int64, 0, l)
- ss := make([]*server, 0, min(l, EntryPerPage))
+ ss := make([]*server, 0, min(l, maxResults))
for k := range c.servers {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
- count := 0
+ count := int64(0)
var end bool
var s []*ServerMetric
for i, v := range ids[idx:] {
- if count == EntryPerPage {
+ if count == maxResults {
break
}
if svr, ok := c.servers[v]; ok {
@@ -541,7 +552,10 @@
return s, end
}
-func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
+func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
+ if maxResults <= 0 {
+ maxResults = EntryPerPage
+ }
var svr *server
var ok bool
c.mu.RLock()
@@ -551,18 +565,18 @@
return nil, true
}
svrskts := svr.sockets
- l := len(svrskts)
+ l := int64(len(svrskts))
ids := make([]int64, 0, l)
- sks := make([]*normalSocket, 0, min(l, EntryPerPage))
+ sks := make([]*normalSocket, 0, min(l, maxResults))
for k := range svrskts {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
- count := 0
+ count := int64(0)
var end bool
for i, v := range ids[idx:] {
- if count == EntryPerPage {
+ if count == maxResults {
break
}
if ns, ok := c.normalSockets[v]; ok {
@@ -655,6 +669,23 @@
return nil
}
+func (c *channelMap) GetServer(id int64) *ServerMetric {
+ sm := &ServerMetric{}
+ var svr *server
+ var ok bool
+ c.mu.RLock()
+ if svr, ok = c.servers[id]; !ok {
+ c.mu.RUnlock()
+ return nil
+ }
+ sm.ListenSockets = copyMap(svr.listenSockets)
+ c.mu.RUnlock()
+ sm.ID = svr.id
+ sm.RefName = svr.refName
+ sm.ServerData = svr.s.ChannelzMetric()
+ return sm
+}
+
type idGenerator struct {
id int64
}
diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
index a3e02b6..d2193b3 100644
--- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
@@ -34,7 +34,7 @@
type RequireHandshakeSetting int
const (
- // RequireHandshakeHybrid (default, deprecated) indicates to wait for
+ // RequireHandshakeHybrid (default, deprecated) indicates to not wait for
// handshake before considering a connection ready, but wait before
// considering successful.
RequireHandshakeHybrid RequireHandshakeSetting = iota
@@ -59,6 +59,8 @@
func init() {
switch strings.ToLower(os.Getenv(requireHandshakeStr)) {
case "on":
+ fallthrough
+ default:
RequireHandshake = RequireHandshakeOn
case "off":
RequireHandshake = RequireHandshakeOff
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
index f8932b1..eaa54d4 100644
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ b/vendor/google.golang.org/grpc/internal/internal.go
@@ -23,14 +23,21 @@
import "context"
var (
- // WithContextDialer is exported by clientconn.go
+ // WithContextDialer is exported by dialoptions.go
WithContextDialer interface{} // func(context.Context, string) (net.Conn, error) grpc.DialOption
- // WithResolverBuilder is exported by clientconn.go
+ // WithResolverBuilder is exported by dialoptions.go
WithResolverBuilder interface{} // func (resolver.Builder) grpc.DialOption
+ // WithHealthCheckFunc is not exported by dialoptions.go
+ WithHealthCheckFunc interface{} // func (HealthChecker) DialOption
// HealthCheckFunc is used to provide client-side LB channel health checking
- HealthCheckFunc func(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), serviceName string) error
+ HealthCheckFunc HealthChecker
+ // BalancerUnregister is exported by package balancer to unregister a balancer.
+ BalancerUnregister func(name string)
)
+// HealthChecker defines the signature of the client-side LB channel health checking function.
+type HealthChecker func(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), serviceName string) error
+
const (
// CredsBundleModeFallback switches GoogleDefaultCreds to fallback mode.
CredsBundleModeFallback = "fallback"
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 39208b1..babcaee 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -91,10 +91,10 @@
maxSendHeaderListSize *uint32
bdpEst *bdpEstimator
- // onSuccess is a callback that client transport calls upon
+ // onPrefaceReceipt is a callback that client transport calls upon
// receiving server preface to signal that a succefull HTTP2
// connection was established.
- onSuccess func()
+ onPrefaceReceipt func()
maxConcurrentStreams uint32
streamQuota int64
@@ -145,7 +145,7 @@
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
-func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
+func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
scheme := "http"
ctx, cancel := context.WithCancel(ctx)
defer func() {
@@ -240,7 +240,7 @@
kp: kp,
statsHandler: opts.StatsHandler,
initialWindowSize: initialWindowSize,
- onSuccess: onSuccess,
+ onPrefaceReceipt: onPrefaceReceipt,
nextID: 1,
maxConcurrentStreams: defaultMaxStreamsClient,
streamQuota: defaultMaxStreamsClient,
@@ -362,6 +362,9 @@
ctx: s.ctx,
ctxDone: s.ctx.Done(),
recv: s.buf,
+ closeStream: func(err error) {
+ t.CloseStream(s, err)
+ },
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
@@ -780,7 +783,7 @@
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
- go t.onClose()
+ t.onClose()
return err
}
@@ -1210,7 +1213,7 @@
t.Close() // this kicks off resetTransport, so must be last before return
return
}
- t.onSuccess()
+ t.onPrefaceReceipt()
t.handleSettings(sf, true)
// loop to keep reading incoming messages on this transport.
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index 4d7e890..2580aa7 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -110,15 +110,15 @@
return b.c
}
-//
// recvBufferReader implements io.Reader interface to read the data from
// recvBuffer.
type recvBufferReader struct {
- ctx context.Context
- ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
- recv *recvBuffer
- last []byte // Stores the remaining data in the previous calls.
- err error
+ closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
+ ctx context.Context
+ ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
+ recv *recvBuffer
+ last []byte // Stores the remaining data in the previous calls.
+ err error
}
// Read reads the next len(p) bytes from last. If last is drained, it tries to
@@ -128,31 +128,53 @@
if r.err != nil {
return 0, r.err
}
- n, r.err = r.read(p)
- return n, r.err
-}
-
-func (r *recvBufferReader) read(p []byte) (n int, err error) {
if r.last != nil && len(r.last) > 0 {
// Read remaining data left in last call.
copied := copy(p, r.last)
r.last = r.last[copied:]
return copied, nil
}
+ if r.closeStream != nil {
+ n, r.err = r.readClient(p)
+ } else {
+ n, r.err = r.read(p)
+ }
+ return n, r.err
+}
+
+func (r *recvBufferReader) read(p []byte) (n int, err error) {
select {
case <-r.ctxDone:
return 0, ContextErr(r.ctx.Err())
case m := <-r.recv.get():
- r.recv.load()
- if m.err != nil {
- return 0, m.err
- }
- copied := copy(p, m.data)
- r.last = m.data[copied:]
- return copied, nil
+ return r.readAdditional(m, p)
}
}
+func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
+ // If the context is canceled, then closes the stream with nil metadata.
+ // closeStream writes its error parameter to r.recv as a recvMsg.
+ // r.readAdditional acts on that message and returns the necessary error.
+ select {
+ case <-r.ctxDone:
+ r.closeStream(ContextErr(r.ctx.Err()))
+ m := <-r.recv.get()
+ return r.readAdditional(m, p)
+ case m := <-r.recv.get():
+ return r.readAdditional(m, p)
+ }
+}
+
+func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
+ r.recv.load()
+ if m.err != nil {
+ return 0, m.err
+ }
+ copied := copy(p, m.data)
+ r.last = m.data[copied:]
+ return copied, nil
+}
+
type streamState uint32
const (
@@ -511,8 +533,8 @@
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
-func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
- return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess, onGoAway, onClose)
+func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
+ return newHTTP2Client(connectCtx, ctx, target, opts, onPrefaceReceipt, onGoAway, onClose)
}
// Options provides additional hints and information for message