Add support for user-friendly names in YANG data with aliases in KV store
Change-Id: I8b4fbb290dfb467b401e8cf20b825ddbb3bf897e
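
This change vendors the etcd v3 Go client (go.etcd.io/etcd/clientv3) so the
alias feature can persist name mappings in the KV store. As an illustrative,
non-authoritative sketch (the endpoint and the "/aliases/..." key layout below
are assumptions, not the schema this change defines), a client might store one
alias mapping like so:

    package main

    import (
        "context"
        "log"
        "time"

        "go.etcd.io/etcd/clientv3"
    )

    func main() {
        cli, err := clientv3.New(clientv3.Config{
            Endpoints:   []string{"localhost:2379"}, // assumed endpoint
            DialTimeout: 5 * time.Second,
        })
        if err != nil {
            log.Fatal(err)
        }
        defer cli.Close()

        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        defer cancel()
        // Hypothetical key layout: map a user-friendly alias to the native name.
        if _, err := cli.Put(ctx, "/aliases/Ethernet1/1", "Ethernet0"); err != nil {
            log.Fatal(err)
        }
    }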
diff --git a/vendor/go.etcd.io/etcd/LICENSE b/vendor/go.etcd.io/etcd/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/vendor/go.etcd.io/etcd/NOTICE b/vendor/go.etcd.io/etcd/NOTICE
new file mode 100644
index 0000000..b39ddfa
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/NOTICE
@@ -0,0 +1,5 @@
+CoreOS Project
+Copyright 2014 CoreOS, Inc
+
+This product includes software developed at CoreOS, Inc.
+(http://www.coreos.com/).
diff --git a/vendor/go.etcd.io/etcd/clientv3/auth.go b/vendor/go.etcd.io/etcd/clientv3/auth.go
new file mode 100644
index 0000000..edccf1a
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/auth.go
@@ -0,0 +1,233 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+ "fmt"
+ "strings"
+
+ "github.com/coreos/etcd/auth/authpb"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+)
+
+type (
+ AuthEnableResponse pb.AuthEnableResponse
+ AuthDisableResponse pb.AuthDisableResponse
+ AuthenticateResponse pb.AuthenticateResponse
+ AuthUserAddResponse pb.AuthUserAddResponse
+ AuthUserDeleteResponse pb.AuthUserDeleteResponse
+ AuthUserChangePasswordResponse pb.AuthUserChangePasswordResponse
+ AuthUserGrantRoleResponse pb.AuthUserGrantRoleResponse
+ AuthUserGetResponse pb.AuthUserGetResponse
+ AuthUserRevokeRoleResponse pb.AuthUserRevokeRoleResponse
+ AuthRoleAddResponse pb.AuthRoleAddResponse
+ AuthRoleGrantPermissionResponse pb.AuthRoleGrantPermissionResponse
+ AuthRoleGetResponse pb.AuthRoleGetResponse
+ AuthRoleRevokePermissionResponse pb.AuthRoleRevokePermissionResponse
+ AuthRoleDeleteResponse pb.AuthRoleDeleteResponse
+ AuthUserListResponse pb.AuthUserListResponse
+ AuthRoleListResponse pb.AuthRoleListResponse
+
+ PermissionType authpb.Permission_Type
+ Permission authpb.Permission
+)
+
+const (
+ PermRead = authpb.READ
+ PermWrite = authpb.WRITE
+ PermReadWrite = authpb.READWRITE
+)
+
+type Auth interface {
+ // AuthEnable enables auth of an etcd cluster.
+ AuthEnable(ctx context.Context) (*AuthEnableResponse, error)
+
+ // AuthDisable disables auth of an etcd cluster.
+ AuthDisable(ctx context.Context) (*AuthDisableResponse, error)
+
+ // UserAdd adds a new user to an etcd cluster.
+ UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error)
+
+ // UserDelete deletes a user from an etcd cluster.
+ UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error)
+
+ // UserChangePassword changes a user's password.
+ UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error)
+
+ // UserGrantRole grants a role to a user.
+ UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error)
+
+ // UserGet gets detailed information about a user.
+ UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error)
+
+ // UserList gets a list of all users.
+ UserList(ctx context.Context) (*AuthUserListResponse, error)
+
+ // UserRevokeRole revokes a role of a user.
+ UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error)
+
+ // RoleAdd adds a new role to an etcd cluster.
+ RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error)
+
+ // RoleGrantPermission grants a permission to a role.
+ RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error)
+
+ // RoleGet gets detailed information about a role.
+ RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error)
+
+ // RoleList gets a list of all roles.
+ RoleList(ctx context.Context) (*AuthRoleListResponse, error)
+
+ // RoleRevokePermission revokes a permission from a role.
+ RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error)
+
+ // RoleDelete deletes a role.
+ RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error)
+}
+
+type auth struct {
+ remote pb.AuthClient
+ callOpts []grpc.CallOption
+}
+
+func NewAuth(c *Client) Auth {
+ api := &auth{remote: RetryAuthClient(c)}
+ if c != nil {
+ api.callOpts = c.callOpts
+ }
+ return api
+}
+
+func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
+ resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, auth.callOpts...)
+ return (*AuthEnableResponse)(resp), toErr(ctx, err)
+}
+
+func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
+ resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, auth.callOpts...)
+ return (*AuthDisableResponse)(resp), toErr(ctx, err)
+}
+
+func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
+ resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, auth.callOpts...)
+ return (*AuthUserAddResponse)(resp), toErr(ctx, err)
+}
+
+func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
+ resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, auth.callOpts...)
+ return (*AuthUserDeleteResponse)(resp), toErr(ctx, err)
+}
+
+func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
+ resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, auth.callOpts...)
+ return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err)
+}
+
+func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
+ resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, auth.callOpts...)
+ return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err)
+}
+
+func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) {
+ resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, auth.callOpts...)
+ return (*AuthUserGetResponse)(resp), toErr(ctx, err)
+}
+
+func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) {
+ resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, auth.callOpts...)
+ return (*AuthUserListResponse)(resp), toErr(ctx, err)
+}
+
+func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
+ resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, auth.callOpts...)
+ return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err)
+}
+
+func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
+ resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, auth.callOpts...)
+ return (*AuthRoleAddResponse)(resp), toErr(ctx, err)
+}
+
+func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error) {
+ perm := &authpb.Permission{
+ Key: []byte(key),
+ RangeEnd: []byte(rangeEnd),
+ PermType: authpb.Permission_Type(permType),
+ }
+ resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}, auth.callOpts...)
+ return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err)
+}
+
+func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) {
+ resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, auth.callOpts...)
+ return (*AuthRoleGetResponse)(resp), toErr(ctx, err)
+}
+
+func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
+ resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, auth.callOpts...)
+ return (*AuthRoleListResponse)(resp), toErr(ctx, err)
+}
+
+func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
+ resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, auth.callOpts...)
+ return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err)
+}
+
+func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
+ resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, auth.callOpts...)
+ return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err)
+}
+
+func StrToPermissionType(s string) (PermissionType, error) {
+ val, ok := authpb.Permission_Type_value[strings.ToUpper(s)]
+ if ok {
+ return PermissionType(val), nil
+ }
+ return PermissionType(-1), fmt.Errorf("invalid permission type: %s", s)
+}
+
+type authenticator struct {
+ conn *grpc.ClientConn // conn in-use
+ remote pb.AuthClient
+ callOpts []grpc.CallOption
+}
+
+func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) {
+ resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, auth.callOpts...)
+ return (*AuthenticateResponse)(resp), toErr(ctx, err)
+}
+
+func (auth *authenticator) close() {
+ auth.conn.Close()
+}
+
+func newAuthenticator(ctx context.Context, target string, opts []grpc.DialOption, c *Client) (*authenticator, error) {
+ conn, err := grpc.DialContext(ctx, target, opts...)
+ if err != nil {
+ return nil, err
+ }
+
+ api := &authenticator{
+ conn: conn,
+ remote: pb.NewAuthClient(conn),
+ }
+ if c != nil {
+ api.callOpts = c.callOpts
+ }
+ return api, nil
+}
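
For reference, a minimal usage sketch of the Auth interface above, assuming an
existing *clientv3.Client named cli and the usual imports (etcd requires a
"root" user holding the "root" role before AuthEnable succeeds):

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // Bootstrap the root user and role that etcd requires for auth.
    if _, err := cli.UserAdd(ctx, "root", "rootpw"); err != nil {
        log.Fatal(err)
    }
    if _, err := cli.RoleAdd(ctx, "root"); err != nil {
        log.Fatal(err)
    }
    if _, err := cli.UserGrantRole(ctx, "root", "root"); err != nil {
        log.Fatal(err)
    }

    // Grant a read-only role over the key range ["foo", "zoo").
    if _, err := cli.RoleAdd(ctx, "reader"); err != nil {
        log.Fatal(err)
    }
    if _, err := cli.RoleGrantPermission(ctx, "reader", "foo", "zoo", clientv3.PermRead); err != nil {
        log.Fatal(err)
    }

    if _, err := cli.AuthEnable(ctx); err != nil {
        log.Fatal(err)
    }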
diff --git a/vendor/go.etcd.io/etcd/clientv3/client.go b/vendor/go.etcd.io/etcd/clientv3/client.go
new file mode 100644
index 0000000..c49e4ba
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/client.go
@@ -0,0 +1,665 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "net"
+ "os"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/coreos/etcd/clientv3/balancer"
+ "github.com/coreos/etcd/clientv3/balancer/picker"
+ "github.com/coreos/etcd/clientv3/balancer/resolver/endpoint"
+ "github.com/coreos/etcd/clientv3/credentials"
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ "github.com/coreos/etcd/pkg/logutil"
+ "github.com/coreos/pkg/capnslog"
+ "github.com/google/uuid"
+ "go.uber.org/zap"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ grpccredentials "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/keepalive"
+ "google.golang.org/grpc/status"
+)
+
+var (
+ ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
+ ErrOldCluster = errors.New("etcdclient: old cluster version")
+
+ roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String())
+)
+
+var (
+ plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "clientv3")
+)
+
+func init() {
+ lg := zap.NewNop()
+ if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
+ lcfg := logutil.DefaultZapLoggerConfig
+ lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
+
+ var err error
+ lg, err = lcfg.Build() // info level logging
+ if err != nil {
+ panic(err)
+ }
+ }
+
+ // TODO: support custom balancer
+ balancer.RegisterBuilder(balancer.Config{
+ Policy: picker.RoundrobinBalanced,
+ Name: roundRobinBalancerName,
+ Logger: lg,
+ })
+}
+
+// Client provides and manages an etcd v3 client session.
+type Client struct {
+ Cluster
+ KV
+ Lease
+ Watcher
+ Auth
+ Maintenance
+
+ conn *grpc.ClientConn
+
+ cfg Config
+ creds grpccredentials.TransportCredentials
+ resolverGroup *endpoint.ResolverGroup
+ mu *sync.RWMutex
+
+ ctx context.Context
+ cancel context.CancelFunc
+
+ // Username is a user name for authentication.
+ Username string
+ // Password is a password for authentication.
+ Password string
+ authTokenBundle credentials.Bundle
+
+ callOpts []grpc.CallOption
+
+ lg *zap.Logger
+}
+
+// New creates a new etcdv3 client from a given configuration.
+func New(cfg Config) (*Client, error) {
+ if len(cfg.Endpoints) == 0 {
+ return nil, ErrNoAvailableEndpoints
+ }
+
+ return newClient(&cfg)
+}
+
+// NewCtxClient creates a client with a context but no underlying grpc
+// connection. This is useful for embedded cases that override the
+// service interface implementations and do not need connection management.
+func NewCtxClient(ctx context.Context) *Client {
+ cctx, cancel := context.WithCancel(ctx)
+ return &Client{ctx: cctx, cancel: cancel}
+}
+
+// NewFromURL creates a new etcdv3 client from a URL.
+func NewFromURL(url string) (*Client, error) {
+ return New(Config{Endpoints: []string{url}})
+}
+
+// NewFromURLs creates a new etcdv3 client from URLs.
+func NewFromURLs(urls []string) (*Client, error) {
+ return New(Config{Endpoints: urls})
+}
+
+// Close shuts down the client's etcd connections.
+func (c *Client) Close() error {
+ c.cancel()
+ c.Watcher.Close()
+ c.Lease.Close()
+ if c.resolverGroup != nil {
+ c.resolverGroup.Close()
+ }
+ if c.conn != nil {
+ return toErr(c.ctx, c.conn.Close())
+ }
+ return c.ctx.Err()
+}
+
+// Ctx is a context for "out of band" messages (e.g., for sending
+// "clean up" message when another context is canceled). It is
+// canceled on client Close().
+func (c *Client) Ctx() context.Context { return c.ctx }
+
+// Endpoints lists the registered endpoints for the client.
+func (c *Client) Endpoints() []string {
+ // copy the slice; protect original endpoints from being changed
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ eps := make([]string, len(c.cfg.Endpoints))
+ copy(eps, c.cfg.Endpoints)
+ return eps
+}
+
+// SetEndpoints updates client's endpoints.
+func (c *Client) SetEndpoints(eps ...string) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.cfg.Endpoints = eps
+ c.resolverGroup.SetEndpoints(eps)
+}
+
+// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
+func (c *Client) Sync(ctx context.Context) error {
+ mresp, err := c.MemberList(ctx)
+ if err != nil {
+ return err
+ }
+ var eps []string
+ for _, m := range mresp.Members {
+ eps = append(eps, m.ClientURLs...)
+ }
+ c.SetEndpoints(eps...)
+ return nil
+}
+
+func (c *Client) autoSync() {
+ if c.cfg.AutoSyncInterval == time.Duration(0) {
+ return
+ }
+
+ for {
+ select {
+ case <-c.ctx.Done():
+ return
+ case <-time.After(c.cfg.AutoSyncInterval):
+ ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
+ err := c.Sync(ctx)
+ cancel()
+ if err != nil && err != c.ctx.Err() {
+ lg.Lvl(4).Infof("Auto sync endpoints failed: %v", err)
+ }
+ }
+ }
+}
+
+func (c *Client) processCreds(scheme string) (creds grpccredentials.TransportCredentials) {
+ creds = c.creds
+ switch scheme {
+ case "unix":
+ case "http":
+ creds = nil
+ case "https", "unixs":
+ if creds != nil {
+ break
+ }
+ creds = credentials.NewBundle(credentials.Config{}).TransportCredentials()
+ default:
+ creds = nil
+ }
+ return creds
+}
+
+// dialSetupOpts gives the dial opts prior to any authentication.
+func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
+ if c.cfg.DialKeepAliveTime > 0 {
+ params := keepalive.ClientParameters{
+ Time: c.cfg.DialKeepAliveTime,
+ Timeout: c.cfg.DialKeepAliveTimeout,
+ PermitWithoutStream: c.cfg.PermitWithoutStream,
+ }
+ opts = append(opts, grpc.WithKeepaliveParams(params))
+ }
+ opts = append(opts, dopts...)
+
+ dialer := endpoint.Dialer
+ if creds != nil {
+ opts = append(opts, grpc.WithTransportCredentials(creds))
+ // gRPC load balancer workaround. See credentials.transportCredential for details.
+ if credsDialer, ok := creds.(TransportCredentialsWithDialer); ok {
+ dialer = credsDialer.Dialer
+ }
+ } else {
+ opts = append(opts, grpc.WithInsecure())
+ }
+ opts = append(opts, grpc.WithContextDialer(dialer))
+
+ // Interceptor retry and backoff.
+ // TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
+ // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy
+ // once it is available.
+ rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
+ opts = append(opts,
+ // Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
+ // Streams that are safe to retry are enabled individually.
+ grpc.WithStreamInterceptor(c.streamClientInterceptor(c.lg, withMax(0), rrBackoff)),
+ grpc.WithUnaryInterceptor(c.unaryClientInterceptor(c.lg, withMax(defaultUnaryMaxRetries), rrBackoff)),
+ )
+
+ return opts, nil
+}
+
+// Dial connects to a single endpoint using the client's config.
+func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {
+ creds, err := c.directDialCreds(ep)
+ if err != nil {
+ return nil, err
+ }
+ // Use the grpc passthrough resolver to directly dial a single endpoint.
+ // This resolver passes through the 'unix' and 'unixs' endpoint schemes used
+ // by etcd without modification, allowing us to dial endpoints directly
+ // using the same dial functions that we use for load balancer dialing.
+ return c.dial(fmt.Sprintf("passthrough:///%s", ep), creds)
+}
+
+func (c *Client) getToken(ctx context.Context) error {
+ var err error // return the last error in case of failure
+ var auth *authenticator
+
+ eps := c.Endpoints()
+ for _, ep := range eps {
+ // use dial options without dopts to avoid reusing the client balancer
+ var dOpts []grpc.DialOption
+ _, host, _ := endpoint.ParseEndpoint(ep)
+ target := c.resolverGroup.Target(host)
+ creds := c.dialWithBalancerCreds(ep)
+ dOpts, err = c.dialSetupOpts(creds, c.cfg.DialOptions...)
+ if err != nil {
+ err = fmt.Errorf("failed to configure auth dialer: %v", err)
+ continue
+ }
+ dOpts = append(dOpts, grpc.WithBalancerName(roundRobinBalancerName))
+ auth, err = newAuthenticator(ctx, target, dOpts, c)
+ if err != nil {
+ continue
+ }
+ defer auth.close()
+
+ var resp *AuthenticateResponse
+ resp, err = auth.authenticate(ctx, c.Username, c.Password)
+ if err != nil {
+ // return err without retrying other endpoints
+ if err == rpctypes.ErrAuthNotEnabled {
+ return err
+ }
+ continue
+ }
+
+ c.authTokenBundle.UpdateAuthToken(resp.Token)
+ return nil
+ }
+
+ return err
+}
+
+// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
+// of the provided endpoint determines the scheme used for all endpoints of the client connection.
+func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
+ _, host, _ := endpoint.ParseEndpoint(ep)
+ target := c.resolverGroup.Target(host)
+ creds := c.dialWithBalancerCreds(ep)
+ return c.dial(target, creds, dopts...)
+}
+
+// dial configures and dials any grpc balancer target.
+func (c *Client) dial(target string, creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
+ opts, err := c.dialSetupOpts(creds, dopts...)
+ if err != nil {
+ return nil, fmt.Errorf("failed to configure dialer: %v", err)
+ }
+
+ if c.Username != "" && c.Password != "" {
+ c.authTokenBundle = credentials.NewBundle(credentials.Config{})
+
+ ctx, cancel := c.ctx, func() {}
+ if c.cfg.DialTimeout > 0 {
+ ctx, cancel = context.WithTimeout(ctx, c.cfg.DialTimeout)
+ }
+
+ err = c.getToken(ctx)
+ if err != nil {
+ if toErr(ctx, err) != rpctypes.ErrAuthNotEnabled {
+ if err == ctx.Err() && ctx.Err() != c.ctx.Err() {
+ err = context.DeadlineExceeded
+ }
+ cancel()
+ return nil, err
+ }
+ } else {
+ opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials()))
+ }
+ cancel()
+ }
+
+ opts = append(opts, c.cfg.DialOptions...)
+
+ dctx := c.ctx
+ if c.cfg.DialTimeout > 0 {
+ var cancel context.CancelFunc
+ dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
+ defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
+ }
+
+ conn, err := grpc.DialContext(dctx, target, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return conn, nil
+}
+
+func (c *Client) directDialCreds(ep string) (grpccredentials.TransportCredentials, error) {
+ _, host, scheme := endpoint.ParseEndpoint(ep)
+ creds := c.creds
+ if len(scheme) != 0 {
+ creds = c.processCreds(scheme)
+ if creds != nil {
+ clone := creds.Clone()
+ // The server name must be set to the endpoint hostname without the port, since
+ // grpc otherwise attempts to check whether the x509 cert is valid for the full
+ // endpoint including the scheme and port, which fails.
+ overrideServerName, _, err := net.SplitHostPort(host)
+ if err != nil {
+ // Either the host didn't have a port or the host could not be parsed. Either way, continue with the
+ // original host string.
+ overrideServerName = host
+ }
+ clone.OverrideServerName(overrideServerName)
+ creds = clone
+ }
+ }
+ return creds, nil
+}
+
+func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCredentials {
+ _, _, scheme := endpoint.ParseEndpoint(ep)
+ creds := c.creds
+ if len(scheme) != 0 {
+ creds = c.processCreds(scheme)
+ }
+ return creds
+}
+
+func newClient(cfg *Config) (*Client, error) {
+ if cfg == nil {
+ cfg = &Config{}
+ }
+ var creds grpccredentials.TransportCredentials
+ if cfg.TLS != nil {
+ creds = credentials.NewBundle(credentials.Config{TLSConfig: cfg.TLS}).TransportCredentials()
+ }
+
+ // use a temporary skeleton client to bootstrap first connection
+ baseCtx := context.TODO()
+ if cfg.Context != nil {
+ baseCtx = cfg.Context
+ }
+
+ ctx, cancel := context.WithCancel(baseCtx)
+ client := &Client{
+ conn: nil,
+ cfg: *cfg,
+ creds: creds,
+ ctx: ctx,
+ cancel: cancel,
+ mu: new(sync.RWMutex),
+ callOpts: defaultCallOpts,
+ }
+
+ lcfg := logutil.DefaultZapLoggerConfig
+ if cfg.LogConfig != nil {
+ lcfg = *cfg.LogConfig
+ }
+ var err error
+ client.lg, err = lcfg.Build()
+ if err != nil {
+ return nil, err
+ }
+
+ if cfg.Username != "" && cfg.Password != "" {
+ client.Username = cfg.Username
+ client.Password = cfg.Password
+ }
+ if cfg.MaxCallSendMsgSize > 0 || cfg.MaxCallRecvMsgSize > 0 {
+ if cfg.MaxCallRecvMsgSize > 0 && cfg.MaxCallSendMsgSize > cfg.MaxCallRecvMsgSize {
+ return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", cfg.MaxCallRecvMsgSize, cfg.MaxCallSendMsgSize)
+ }
+ callOpts := []grpc.CallOption{
+ defaultFailFast,
+ defaultMaxCallSendMsgSize,
+ defaultMaxCallRecvMsgSize,
+ }
+ if cfg.MaxCallSendMsgSize > 0 {
+ callOpts[1] = grpc.MaxCallSendMsgSize(cfg.MaxCallSendMsgSize)
+ }
+ if cfg.MaxCallRecvMsgSize > 0 {
+ callOpts[2] = grpc.MaxCallRecvMsgSize(cfg.MaxCallRecvMsgSize)
+ }
+ client.callOpts = callOpts
+ }
+
+ // Prepare an 'endpoint://<unique-client-id>/' resolver for the client and create an endpoint target to pass
+ // to dial so the client knows to use this resolver.
+ client.resolverGroup, err = endpoint.NewResolverGroup(fmt.Sprintf("client-%s", uuid.New().String()))
+ if err != nil {
+ client.cancel()
+ return nil, err
+ }
+ client.resolverGroup.SetEndpoints(cfg.Endpoints)
+
+ if len(cfg.Endpoints) < 1 {
+ return nil, fmt.Errorf("at least one Endpoint must is required in client config")
+ }
+ dialEndpoint := cfg.Endpoints[0]
+
+ // Use the provided endpoint target so that, for https:// endpoints without any TLS
+ // config given, grpc will assume the certificate server name is the endpoint host.
+ conn, err := client.dialWithBalancer(dialEndpoint, grpc.WithBalancerName(roundRobinBalancerName))
+ if err != nil {
+ client.cancel()
+ client.resolverGroup.Close()
+ return nil, err
+ }
+ // TODO: With the old grpc balancer interface, we waited until the dial timeout
+ // for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface?
+ client.conn = conn
+
+ client.Cluster = NewCluster(client)
+ client.KV = NewKV(client)
+ client.Lease = NewLease(client)
+ client.Watcher = NewWatcher(client)
+ client.Auth = NewAuth(client)
+ client.Maintenance = NewMaintenance(client)
+
+ if cfg.RejectOldCluster {
+ if err := client.checkVersion(); err != nil {
+ client.Close()
+ return nil, err
+ }
+ }
+
+ go client.autoSync()
+ return client, nil
+}
+
+// roundRobinQuorumBackoff retries against quorum between each backoff.
+// This is intended for use with a round robin load balancer.
+func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFraction float64) backoffFunc {
+ return func(attempt uint) time.Duration {
+ // after each round robin across quorum, backoff for our wait between duration
+ n := uint(len(c.Endpoints()))
+ quorum := (n/2 + 1)
+ if attempt%quorum == 0 {
+ c.lg.Debug("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
+ return jitterUp(waitBetween, jitterFraction)
+ }
+ c.lg.Debug("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
+ return 0
+ }
+}
+
+func (c *Client) checkVersion() (err error) {
+ var wg sync.WaitGroup
+
+ eps := c.Endpoints()
+ errc := make(chan error, len(eps))
+ ctx, cancel := context.WithCancel(c.ctx)
+ if c.cfg.DialTimeout > 0 {
+ cancel()
+ ctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
+ }
+
+ wg.Add(len(eps))
+ for _, ep := range eps {
+ // if cluster is current, any endpoint gives a recent version
+ go func(e string) {
+ defer wg.Done()
+ resp, rerr := c.Status(ctx, e)
+ if rerr != nil {
+ errc <- rerr
+ return
+ }
+ vs := strings.Split(resp.Version, ".")
+ maj, min := 0, 0
+ if len(vs) >= 2 {
+ var serr error
+ if maj, serr = strconv.Atoi(vs[0]); serr != nil {
+ errc <- serr
+ return
+ }
+ if min, serr = strconv.Atoi(vs[1]); serr != nil {
+ errc <- serr
+ return
+ }
+ }
+ if maj < 3 || (maj == 3 && min < 2) {
+ rerr = ErrOldCluster
+ }
+ errc <- rerr
+ }(ep)
+ }
+ // wait for success
+ for range eps {
+ if err = <-errc; err == nil {
+ break
+ }
+ }
+ cancel()
+ wg.Wait()
+ return err
+}
+
+// ActiveConnection returns the current in-use connection
+func (c *Client) ActiveConnection() *grpc.ClientConn { return c.conn }
+
+// isHaltErr returns true if the given error and context indicate no forward
+// progress can be made, even after reconnecting.
+func isHaltErr(ctx context.Context, err error) bool {
+ if ctx != nil && ctx.Err() != nil {
+ return true
+ }
+ if err == nil {
+ return false
+ }
+ ev, _ := status.FromError(err)
+ // Unavailable codes mean the system will be right back.
+ // (e.g., can't connect, lost leader)
+ // Treat Internal codes as if something failed, leaving the
+ // system in an inconsistent state, but retrying could make progress.
+ // (e.g., failed in middle of send, corrupted frame)
+ // TODO: are permanent Internal errors possible from grpc?
+ return ev.Code() != codes.Unavailable && ev.Code() != codes.Internal
+}
+
+// isUnavailableErr returns true if the given error is an unavailable error
+func isUnavailableErr(ctx context.Context, err error) bool {
+ if ctx != nil && ctx.Err() != nil {
+ return false
+ }
+ if err == nil {
+ return false
+ }
+ ev, ok := status.FromError(err)
+ if ok {
+ // Unavailable codes mean the system will be right back.
+ // (e.g., can't connect, lost leader)
+ return ev.Code() == codes.Unavailable
+ }
+ return false
+}
+
+func toErr(ctx context.Context, err error) error {
+ if err == nil {
+ return nil
+ }
+ err = rpctypes.Error(err)
+ if _, ok := err.(rpctypes.EtcdError); ok {
+ return err
+ }
+ if ev, ok := status.FromError(err); ok {
+ code := ev.Code()
+ switch code {
+ case codes.DeadlineExceeded:
+ fallthrough
+ case codes.Canceled:
+ if ctx.Err() != nil {
+ err = ctx.Err()
+ }
+ }
+ }
+ return err
+}
+
+func canceledByCaller(stopCtx context.Context, err error) bool {
+ if stopCtx.Err() == nil || err == nil {
+ return false
+ }
+
+ return err == context.Canceled || err == context.DeadlineExceeded
+}
+
+// IsConnCanceled returns true if the error is from a closed gRPC connection.
+// ref. https://github.com/grpc/grpc-go/pull/1854
+func IsConnCanceled(err error) bool {
+ if err == nil {
+ return false
+ }
+
+ // >= gRPC v1.23.x
+ s, ok := status.FromError(err)
+ if ok {
+ // connection is canceled or server has already closed the connection
+ return s.Code() == codes.Canceled || s.Message() == "transport is closing"
+ }
+
+ // >= gRPC v1.10.x
+ if err == context.Canceled {
+ return true
+ }
+
+ // <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")'
+ return strings.Contains(err.Error(), "grpc: the client connection is closing")
+}
+
+// TransportCredentialsWithDialer is for a gRPC load balancer workaround. See credentials.transportCredential for details.
+type TransportCredentialsWithDialer interface {
+ grpccredentials.TransportCredentials
+ Dialer(ctx context.Context, dialEp string) (net.Conn, error)
+}
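
A hedged sketch of the endpoint-management surface implemented above (New,
Sync, Endpoints); the endpoint and interval values are assumptions:

    cli, err := clientv3.New(clientv3.Config{
        Endpoints:        []string{"localhost:2379"}, // assumed endpoint
        AutoSyncInterval: time.Minute,                // drives (*Client).autoSync above
        DialTimeout:      5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    // Force an immediate endpoint resync from the current cluster membership,
    // the same call autoSync makes on its timer.
    if err := cli.Sync(context.Background()); err != nil {
        log.Fatal(err)
    }
    log.Println("endpoints:", cli.Endpoints())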
diff --git a/vendor/go.etcd.io/etcd/clientv3/cluster.go b/vendor/go.etcd.io/etcd/clientv3/cluster.go
new file mode 100644
index 0000000..785672b
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/cluster.go
@@ -0,0 +1,114 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ "github.com/coreos/etcd/pkg/types"
+
+ "google.golang.org/grpc"
+)
+
+type (
+ Member pb.Member
+ MemberListResponse pb.MemberListResponse
+ MemberAddResponse pb.MemberAddResponse
+ MemberRemoveResponse pb.MemberRemoveResponse
+ MemberUpdateResponse pb.MemberUpdateResponse
+)
+
+type Cluster interface {
+ // MemberList lists the current cluster membership.
+ MemberList(ctx context.Context) (*MemberListResponse, error)
+
+ // MemberAdd adds a new member into the cluster.
+ MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)
+
+ // MemberRemove removes an existing member from the cluster.
+ MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error)
+
+ // MemberUpdate updates the peer addresses of the member.
+ MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error)
+}
+
+type cluster struct {
+ remote pb.ClusterClient
+ callOpts []grpc.CallOption
+}
+
+func NewCluster(c *Client) Cluster {
+ api := &cluster{remote: RetryClusterClient(c)}
+ if c != nil {
+ api.callOpts = c.callOpts
+ }
+ return api
+}
+
+func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster {
+ api := &cluster{remote: remote}
+ if c != nil {
+ api.callOpts = c.callOpts
+ }
+ return api
+}
+
+func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
+ // fail-fast before panic in rafthttp
+ if _, err := types.NewURLs(peerAddrs); err != nil {
+ return nil, err
+ }
+
+ r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
+ resp, err := c.remote.MemberAdd(ctx, r, c.callOpts...)
+ if err != nil {
+ return nil, toErr(ctx, err)
+ }
+ return (*MemberAddResponse)(resp), nil
+}
+
+func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
+ r := &pb.MemberRemoveRequest{ID: id}
+ resp, err := c.remote.MemberRemove(ctx, r, c.callOpts...)
+ if err != nil {
+ return nil, toErr(ctx, err)
+ }
+ return (*MemberRemoveResponse)(resp), nil
+}
+
+func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
+ // fail-fast before panic in rafthttp
+ if _, err := types.NewURLs(peerAddrs); err != nil {
+ return nil, err
+ }
+
+ // it is safe to retry on update.
+ r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
+ resp, err := c.remote.MemberUpdate(ctx, r, c.callOpts...)
+ if err == nil {
+ return (*MemberUpdateResponse)(resp), nil
+ }
+ return nil, toErr(ctx, err)
+}
+
+func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
+ // it is safe to retry on list.
+ resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}, c.callOpts...)
+ if err == nil {
+ return (*MemberListResponse)(resp), nil
+ }
+ return nil, toErr(ctx, err)
+}
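
A short usage sketch of the Cluster interface above, assuming a connected
client named cli:

    resp, err := cli.MemberList(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    for _, m := range resp.Members {
        log.Printf("member %x: %s peers=%v clients=%v", m.ID, m.Name, m.PeerURLs, m.ClientURLs)
    }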
diff --git a/vendor/go.etcd.io/etcd/clientv3/compact_op.go b/vendor/go.etcd.io/etcd/clientv3/compact_op.go
new file mode 100644
index 0000000..41e80c1
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/compact_op.go
@@ -0,0 +1,51 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+// CompactOp represents a compact operation.
+type CompactOp struct {
+ revision int64
+ physical bool
+}
+
+// CompactOption configures compact operation.
+type CompactOption func(*CompactOp)
+
+func (op *CompactOp) applyCompactOpts(opts []CompactOption) {
+ for _, opt := range opts {
+ opt(op)
+ }
+}
+
+// OpCompact wraps a slice of CompactOptions to create a CompactOp.
+func OpCompact(rev int64, opts ...CompactOption) CompactOp {
+ ret := CompactOp{revision: rev}
+ ret.applyCompactOpts(opts)
+ return ret
+}
+
+func (op CompactOp) toRequest() *pb.CompactionRequest {
+ return &pb.CompactionRequest{Revision: op.revision, Physical: op.physical}
+}
+
+// WithCompactPhysical makes Compact wait until all compacted entries are
+// removed from the etcd server's storage.
+func WithCompactPhysical() CompactOption {
+ return func(op *CompactOp) { op.physical = true }
+}
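
WithCompactPhysical is consumed by KV.Compact (defined in kv.go below); a
sketch, assuming a connected client cli:

    // Read any response header to obtain a current revision, then compact the
    // history up to it and wait for the compaction to be physically applied.
    gresp, err := cli.Get(context.Background(), "key")
    if err != nil {
        log.Fatal(err)
    }
    if _, err := cli.Compact(context.Background(), gresp.Header.Revision, clientv3.WithCompactPhysical()); err != nil {
        log.Fatal(err)
    }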
diff --git a/vendor/go.etcd.io/etcd/clientv3/compare.go b/vendor/go.etcd.io/etcd/clientv3/compare.go
new file mode 100644
index 0000000..b5f0a25
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/compare.go
@@ -0,0 +1,140 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+type CompareTarget int
+type CompareResult int
+
+const (
+ CompareVersion CompareTarget = iota
+ CompareCreated
+ CompareModified
+ CompareValue
+)
+
+type Cmp pb.Compare
+
+func Compare(cmp Cmp, result string, v interface{}) Cmp {
+ var r pb.Compare_CompareResult
+
+ switch result {
+ case "=":
+ r = pb.Compare_EQUAL
+ case "!=":
+ r = pb.Compare_NOT_EQUAL
+ case ">":
+ r = pb.Compare_GREATER
+ case "<":
+ r = pb.Compare_LESS
+ default:
+ panic("Unknown result op")
+ }
+
+ cmp.Result = r
+ switch cmp.Target {
+ case pb.Compare_VALUE:
+ val, ok := v.(string)
+ if !ok {
+ panic("bad compare value")
+ }
+ cmp.TargetUnion = &pb.Compare_Value{Value: []byte(val)}
+ case pb.Compare_VERSION:
+ cmp.TargetUnion = &pb.Compare_Version{Version: mustInt64(v)}
+ case pb.Compare_CREATE:
+ cmp.TargetUnion = &pb.Compare_CreateRevision{CreateRevision: mustInt64(v)}
+ case pb.Compare_MOD:
+ cmp.TargetUnion = &pb.Compare_ModRevision{ModRevision: mustInt64(v)}
+ case pb.Compare_LEASE:
+ cmp.TargetUnion = &pb.Compare_Lease{Lease: mustInt64orLeaseID(v)}
+ default:
+ panic("Unknown compare type")
+ }
+ return cmp
+}
+
+func Value(key string) Cmp {
+ return Cmp{Key: []byte(key), Target: pb.Compare_VALUE}
+}
+
+func Version(key string) Cmp {
+ return Cmp{Key: []byte(key), Target: pb.Compare_VERSION}
+}
+
+func CreateRevision(key string) Cmp {
+ return Cmp{Key: []byte(key), Target: pb.Compare_CREATE}
+}
+
+func ModRevision(key string) Cmp {
+ return Cmp{Key: []byte(key), Target: pb.Compare_MOD}
+}
+
+// LeaseValue compares a key's LeaseID to a value of your choosing. The empty
+// LeaseID is 0, otherwise known as `NoLease`.
+func LeaseValue(key string) Cmp {
+ return Cmp{Key: []byte(key), Target: pb.Compare_LEASE}
+}
+
+// KeyBytes returns the byte slice holding the comparison key.
+func (cmp *Cmp) KeyBytes() []byte { return cmp.Key }
+
+// WithKeyBytes sets the byte slice for the comparison key.
+func (cmp *Cmp) WithKeyBytes(key []byte) { cmp.Key = key }
+
+// ValueBytes returns the byte slice holding the comparison value, if any.
+func (cmp *Cmp) ValueBytes() []byte {
+ if tu, ok := cmp.TargetUnion.(*pb.Compare_Value); ok {
+ return tu.Value
+ }
+ return nil
+}
+
+// WithValueBytes sets the byte slice for the comparison's value.
+func (cmp *Cmp) WithValueBytes(v []byte) { cmp.TargetUnion.(*pb.Compare_Value).Value = v }
+
+// WithRange sets the comparison to scan the range [key, end).
+func (cmp Cmp) WithRange(end string) Cmp {
+ cmp.RangeEnd = []byte(end)
+ return cmp
+}
+
+// WithPrefix sets the comparison to scan all keys prefixed by the key.
+func (cmp Cmp) WithPrefix() Cmp {
+ cmp.RangeEnd = getPrefix(cmp.Key)
+ return cmp
+}
+
+// mustInt64 panics if val isn't an int or int64. It returns an int64 otherwise.
+func mustInt64(val interface{}) int64 {
+ if v, ok := val.(int64); ok {
+ return v
+ }
+ if v, ok := val.(int); ok {
+ return int64(v)
+ }
+ panic("bad value")
+}
+
+// mustInt64orLeaseID panics if val isn't a LeaseID, int or int64. It returns an
+// int64 otherwise.
+func mustInt64orLeaseID(val interface{}) int64 {
+ if v, ok := val.(LeaseID); ok {
+ return int64(v)
+ }
+ return mustInt64(val)
+}
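
Cmp values feed transaction guards (Txn and OpPut live in other clientv3
files); a sketch of the common create-if-absent pattern, assuming cli:

    // Put "value" under "key" only if the key has never been written,
    // i.e. its CreateRevision is still 0.
    txnResp, err := cli.Txn(context.Background()).
        If(clientv3.Compare(clientv3.CreateRevision("key"), "=", 0)).
        Then(clientv3.OpPut("key", "value")).
        Commit()
    if err != nil {
        log.Fatal(err)
    }
    log.Println("created:", txnResp.Succeeded)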
diff --git a/vendor/go.etcd.io/etcd/clientv3/config.go b/vendor/go.etcd.io/etcd/clientv3/config.go
new file mode 100644
index 0000000..9c17fc2
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/config.go
@@ -0,0 +1,86 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+ "crypto/tls"
+ "time"
+
+ "go.uber.org/zap"
+ "google.golang.org/grpc"
+)
+
+type Config struct {
+ // Endpoints is a list of URLs.
+ Endpoints []string `json:"endpoints"`
+
+ // AutoSyncInterval is the interval at which the client updates its endpoints
+ // with the cluster's latest members. 0 disables auto-sync (the default).
+ AutoSyncInterval time.Duration `json:"auto-sync-interval"`
+
+ // DialTimeout is the timeout for failing to establish a connection.
+ DialTimeout time.Duration `json:"dial-timeout"`
+
+ // DialKeepAliveTime is the time after which client pings the server to see if
+ // transport is alive.
+ DialKeepAliveTime time.Duration `json:"dial-keep-alive-time"`
+
+ // DialKeepAliveTimeout is the time that the client waits for a response for the
+ // keep-alive probe. If the response is not received in this time, the connection is closed.
+ DialKeepAliveTimeout time.Duration `json:"dial-keep-alive-timeout"`
+
+ // MaxCallSendMsgSize is the client-side request send limit in bytes.
+ // If 0, it defaults to 2.0 MiB (2 * 1024 * 1024).
+ // Make sure that "MaxCallSendMsgSize" < server-side default send/recv limit.
+ // ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes").
+ MaxCallSendMsgSize int
+
+ // MaxCallRecvMsgSize is the client-side response receive limit.
+ // If 0, it defaults to "math.MaxInt32", because range response can
+ // easily exceed request send limits.
+ // Make sure that "MaxCallRecvMsgSize" >= server-side default send/recv limit.
+ // ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes").
+ MaxCallRecvMsgSize int
+
+ // TLS holds the client secure credentials, if any.
+ TLS *tls.Config
+
+ // Username is a user name for authentication.
+ Username string `json:"username"`
+
+ // Password is a password for authentication.
+ Password string `json:"password"`
+
+ // RejectOldCluster when set will refuse to create a client against an outdated cluster.
+ RejectOldCluster bool `json:"reject-old-cluster"`
+
+ // DialOptions is a list of dial options for the grpc client (e.g., for interceptors).
+ // For example, pass "grpc.WithBlock()" to block until the underlying connection is up.
+ // Without this, Dial returns immediately and connecting to the server happens in the background.
+ DialOptions []grpc.DialOption
+
+ // LogConfig configures client-side logger.
+ // If nil, use the default logger.
+ // TODO: configure gRPC logger
+ LogConfig *zap.Config
+
+ // Context is the default client context; it can be used to cancel grpc dial out and
+ // other operations that do not have an explicit context.
+ Context context.Context
+
+ // PermitWithoutStream when set will allow the client to send keepalive pings to the server without any active streams (RPCs).
+ PermitWithoutStream bool `json:"permit-without-stream"`
+}
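
An illustrative Config exercising the dial and keepalive knobs above; the
values are assumptions, not recommendations:

    cfg := clientv3.Config{
        Endpoints:            []string{"localhost:2379"},
        DialTimeout:          5 * time.Second,
        DialKeepAliveTime:    30 * time.Second, // ping the server every 30s
        DialKeepAliveTimeout: 10 * time.Second, // close the connection if no ack in 10s
        PermitWithoutStream:  true,             // keep pinging even with no active RPCs
        DialOptions:          []grpc.DialOption{grpc.WithBlock()}, // block until connected
    }
    cli, err := clientv3.New(cfg)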
diff --git a/vendor/go.etcd.io/etcd/clientv3/ctx.go b/vendor/go.etcd.io/etcd/clientv3/ctx.go
new file mode 100644
index 0000000..da8297b
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/ctx.go
@@ -0,0 +1,64 @@
+// Copyright 2020 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+ "strings"
+
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ "github.com/coreos/etcd/version"
+ "google.golang.org/grpc/metadata"
+)
+
+// WithRequireLeader requires client requests to only succeed
+// when the cluster has a leader.
+func WithRequireLeader(ctx context.Context) context.Context {
+ md, ok := metadata.FromOutgoingContext(ctx)
+ if !ok { // no outgoing metadata ctx key, create one
+ md = metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
+ return metadata.NewOutgoingContext(ctx, md)
+ }
+ copied := md.Copy() // avoid racy updates
+ // overwrite/add 'hasleader' key/value
+ metadataSet(copied, rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
+ return metadata.NewOutgoingContext(ctx, copied)
+}
+
+// embeds client version
+func withVersion(ctx context.Context) context.Context {
+ md, ok := metadata.FromOutgoingContext(ctx)
+ if !ok { // no outgoing metadata ctx key, create one
+ md = metadata.Pairs(rpctypes.MetadataClientAPIVersionKey, version.APIVersion)
+ return metadata.NewOutgoingContext(ctx, md)
+ }
+ copied := md.Copy() // avoid racy updates
+ // overwrite/add version key/value
+ metadataSet(copied, rpctypes.MetadataClientAPIVersionKey, version.APIVersion)
+ return metadata.NewOutgoingContext(ctx, copied)
+}
+
+func metadataGet(md metadata.MD, k string) []string {
+ k = strings.ToLower(k)
+ return md[k]
+}
+
+func metadataSet(md metadata.MD, k string, vals ...string) {
+ if len(vals) == 0 {
+ return
+ }
+ k = strings.ToLower(k)
+ md[k] = vals
+}
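
WithRequireLeader is typically wrapped around long-lived request contexts so
they fail fast during leader loss; a sketch, assuming cli and the Watch API
defined elsewhere in clientv3:

    // Requests on this context fail with ErrNoLeader while the cluster has no
    // leader, instead of blocking until one is elected.
    ctx := clientv3.WithRequireLeader(context.Background())
    for wresp := range cli.Watch(ctx, "key", clientv3.WithPrefix()) {
        for _, ev := range wresp.Events {
            log.Printf("%s %q -> %q", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }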
diff --git a/vendor/go.etcd.io/etcd/clientv3/doc.go b/vendor/go.etcd.io/etcd/clientv3/doc.go
new file mode 100644
index 0000000..717fbe4
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/doc.go
@@ -0,0 +1,97 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package clientv3 implements the official Go etcd client for v3.
+//
+// Create client using `clientv3.New`:
+//
+// // expect dial time-out on ipv4 blackhole
+// _, err := clientv3.New(clientv3.Config{
+// Endpoints: []string{"http://254.0.0.1:12345"},
+//	DialTimeout: 2 * time.Second,
+// })
+//
+// // etcd clientv3 >= v3.2.10, grpc/grpc-go >= v1.7.3
+// if err == context.DeadlineExceeded {
+// // handle errors
+// }
+//
+// // etcd clientv3 <= v3.2.9, grpc/grpc-go <= v1.2.1
+// if err == grpc.ErrClientConnTimeout {
+// // handle errors
+// }
+//
+// cli, err := clientv3.New(clientv3.Config{
+// Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
+// DialTimeout: 5 * time.Second,
+// })
+// if err != nil {
+// // handle error!
+// }
+// defer cli.Close()
+//
+// Make sure to close the client after use. If the client is not closed, its
+// connection will leak goroutines.
+//
+// To specify a client request timeout, wrap the context with context.WithTimeout:
+//
+// ctx, cancel := context.WithTimeout(context.Background(), timeout)
+// resp, err := kvc.Put(ctx, "sample_key", "sample_value")
+// cancel()
+// if err != nil {
+// // handle error!
+// }
+// // use the response
+//
+// The Client has internal state (watchers and leases), so Clients should be reused instead of created as needed.
+// Clients are safe for concurrent use by multiple goroutines.
+//
+// The etcd client returns three types of errors:
+//
+// 1. context error: canceled or deadline exceeded.
+// 2. gRPC status error: e.g. when the server-side clock drifts ahead and the server-side deadline is exceeded before the client's context deadline.
+// 3. gRPC error: see https://github.com/coreos/etcd/blob/master/etcdserver/api/v3rpc/rpctypes/error.go
+//
+// Here is the example code to handle client errors:
+//
+// resp, err := kvc.Put(ctx, "", "")
+// if err != nil {
+// if err == context.Canceled {
+// // ctx is canceled by another routine
+// } else if err == context.DeadlineExceeded {
+// // ctx had an attached deadline that was exceeded
+// } else if ev, ok := status.FromError(err); ok {
+// code := ev.Code()
+// if code == codes.DeadlineExceeded {
+// // server-side context might have timed-out first (due to clock skew)
+// // while original client-side context is not timed-out yet
+// }
+// } else if verr, ok := err.(*v3rpc.ErrEmptyKey); ok {
+// // process (verr.Errors)
+// } else {
+// // bad cluster endpoints, which are not etcd servers
+// }
+// }
+//
+// go func() { cli.Close() }()
+// _, err := kvc.Get(ctx, "a")
+// if err != nil {
+// if err == context.Canceled {
+// // grpc balancer calls 'Get' with an inflight client.Close
+// } else if err == grpc.ErrClientConnClosing {
+// // grpc balancer calls 'Get' after client.Close.
+// }
+// }
+//
+package clientv3
diff --git a/vendor/go.etcd.io/etcd/clientv3/kv.go b/vendor/go.etcd.io/etcd/clientv3/kv.go
new file mode 100644
index 0000000..5a7469b
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/kv.go
@@ -0,0 +1,177 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+)
+
+type (
+ CompactResponse pb.CompactionResponse
+ PutResponse pb.PutResponse
+ GetResponse pb.RangeResponse
+ DeleteResponse pb.DeleteRangeResponse
+ TxnResponse pb.TxnResponse
+)
+
+type KV interface {
+ // Put puts a key-value pair into etcd.
+ // Note that a key and value can be plain byte arrays, and a string is
+ // an immutable representation of such a byte array.
+ // To get a string of bytes, do string([]byte{0x10, 0x20}).
+ Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
+
+ // Get retrieves keys.
+ // By default, Get will return the value for "key", if any.
+ // When passed WithRange(end), Get will return the keys in the range [key, end).
+ // When passed WithFromKey(), Get returns keys greater than or equal to key.
+ // When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
+ // if the required revision is compacted, the request will fail with ErrCompacted.
+ // When passed WithLimit(limit), the number of returned keys is bounded by limit.
+ // When passed WithSort(), the keys will be sorted.
+ Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
+
+ // Delete deletes a key, or optionally a range [key, end) when passed WithRange(end).
+ Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
+
+ // Compact compacts etcd KV history before the given rev.
+ Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
+
+ // Do applies a single Op on KV without a transaction.
+ // Do is useful when creating arbitrary operations to be issued at a
+ // later time; the user can range over the operations, calling Do to
+ // execute them. Get/Put/Delete, on the other hand, are best suited
+ // for when the operation should be issued at the time of declaration.
+ Do(ctx context.Context, op Op) (OpResponse, error)
+
+ // Txn creates a transaction.
+ Txn(ctx context.Context) Txn
+}
+
+type OpResponse struct {
+ put *PutResponse
+ get *GetResponse
+ del *DeleteResponse
+ txn *TxnResponse
+}
+
+func (op OpResponse) Put() *PutResponse { return op.put }
+func (op OpResponse) Get() *GetResponse { return op.get }
+func (op OpResponse) Del() *DeleteResponse { return op.del }
+func (op OpResponse) Txn() *TxnResponse { return op.txn }
+
+func (resp *PutResponse) OpResponse() OpResponse {
+ return OpResponse{put: resp}
+}
+func (resp *GetResponse) OpResponse() OpResponse {
+ return OpResponse{get: resp}
+}
+func (resp *DeleteResponse) OpResponse() OpResponse {
+ return OpResponse{del: resp}
+}
+func (resp *TxnResponse) OpResponse() OpResponse {
+ return OpResponse{txn: resp}
+}
+
+type kv struct {
+ remote pb.KVClient
+ callOpts []grpc.CallOption
+}
+
+func NewKV(c *Client) KV {
+ api := &kv{remote: RetryKVClient(c)}
+ if c != nil {
+ api.callOpts = c.callOpts
+ }
+ return api
+}
+
+func NewKVFromKVClient(remote pb.KVClient, c *Client) KV {
+ api := &kv{remote: remote}
+ if c != nil {
+ api.callOpts = c.callOpts
+ }
+ return api
+}
+
+func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
+ r, err := kv.Do(ctx, OpPut(key, val, opts...))
+ return r.put, toErr(ctx, err)
+}
+
+func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) {
+ r, err := kv.Do(ctx, OpGet(key, opts...))
+ return r.get, toErr(ctx, err)
+}
+
+func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) {
+ r, err := kv.Do(ctx, OpDelete(key, opts...))
+ return r.del, toErr(ctx, err)
+}
+
+func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) {
+ resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest(), kv.callOpts...)
+ if err != nil {
+ return nil, toErr(ctx, err)
+ }
+ return (*CompactResponse)(resp), err
+}
+
+func (kv *kv) Txn(ctx context.Context) Txn {
+ return &txn{
+ kv: kv,
+ ctx: ctx,
+ callOpts: kv.callOpts,
+ }
+}
+
+func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
+ var err error
+ switch op.t {
+ case tRange:
+ var resp *pb.RangeResponse
+ resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...)
+ if err == nil {
+ return OpResponse{get: (*GetResponse)(resp)}, nil
+ }
+ case tPut:
+ var resp *pb.PutResponse
+ r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
+ resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
+ if err == nil {
+ return OpResponse{put: (*PutResponse)(resp)}, nil
+ }
+ case tDeleteRange:
+ var resp *pb.DeleteRangeResponse
+ r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
+ resp, err = kv.remote.DeleteRange(ctx, r, kv.callOpts...)
+ if err == nil {
+ return OpResponse{del: (*DeleteResponse)(resp)}, nil
+ }
+ case tTxn:
+ var resp *pb.TxnResponse
+ resp, err = kv.remote.Txn(ctx, op.toTxnRequest(), kv.callOpts...)
+ if err == nil {
+ return OpResponse{txn: (*TxnResponse)(resp)}, nil
+ }
+ default:
+ panic("Unknown op")
+ }
+ return OpResponse{}, toErr(ctx, err)
+}
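
A short sketch tying the KV surface together: a compare-and-swap built from Txn, Compare, and the Op constructors (the key and values are made up for illustration; cli is an existing client and ctx a caller-supplied context):

    // Atomically set "k" to "v2" only if it still holds "v1";
    // otherwise read the current value in the Else branch.
    txnResp, err := cli.Txn(ctx).
        If(clientv3.Compare(clientv3.Value("k"), "=", "v1")).
        Then(clientv3.OpPut("k", "v2")).
        Else(clientv3.OpGet("k")).
        Commit()
    if err != nil {
        // handle error
    }
    if !txnResp.Succeeded {
        // comparison failed; txnResp.Responses holds the Get result
    }
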
diff --git a/vendor/go.etcd.io/etcd/clientv3/lease.go b/vendor/go.etcd.io/etcd/clientv3/lease.go
new file mode 100644
index 0000000..3729cf3
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/lease.go
@@ -0,0 +1,588 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+)
+
+type (
+ LeaseRevokeResponse pb.LeaseRevokeResponse
+ LeaseID int64
+)
+
+// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
+type LeaseGrantResponse struct {
+ *pb.ResponseHeader
+ ID LeaseID
+ TTL int64
+ Error string
+}
+
+// LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse.
+type LeaseKeepAliveResponse struct {
+ *pb.ResponseHeader
+ ID LeaseID
+ TTL int64
+}
+
+// LeaseTimeToLiveResponse wraps the protobuf message LeaseTimeToLiveResponse.
+type LeaseTimeToLiveResponse struct {
+ *pb.ResponseHeader
+ ID LeaseID `json:"id"`
+
+ // TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds. Expired lease will return -1.
+ TTL int64 `json:"ttl"`
+
+ // GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
+ GrantedTTL int64 `json:"granted-ttl"`
+
+ // Keys is the list of keys attached to this lease.
+ Keys [][]byte `json:"keys"`
+}
+
+// LeaseStatus represents a lease status.
+type LeaseStatus struct {
+ ID LeaseID `json:"id"`
+ // TODO: TTL int64
+}
+
+// LeaseLeasesResponse wraps the protobuf message LeaseLeasesResponse.
+type LeaseLeasesResponse struct {
+ *pb.ResponseHeader
+ Leases []LeaseStatus `json:"leases"`
+}
+
+const (
+ // defaultTTL is the assumed lease TTL used for the first keepalive
+ // deadline before the actual TTL is known to the client.
+ defaultTTL = 5 * time.Second
+ // NoLease is a lease ID for the absence of a lease.
+ NoLease LeaseID = 0
+
+ // retryConnWait is how long to wait before retrying a request after an error
+ retryConnWait = 500 * time.Millisecond
+)
+
+// LeaseResponseChSize is the size of the buffer used to store unsent lease responses.
+// WARNING: DO NOT UPDATE.
+// Only for testing purposes.
+var LeaseResponseChSize = 16
+
+// ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
+//
+// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
+type ErrKeepAliveHalted struct {
+ Reason error
+}
+
+func (e ErrKeepAliveHalted) Error() string {
+ s := "etcdclient: leases keep alive halted"
+ if e.Reason != nil {
+ s += ": " + e.Reason.Error()
+ }
+ return s
+}
+
+type Lease interface {
+ // Grant creates a new lease.
+ Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
+
+ // Revoke revokes the given lease.
+ Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
+
+ // TimeToLive retrieves the lease information of the given lease ID.
+ TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
+
+ // Leases retrieves all leases.
+ Leases(ctx context.Context) (*LeaseLeasesResponse, error)
+
+ // KeepAlive keeps the given lease alive forever. If the keepalive response
+ // posted to the channel is not consumed immediately, the lease client will
+ // continue sending keep alive requests to the etcd server at least every
+ // second until the latest response is consumed.
+ //
+ // The returned "LeaseKeepAliveResponse" channel closes if the underlying keep
+ // alive stream is interrupted in some way the client cannot handle itself, or
+ // if the given context "ctx" is canceled or timed out. A "LeaseKeepAliveResponse"
+ // read from this closed channel is nil.
+ //
+ // If client keep alive loop halts with an unexpected error (e.g. "etcdserver:
+ // no leader") or canceled by the caller (e.g. context.Canceled), the error
+ // is returned. Otherwise, it retries.
+ //
+ // TODO(v4.0): post errors to last keep alive message before closing
+ // (see https://github.com/coreos/etcd/pull/7866)
+ KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
+
+ // KeepAliveOnce renews the lease once. The response corresponds to the
+ // first message from calling KeepAlive. If the response has a recoverable
+ // error, KeepAliveOnce will retry the RPC with a new keep alive message.
+ //
+ // In most cases, KeepAlive should be used instead of KeepAliveOnce.
+ KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
+
+ // Close releases all resources Lease keeps for efficient communication
+ // with the etcd server.
+ Close() error
+}
+
+type lessor struct {
+ mu sync.Mutex // guards all fields
+
+ // donec is closed and loopErr is set when recvKeepAliveLoop stops
+ donec chan struct{}
+ loopErr error
+
+ remote pb.LeaseClient
+
+ stream pb.Lease_LeaseKeepAliveClient
+ streamCancel context.CancelFunc
+
+ stopCtx context.Context
+ stopCancel context.CancelFunc
+
+ keepAlives map[LeaseID]*keepAlive
+
+ // firstKeepAliveTimeout is the timeout for the first keepalive request
+ // before the actual TTL is known to the lease client
+ firstKeepAliveTimeout time.Duration
+
+ // firstKeepAliveOnce ensures stream starts after first KeepAlive call.
+ firstKeepAliveOnce sync.Once
+
+ callOpts []grpc.CallOption
+}
+
+// keepAlive multiplexes a keepalive for a lease over multiple channels
+type keepAlive struct {
+ chs []chan<- *LeaseKeepAliveResponse
+ ctxs []context.Context
+ // deadline is the time the keep alive channels close if no response
+ deadline time.Time
+ // nextKeepAlive is when to send the next keep alive message
+ nextKeepAlive time.Time
+ // donec is closed on lease revoke, expiration, or cancel.
+ donec chan struct{}
+}
+
+func NewLease(c *Client) Lease {
+ return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout+time.Second)
+}
+
+func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {
+ l := &lessor{
+ donec: make(chan struct{}),
+ keepAlives: make(map[LeaseID]*keepAlive),
+ remote: remote,
+ firstKeepAliveTimeout: keepAliveTimeout,
+ }
+ if l.firstKeepAliveTimeout == time.Second {
+ l.firstKeepAliveTimeout = defaultTTL
+ }
+ if c != nil {
+ l.callOpts = c.callOpts
+ }
+ reqLeaderCtx := WithRequireLeader(context.Background())
+ l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
+ return l
+}
+
+func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
+ r := &pb.LeaseGrantRequest{TTL: ttl}
+ resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
+ if err == nil {
+ gresp := &LeaseGrantResponse{
+ ResponseHeader: resp.GetHeader(),
+ ID: LeaseID(resp.ID),
+ TTL: resp.TTL,
+ Error: resp.Error,
+ }
+ return gresp, nil
+ }
+ return nil, toErr(ctx, err)
+}
+
+func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
+ r := &pb.LeaseRevokeRequest{ID: int64(id)}
+ resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
+ if err == nil {
+ return (*LeaseRevokeResponse)(resp), nil
+ }
+ return nil, toErr(ctx, err)
+}
+
+func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
+ r := toLeaseTimeToLiveRequest(id, opts...)
+ resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...)
+ if err == nil {
+ gresp := &LeaseTimeToLiveResponse{
+ ResponseHeader: resp.GetHeader(),
+ ID: LeaseID(resp.ID),
+ TTL: resp.TTL,
+ GrantedTTL: resp.GrantedTTL,
+ Keys: resp.Keys,
+ }
+ return gresp, nil
+ }
+ return nil, toErr(ctx, err)
+}
+
+func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
+ resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, l.callOpts...)
+ if err == nil {
+ leases := make([]LeaseStatus, len(resp.Leases))
+ for i := range resp.Leases {
+ leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)}
+ }
+ return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil
+ }
+ return nil, toErr(ctx, err)
+}
+
+func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
+ ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)
+
+ l.mu.Lock()
+ // ensure that recvKeepAliveLoop is still running
+ select {
+ case <-l.donec:
+ err := l.loopErr
+ l.mu.Unlock()
+ close(ch)
+ return ch, ErrKeepAliveHalted{Reason: err}
+ default:
+ }
+ ka, ok := l.keepAlives[id]
+ if !ok {
+ // create fresh keep alive
+ ka = &keepAlive{
+ chs: []chan<- *LeaseKeepAliveResponse{ch},
+ ctxs: []context.Context{ctx},
+ deadline: time.Now().Add(l.firstKeepAliveTimeout),
+ nextKeepAlive: time.Now(),
+ donec: make(chan struct{}),
+ }
+ l.keepAlives[id] = ka
+ } else {
+ // add channel and context to existing keep alive
+ ka.ctxs = append(ka.ctxs, ctx)
+ ka.chs = append(ka.chs, ch)
+ }
+ l.mu.Unlock()
+
+ go l.keepAliveCtxCloser(id, ctx, ka.donec)
+ l.firstKeepAliveOnce.Do(func() {
+ go l.recvKeepAliveLoop()
+ go l.deadlineLoop()
+ })
+
+ return ch, nil
+}
+
+func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
+ for {
+ resp, err := l.keepAliveOnce(ctx, id)
+ if err == nil {
+ if resp.TTL <= 0 {
+ err = rpctypes.ErrLeaseNotFound
+ }
+ return resp, err
+ }
+ if isHaltErr(ctx, err) {
+ return nil, toErr(ctx, err)
+ }
+ }
+}
+
+func (l *lessor) Close() error {
+ l.stopCancel()
+ // close for synchronous teardown if stream goroutines never launched
+ l.firstKeepAliveOnce.Do(func() { close(l.donec) })
+ <-l.donec
+ return nil
+}
+
+func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-chan struct{}) {
+ select {
+ case <-donec:
+ return
+ case <-l.donec:
+ return
+ case <-ctx.Done():
+ }
+
+ l.mu.Lock()
+ defer l.mu.Unlock()
+
+ ka, ok := l.keepAlives[id]
+ if !ok {
+ return
+ }
+
+ // close channel and remove context if still associated with keep alive
+ for i, c := range ka.ctxs {
+ if c == ctx {
+ close(ka.chs[i])
+ ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...)
+ ka.chs = append(ka.chs[:i], ka.chs[i+1:]...)
+ break
+ }
+ }
+ // remove the keepalive if there are no more listeners
+ if len(ka.chs) == 0 {
+ delete(l.keepAlives, id)
+ }
+}
+
+// closeRequireLeader scans keepAlives for contexts that require a leader
+// and closes the associated channels.
+func (l *lessor) closeRequireLeader() {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ for _, ka := range l.keepAlives {
+ reqIdxs := 0
+ // find all required leader channels, close, mark as nil
+ for i, ctx := range ka.ctxs {
+ md, ok := metadata.FromOutgoingContext(ctx)
+ if !ok {
+ continue
+ }
+ ks := md[rpctypes.MetadataRequireLeaderKey]
+ if len(ks) < 1 || ks[0] != rpctypes.MetadataHasLeader {
+ continue
+ }
+ close(ka.chs[i])
+ ka.chs[i] = nil
+ reqIdxs++
+ }
+ if reqIdxs == 0 {
+ continue
+ }
+ // remove all channels that required a leader from keepalive
+ newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
+ newCtxs := make([]context.Context, len(newChs))
+ newIdx := 0
+ for i := range ka.chs {
+ if ka.chs[i] == nil {
+ continue
+ }
+ // keep each surviving channel paired with its own context
+ newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[i]
+ newIdx++
+ }
+ ka.chs, ka.ctxs = newChs, newCtxs
+ }
+}
+
+func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
+ cctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...)
+ if err != nil {
+ return nil, toErr(ctx, err)
+ }
+
+ err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
+ if err != nil {
+ return nil, toErr(ctx, err)
+ }
+
+ resp, rerr := stream.Recv()
+ if rerr != nil {
+ return nil, toErr(ctx, rerr)
+ }
+
+ karesp := &LeaseKeepAliveResponse{
+ ResponseHeader: resp.GetHeader(),
+ ID: LeaseID(resp.ID),
+ TTL: resp.TTL,
+ }
+ return karesp, nil
+}
+
+func (l *lessor) recvKeepAliveLoop() (gerr error) {
+ defer func() {
+ l.mu.Lock()
+ close(l.donec)
+ l.loopErr = gerr
+ for _, ka := range l.keepAlives {
+ ka.close()
+ }
+ l.keepAlives = make(map[LeaseID]*keepAlive)
+ l.mu.Unlock()
+ }()
+
+ for {
+ stream, err := l.resetRecv()
+ if err != nil {
+ if canceledByCaller(l.stopCtx, err) {
+ return err
+ }
+ } else {
+ for {
+ resp, err := stream.Recv()
+ if err != nil {
+ if canceledByCaller(l.stopCtx, err) {
+ return err
+ }
+
+ if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
+ l.closeRequireLeader()
+ }
+ break
+ }
+
+ l.recvKeepAlive(resp)
+ }
+ }
+
+ select {
+ case <-time.After(retryConnWait):
+ continue
+ case <-l.stopCtx.Done():
+ return l.stopCtx.Err()
+ }
+ }
+}
+
+// resetRecv opens a new lease stream and starts sending keep alive requests.
+func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
+ sctx, cancel := context.WithCancel(l.stopCtx)
+ stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
+ if err != nil {
+ cancel()
+ return nil, err
+ }
+
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ if l.stream != nil && l.streamCancel != nil {
+ l.streamCancel()
+ }
+
+ l.streamCancel = cancel
+ l.stream = stream
+
+ go l.sendKeepAliveLoop(stream)
+ return stream, nil
+}
+
+// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
+func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
+ karesp := &LeaseKeepAliveResponse{
+ ResponseHeader: resp.GetHeader(),
+ ID: LeaseID(resp.ID),
+ TTL: resp.TTL,
+ }
+
+ l.mu.Lock()
+ defer l.mu.Unlock()
+
+ ka, ok := l.keepAlives[karesp.ID]
+ if !ok {
+ return
+ }
+
+ if karesp.TTL <= 0 {
+ // lease expired; close all keep alive channels
+ delete(l.keepAlives, karesp.ID)
+ ka.close()
+ return
+ }
+
+ // send update to all channels
+ nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
+ ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
+ for _, ch := range ka.chs {
+ select {
+ case ch <- karesp:
+ default:
+ }
+ // still advance in order to rate-limit keep-alive sends
+ ka.nextKeepAlive = nextKeepAlive
+ }
+}
+
+// deadlineLoop reaps any keep alive channels that have not received a response
+// within the lease TTL
+func (l *lessor) deadlineLoop() {
+ for {
+ select {
+ case <-time.After(time.Second):
+ case <-l.donec:
+ return
+ }
+ now := time.Now()
+ l.mu.Lock()
+ for id, ka := range l.keepAlives {
+ if ka.deadline.Before(now) {
+ // waited too long for response; lease may be expired
+ ka.close()
+ delete(l.keepAlives, id)
+ }
+ }
+ l.mu.Unlock()
+ }
+}
+
+// sendKeepAliveLoop sends keep alive requests for the lifetime of the given stream.
+func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
+ for {
+ var tosend []LeaseID
+
+ now := time.Now()
+ l.mu.Lock()
+ for id, ka := range l.keepAlives {
+ if ka.nextKeepAlive.Before(now) {
+ tosend = append(tosend, id)
+ }
+ }
+ l.mu.Unlock()
+
+ for _, id := range tosend {
+ r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
+ if err := stream.Send(r); err != nil {
+ // TODO do something with this error?
+ return
+ }
+ }
+
+ select {
+ case <-time.After(500 * time.Millisecond):
+ case <-stream.Context().Done():
+ return
+ case <-l.donec:
+ return
+ case <-l.stopCtx.Done():
+ return
+ }
+ }
+}
+
+func (ka *keepAlive) close() {
+ close(ka.donec)
+ for _, ch := range ka.chs {
+ close(ch)
+ }
+}
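
Putting the lease API together, a hedged sketch of the common grant/attach/keep-alive pattern (the TTL, key, and value are arbitrary; cli is an existing client and ctx a caller-supplied context):

    // Grant a 10-second lease and attach a key to it; the key is deleted
    // automatically if the lease expires.
    grant, err := cli.Grant(ctx, 10)
    if err != nil {
        // handle error
    }
    if _, err = cli.Put(ctx, "svc/instance-1", "10.0.0.1:8080", clientv3.WithLease(grant.ID)); err != nil {
        // handle error
    }

    // KeepAlive renews the lease in the background; drain the channel so
    // responses do not pile up.
    ch, err := cli.KeepAlive(context.Background(), grant.ID)
    if err != nil {
        // handle error
    }
    go func() {
        for range ch {
            // consume renewal responses
        }
        // channel closed: lease expired, was revoked, or client closed
    }()
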
diff --git a/vendor/go.etcd.io/etcd/clientv3/logger.go b/vendor/go.etcd.io/etcd/clientv3/logger.go
new file mode 100644
index 0000000..3276372
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/logger.go
@@ -0,0 +1,101 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "io/ioutil"
+ "sync"
+
+ "github.com/coreos/etcd/pkg/logutil"
+
+ "google.golang.org/grpc/grpclog"
+)
+
+var (
+ lgMu sync.RWMutex
+ lg logutil.Logger
+)
+
+type settableLogger struct {
+ l grpclog.LoggerV2
+ mu sync.RWMutex
+}
+
+func init() {
+ // disable client side logs by default
+ lg = &settableLogger{}
+ SetLogger(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard))
+}
+
+// SetLogger sets client-side Logger.
+func SetLogger(l grpclog.LoggerV2) {
+ lgMu.Lock()
+ lg = logutil.NewLogger(l)
+ // override grpclog so that any changes happen with locking
+ grpclog.SetLoggerV2(lg)
+ lgMu.Unlock()
+}
+
+// GetLogger returns the current logutil.Logger.
+func GetLogger() logutil.Logger {
+ lgMu.RLock()
+ l := lg
+ lgMu.RUnlock()
+ return l
+}
+
+// NewLogger returns a new Logger with logutil.Logger.
+func NewLogger(gl grpclog.LoggerV2) logutil.Logger {
+ return &settableLogger{l: gl}
+}
+
+func (s *settableLogger) get() grpclog.LoggerV2 {
+ s.mu.RLock()
+ l := s.l
+ s.mu.RUnlock()
+ return l
+}
+
+// implement the grpclog.LoggerV2 interface
+
+func (s *settableLogger) Info(args ...interface{}) { s.get().Info(args...) }
+func (s *settableLogger) Infof(format string, args ...interface{}) { s.get().Infof(format, args...) }
+func (s *settableLogger) Infoln(args ...interface{}) { s.get().Infoln(args...) }
+func (s *settableLogger) Warning(args ...interface{}) { s.get().Warning(args...) }
+func (s *settableLogger) Warningf(format string, args ...interface{}) {
+ s.get().Warningf(format, args...)
+}
+func (s *settableLogger) Warningln(args ...interface{}) { s.get().Warningln(args...) }
+func (s *settableLogger) Error(args ...interface{}) { s.get().Error(args...) }
+func (s *settableLogger) Errorf(format string, args ...interface{}) {
+ s.get().Errorf(format, args...)
+}
+func (s *settableLogger) Errorln(args ...interface{}) { s.get().Errorln(args...) }
+func (s *settableLogger) Fatal(args ...interface{}) { s.get().Fatal(args...) }
+func (s *settableLogger) Fatalf(format string, args ...interface{}) { s.get().Fatalf(format, args...) }
+func (s *settableLogger) Fatalln(args ...interface{}) { s.get().Fatalln(args...) }
+func (s *settableLogger) Print(args ...interface{}) { s.get().Info(args...) }
+func (s *settableLogger) Printf(format string, args ...interface{}) { s.get().Infof(format, args...) }
+func (s *settableLogger) Println(args ...interface{}) { s.get().Infoln(args...) }
+func (s *settableLogger) V(l int) bool { return s.get().V(l) }
+func (s *settableLogger) Lvl(lvl int) grpclog.LoggerV2 {
+ s.mu.RLock()
+ l := s.l
+ s.mu.RUnlock()
+ if l.V(lvl) {
+ return s
+ }
+ return logutil.NewDiscardLogger()
+}
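
Client-side logging is discarded by default (see init above). A small sketch of re-enabling it, routing all levels to stderr:

    import (
        "os"

        "go.etcd.io/etcd/clientv3"
        "google.golang.org/grpc/grpclog"
    )

    func enableClientLogs() {
        // Send info, warning, and error output to stderr.
        clientv3.SetLogger(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
    }
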
diff --git a/vendor/go.etcd.io/etcd/clientv3/maintenance.go b/vendor/go.etcd.io/etcd/clientv3/maintenance.go
new file mode 100644
index 0000000..5e87cf8
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/maintenance.go
@@ -0,0 +1,239 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+)
+
+type (
+ DefragmentResponse pb.DefragmentResponse
+ AlarmResponse pb.AlarmResponse
+ AlarmMember pb.AlarmMember
+ StatusResponse pb.StatusResponse
+ HashKVResponse pb.HashKVResponse
+ MoveLeaderResponse pb.MoveLeaderResponse
+)
+
+type Maintenance interface {
+ // AlarmList gets all active alarms.
+ AlarmList(ctx context.Context) (*AlarmResponse, error)
+
+ // AlarmDisarm disarms a given alarm.
+ AlarmDisarm(ctx context.Context, m *AlarmMember) (*AlarmResponse, error)
+
+ // Defragment releases wasted space from internal fragmentation on a given etcd member.
+ // Defragment is only needed after deleting a large number of keys, to reclaim
+ // the freed resources.
+ // Defragment is an expensive operation; avoid defragmenting multiple members
+ // at the same time.
+ // To defragment multiple members in the cluster, call Defragment multiple
+ // times with different endpoints.
+ Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error)
+
+ // Status gets the status of the endpoint.
+ Status(ctx context.Context, endpoint string) (*StatusResponse, error)
+
+ // HashKV returns a hash of the KV state at the time of the RPC.
+ // If revision is zero, the hash is computed on all keys. If the revision
+ // is non-zero, the hash is computed on all keys at or below the given revision.
+ HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error)
+
+ // Snapshot provides a reader for a point-in-time snapshot of etcd.
+ // If the context "ctx" is canceled or timed out, reading from returned
+ // "io.ReadCloser" would error out (e.g. context.Canceled, context.DeadlineExceeded).
+ Snapshot(ctx context.Context) (io.ReadCloser, error)
+
+ // MoveLeader requests current leader to transfer its leadership to the transferee.
+ // Request must be made to the leader.
+ MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error)
+}
+
+type maintenance struct {
+ dial func(endpoint string) (pb.MaintenanceClient, func(), error)
+ remote pb.MaintenanceClient
+ callOpts []grpc.CallOption
+}
+
+func NewMaintenance(c *Client) Maintenance {
+ api := &maintenance{
+ dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
+ conn, err := c.Dial(endpoint)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to dial endpoint %s with maintenance client: %v", endpoint, err)
+ }
+ cancel := func() { conn.Close() }
+ return RetryMaintenanceClient(c, conn), cancel, nil
+ },
+ remote: RetryMaintenanceClient(c, c.conn),
+ }
+ if c != nil {
+ api.callOpts = c.callOpts
+ }
+ return api
+}
+
+func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance {
+ api := &maintenance{
+ dial: func(string) (pb.MaintenanceClient, func(), error) {
+ return remote, func() {}, nil
+ },
+ remote: remote,
+ }
+ if c != nil {
+ api.callOpts = c.callOpts
+ }
+ return api
+}
+
+func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
+ req := &pb.AlarmRequest{
+ Action: pb.AlarmRequest_GET,
+ MemberID: 0, // all
+ Alarm: pb.AlarmType_NONE, // all
+ }
+ resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
+ if err == nil {
+ return (*AlarmResponse)(resp), nil
+ }
+ return nil, toErr(ctx, err)
+}
+
+func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmResponse, error) {
+ req := &pb.AlarmRequest{
+ Action: pb.AlarmRequest_DEACTIVATE,
+ MemberID: am.MemberID,
+ Alarm: am.Alarm,
+ }
+
+ if req.MemberID == 0 && req.Alarm == pb.AlarmType_NONE {
+ ar, err := m.AlarmList(ctx)
+ if err != nil {
+ return nil, toErr(ctx, err)
+ }
+ ret := AlarmResponse{}
+ for _, am := range ar.Alarms {
+ dresp, derr := m.AlarmDisarm(ctx, (*AlarmMember)(am))
+ if derr != nil {
+ return nil, toErr(ctx, derr)
+ }
+ ret.Alarms = append(ret.Alarms, dresp.Alarms...)
+ }
+ return &ret, nil
+ }
+
+ resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
+ if err == nil {
+ return (*AlarmResponse)(resp), nil
+ }
+ return nil, toErr(ctx, err)
+}
+
+func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) {
+ remote, cancel, err := m.dial(endpoint)
+ if err != nil {
+ return nil, toErr(ctx, err)
+ }
+ defer cancel()
+ resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, m.callOpts...)
+ if err != nil {
+ return nil, toErr(ctx, err)
+ }
+ return (*DefragmentResponse)(resp), nil
+}
+
+func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) {
+ remote, cancel, err := m.dial(endpoint)
+ if err != nil {
+ return nil, toErr(ctx, err)
+ }
+ defer cancel()
+ resp, err := remote.Status(ctx, &pb.StatusRequest{}, m.callOpts...)
+ if err != nil {
+ return nil, toErr(ctx, err)
+ }
+ return (*StatusResponse)(resp), nil
+}
+
+func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error) {
+ remote, cancel, err := m.dial(endpoint)
+ if err != nil {
+ return nil, toErr(ctx, err)
+ }
+ defer cancel()
+ resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, m.callOpts...)
+ if err != nil {
+ return nil, toErr(ctx, err)
+ }
+ return (*HashKVResponse)(resp), nil
+}
+
+func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
+ ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...)
+ if err != nil {
+ return nil, toErr(ctx, err)
+ }
+
+ plog.Info("opened snapshot stream; downloading")
+ pr, pw := io.Pipe()
+ go func() {
+ for {
+ resp, err := ss.Recv()
+ if err != nil {
+ switch err {
+ case io.EOF:
+ plog.Info("completed snapshot read; closing")
+ default:
+ plog.Warningf("failed to receive from snapshot stream; closing (%v)", err)
+ }
+ pw.CloseWithError(err)
+ return
+ }
+
+ // can "resp == nil && err == nil"
+ // before we receive snapshot SHA digest?
+ // No, server sends EOF with an empty response
+ // after it sends SHA digest at the end
+
+ if _, werr := pw.Write(resp.Blob); werr != nil {
+ pw.CloseWithError(werr)
+ return
+ }
+ }
+ }()
+ return &snapshotReadCloser{ctx: ctx, ReadCloser: pr}, nil
+}
+
+type snapshotReadCloser struct {
+ ctx context.Context
+ io.ReadCloser
+}
+
+func (rc *snapshotReadCloser) Read(p []byte) (n int, err error) {
+ n, err = rc.ReadCloser.Read(p)
+ return n, toErr(rc.ctx, err)
+}
+
+func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) {
+ resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, m.callOpts...)
+ return (*MoveLeaderResponse)(resp), toErr(ctx, err)
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/op.go b/vendor/go.etcd.io/etcd/clientv3/op.go
new file mode 100644
index 0000000..3dca41b
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/op.go
@@ -0,0 +1,530 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+type opType int
+
+const (
+ // A default Op has opType 0, which is invalid.
+ tRange opType = iota + 1
+ tPut
+ tDeleteRange
+ tTxn
+)
+
+var (
+ noPrefixEnd = []byte{0}
+)
+
+// Op represents an Operation that kv can execute.
+type Op struct {
+ t opType
+ key []byte
+ end []byte
+
+ // for range
+ limit int64
+ sort *SortOption
+ serializable bool
+ keysOnly bool
+ countOnly bool
+ minModRev int64
+ maxModRev int64
+ minCreateRev int64
+ maxCreateRev int64
+
+ // for range, watch
+ rev int64
+
+ // for watch, put, delete
+ prevKV bool
+
+ // for watch
+ // fragmentation is disabled by default; if true, watch events are
+ // split when their total size exceeds the "--max-request-bytes"
+ // flag value plus a 512-byte overhead
+ fragment bool
+
+ // for put
+ ignoreValue bool
+ ignoreLease bool
+
+ // progressNotify is for progress updates.
+ progressNotify bool
+ // createdNotify is for created event
+ createdNotify bool
+ // filters for watchers
+ filterPut bool
+ filterDelete bool
+
+ // for put
+ val []byte
+ leaseID LeaseID
+
+ // txn
+ cmps []Cmp
+ thenOps []Op
+ elseOps []Op
+}
+
+// accessors / mutators
+
+func (op Op) IsTxn() bool { return op.t == tTxn }
+func (op Op) Txn() ([]Cmp, []Op, []Op) { return op.cmps, op.thenOps, op.elseOps }
+
+// KeyBytes returns the byte slice holding the Op's key.
+func (op Op) KeyBytes() []byte { return op.key }
+
+// WithKeyBytes sets the byte slice for the Op's key.
+func (op *Op) WithKeyBytes(key []byte) { op.key = key }
+
+// RangeBytes returns the byte slice holding the Op's range end, if any.
+func (op Op) RangeBytes() []byte { return op.end }
+
+// Rev returns the requested revision, if any.
+func (op Op) Rev() int64 { return op.rev }
+
+// IsPut returns true iff the operation is a Put.
+func (op Op) IsPut() bool { return op.t == tPut }
+
+// IsGet returns true iff the operation is a Get.
+func (op Op) IsGet() bool { return op.t == tRange }
+
+// IsDelete returns true iff the operation is a Delete.
+func (op Op) IsDelete() bool { return op.t == tDeleteRange }
+
+// IsSerializable returns true if the serializable field is true.
+func (op Op) IsSerializable() bool { return op.serializable }
+
+// IsKeysOnly returns whether keysOnly is set.
+func (op Op) IsKeysOnly() bool { return op.keysOnly }
+
+// IsCountOnly returns whether countOnly is set.
+func (op Op) IsCountOnly() bool { return op.countOnly }
+
+// MinModRev returns the operation's minimum modify revision.
+func (op Op) MinModRev() int64 { return op.minModRev }
+
+// MaxModRev returns the operation's maximum modify revision.
+func (op Op) MaxModRev() int64 { return op.maxModRev }
+
+// MinCreateRev returns the operation's minimum create revision.
+func (op Op) MinCreateRev() int64 { return op.minCreateRev }
+
+// MaxCreateRev returns the operation's maximum create revision.
+func (op Op) MaxCreateRev() int64 { return op.maxCreateRev }
+
+// WithRangeBytes sets the byte slice for the Op's range end.
+func (op *Op) WithRangeBytes(end []byte) { op.end = end }
+
+// ValueBytes returns the byte slice holding the Op's value, if any.
+func (op Op) ValueBytes() []byte { return op.val }
+
+// WithValueBytes sets the byte slice for the Op's value.
+func (op *Op) WithValueBytes(v []byte) { op.val = v }
+
+func (op Op) toRangeRequest() *pb.RangeRequest {
+ if op.t != tRange {
+ panic("op.t != tRange")
+ }
+ r := &pb.RangeRequest{
+ Key: op.key,
+ RangeEnd: op.end,
+ Limit: op.limit,
+ Revision: op.rev,
+ Serializable: op.serializable,
+ KeysOnly: op.keysOnly,
+ CountOnly: op.countOnly,
+ MinModRevision: op.minModRev,
+ MaxModRevision: op.maxModRev,
+ MinCreateRevision: op.minCreateRev,
+ MaxCreateRevision: op.maxCreateRev,
+ }
+ if op.sort != nil {
+ r.SortOrder = pb.RangeRequest_SortOrder(op.sort.Order)
+ r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target)
+ }
+ return r
+}
+
+func (op Op) toTxnRequest() *pb.TxnRequest {
+ thenOps := make([]*pb.RequestOp, len(op.thenOps))
+ for i, tOp := range op.thenOps {
+ thenOps[i] = tOp.toRequestOp()
+ }
+ elseOps := make([]*pb.RequestOp, len(op.elseOps))
+ for i, eOp := range op.elseOps {
+ elseOps[i] = eOp.toRequestOp()
+ }
+ cmps := make([]*pb.Compare, len(op.cmps))
+ for i := range op.cmps {
+ cmps[i] = (*pb.Compare)(&op.cmps[i])
+ }
+ return &pb.TxnRequest{Compare: cmps, Success: thenOps, Failure: elseOps}
+}
+
+func (op Op) toRequestOp() *pb.RequestOp {
+ switch op.t {
+ case tRange:
+ return &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: op.toRangeRequest()}}
+ case tPut:
+ r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
+ return &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: r}}
+ case tDeleteRange:
+ r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
+ return &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: r}}
+ case tTxn:
+ return &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{RequestTxn: op.toTxnRequest()}}
+ default:
+ panic("Unknown Op")
+ }
+}
+
+func (op Op) isWrite() bool {
+ if op.t == tTxn {
+ for _, tOp := range op.thenOps {
+ if tOp.isWrite() {
+ return true
+ }
+ }
+ for _, tOp := range op.elseOps {
+ if tOp.isWrite() {
+ return true
+ }
+ }
+ return false
+ }
+ return op.t != tRange
+}
+
+func OpGet(key string, opts ...OpOption) Op {
+ ret := Op{t: tRange, key: []byte(key)}
+ ret.applyOpts(opts)
+ return ret
+}
+
+func OpDelete(key string, opts ...OpOption) Op {
+ ret := Op{t: tDeleteRange, key: []byte(key)}
+ ret.applyOpts(opts)
+ switch {
+ case ret.leaseID != 0:
+ panic("unexpected lease in delete")
+ case ret.limit != 0:
+ panic("unexpected limit in delete")
+ case ret.rev != 0:
+ panic("unexpected revision in delete")
+ case ret.sort != nil:
+ panic("unexpected sort in delete")
+ case ret.serializable:
+ panic("unexpected serializable in delete")
+ case ret.countOnly:
+ panic("unexpected countOnly in delete")
+ case ret.minModRev != 0, ret.maxModRev != 0:
+ panic("unexpected mod revision filter in delete")
+ case ret.minCreateRev != 0, ret.maxCreateRev != 0:
+ panic("unexpected create revision filter in delete")
+ case ret.filterDelete, ret.filterPut:
+ panic("unexpected filter in delete")
+ case ret.createdNotify:
+ panic("unexpected createdNotify in delete")
+ }
+ return ret
+}
+
+func OpPut(key, val string, opts ...OpOption) Op {
+ ret := Op{t: tPut, key: []byte(key), val: []byte(val)}
+ ret.applyOpts(opts)
+ switch {
+ case ret.end != nil:
+ panic("unexpected range in put")
+ case ret.limit != 0:
+ panic("unexpected limit in put")
+ case ret.rev != 0:
+ panic("unexpected revision in put")
+ case ret.sort != nil:
+ panic("unexpected sort in put")
+ case ret.serializable:
+ panic("unexpected serializable in put")
+ case ret.countOnly:
+ panic("unexpected countOnly in put")
+ case ret.minModRev != 0, ret.maxModRev != 0:
+ panic("unexpected mod revision filter in put")
+ case ret.minCreateRev != 0, ret.maxCreateRev != 0:
+ panic("unexpected create revision filter in put")
+ case ret.filterDelete, ret.filterPut:
+ panic("unexpected filter in put")
+ case ret.createdNotify:
+ panic("unexpected createdNotify in put")
+ }
+ return ret
+}
+
+func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op {
+ return Op{t: tTxn, cmps: cmps, thenOps: thenOps, elseOps: elseOps}
+}
+
+func opWatch(key string, opts ...OpOption) Op {
+ ret := Op{t: tRange, key: []byte(key)}
+ ret.applyOpts(opts)
+ switch {
+ case ret.leaseID != 0:
+ panic("unexpected lease in watch")
+ case ret.limit != 0:
+ panic("unexpected limit in watch")
+ case ret.sort != nil:
+ panic("unexpected sort in watch")
+ case ret.serializable:
+ panic("unexpected serializable in watch")
+ case ret.countOnly:
+ panic("unexpected countOnly in watch")
+ case ret.minModRev != 0, ret.maxModRev != 0:
+ panic("unexpected mod revision filter in watch")
+ case ret.minCreateRev != 0, ret.maxCreateRev != 0:
+ panic("unexpected create revision filter in watch")
+ }
+ return ret
+}
+
+func (op *Op) applyOpts(opts []OpOption) {
+ for _, opt := range opts {
+ opt(op)
+ }
+}
+
+// OpOption configures Operations like Get, Put, Delete.
+type OpOption func(*Op)
+
+// WithLease attaches a lease ID to a key in 'Put' request.
+func WithLease(leaseID LeaseID) OpOption {
+ return func(op *Op) { op.leaseID = leaseID }
+}
+
+// WithLimit limits the number of results to return from 'Get' request.
+// If WithLimit is given a 0 limit, it is treated as no limit.
+func WithLimit(n int64) OpOption { return func(op *Op) { op.limit = n } }
+
+// WithRev specifies the store revision for a 'Get' request, or the
+// start revision of a 'Watch' request.
+func WithRev(rev int64) OpOption { return func(op *Op) { op.rev = rev } }
+
+// WithSort specifies the ordering in 'Get' request. It requires
+// 'WithRange' and/or 'WithPrefix' to be specified too.
+// 'target' specifies the field to sort by: key, version, create revision, mod revision, or value.
+// 'order' can be either 'SortNone', 'SortAscend', 'SortDescend'.
+func WithSort(target SortTarget, order SortOrder) OpOption {
+ return func(op *Op) {
+ if target == SortByKey && order == SortAscend {
+ // If order != SortNone, server fetches the entire key-space,
+ // and then applies the sort and limit, if provided.
+ // Since by default the server returns results sorted by keys
+ // in lexicographically ascending order, the client should ignore
+ // SortOrder if the target is SortByKey.
+ order = SortNone
+ }
+ op.sort = &SortOption{target, order}
+ }
+}
+
+// GetPrefixRangeEnd gets the range end of the prefix.
+// 'Get(foo, WithPrefix())' is equal to 'Get(foo, WithRange(GetPrefixRangeEnd(foo)))'.
+func GetPrefixRangeEnd(prefix string) string {
+ return string(getPrefix([]byte(prefix)))
+}
+
+func getPrefix(key []byte) []byte {
+ end := make([]byte, len(key))
+ copy(end, key)
+ for i := len(end) - 1; i >= 0; i-- {
+ if end[i] < 0xff {
+ end[i] = end[i] + 1
+ end = end[:i+1]
+ return end
+ }
+ }
+ // next prefix does not exist (e.g., 0xffff);
+ // default to WithFromKey policy
+ return noPrefixEnd
+}
+
+// WithPrefix enables 'Get', 'Delete', or 'Watch' requests to operate
+// on the keys with matching prefix. For example, 'Get(foo, WithPrefix())'
+// can return 'foo1', 'foo2', and so on.
+func WithPrefix() OpOption {
+ return func(op *Op) {
+ if len(op.key) == 0 {
+ op.key, op.end = []byte{0}, []byte{0}
+ return
+ }
+ op.end = getPrefix(op.key)
+ }
+}
+
+// WithRange specifies the range of 'Get', 'Delete', 'Watch' requests.
+// For example, a 'Get' request with 'WithRange(end)' returns
+// the keys in the range [key, end).
+// endKey must be lexicographically greater than the start key.
+func WithRange(endKey string) OpOption {
+ return func(op *Op) { op.end = []byte(endKey) }
+}
+
+// WithFromKey specifies the range of 'Get', 'Delete', 'Watch' requests
+// to be equal or greater than the key in the argument.
+func WithFromKey() OpOption { return WithRange("\x00") }
+
+// WithSerializable makes 'Get' request serializable. By default,
+// it's linearizable. Serializable requests are better for lower latency
+// requirements, at the possible cost of reading stale data.
+func WithSerializable() OpOption {
+ return func(op *Op) { op.serializable = true }
+}
+
+// WithKeysOnly makes the 'Get' request return only the keys; the
+// corresponding values are omitted.
+func WithKeysOnly() OpOption {
+ return func(op *Op) { op.keysOnly = true }
+}
+
+// WithCountOnly makes the 'Get' request return only the count of keys.
+func WithCountOnly() OpOption {
+ return func(op *Op) { op.countOnly = true }
+}
+
+// WithMinModRev filters out keys for Get with modification revisions less than the given revision.
+func WithMinModRev(rev int64) OpOption { return func(op *Op) { op.minModRev = rev } }
+
+// WithMaxModRev filters out keys for Get with modification revisions greater than the given revision.
+func WithMaxModRev(rev int64) OpOption { return func(op *Op) { op.maxModRev = rev } }
+
+// WithMinCreateRev filters out keys for Get with creation revisions less than the given revision.
+func WithMinCreateRev(rev int64) OpOption { return func(op *Op) { op.minCreateRev = rev } }
+
+// WithMaxCreateRev filters out keys for Get with creation revisions greater than the given revision.
+func WithMaxCreateRev(rev int64) OpOption { return func(op *Op) { op.maxCreateRev = rev } }
+
+// WithFirstCreate gets the key with the oldest creation revision in the request range.
+func WithFirstCreate() []OpOption { return withTop(SortByCreateRevision, SortAscend) }
+
+// WithLastCreate gets the key with the latest creation revision in the request range.
+func WithLastCreate() []OpOption { return withTop(SortByCreateRevision, SortDescend) }
+
+// WithFirstKey gets the lexically first key in the request range.
+func WithFirstKey() []OpOption { return withTop(SortByKey, SortAscend) }
+
+// WithLastKey gets the lexically last key in the request range.
+func WithLastKey() []OpOption { return withTop(SortByKey, SortDescend) }
+
+// WithFirstRev gets the key with the oldest modification revision in the request range.
+func WithFirstRev() []OpOption { return withTop(SortByModRevision, SortAscend) }
+
+// WithLastRev gets the key with the latest modification revision in the request range.
+func WithLastRev() []OpOption { return withTop(SortByModRevision, SortDescend) }
+
+// withTop gets the first key over the get's prefix given a sort order
+func withTop(target SortTarget, order SortOrder) []OpOption {
+ return []OpOption{WithPrefix(), WithSort(target, order), WithLimit(1)}
+}
+
+// WithProgressNotify makes the watch server send periodic progress updates
+// every 10 minutes when there are no incoming events.
+// Progress updates have zero events in WatchResponse.
+func WithProgressNotify() OpOption {
+ return func(op *Op) {
+ op.progressNotify = true
+ }
+}
+
+// WithCreatedNotify makes the watch server send the created event.
+func WithCreatedNotify() OpOption {
+ return func(op *Op) {
+ op.createdNotify = true
+ }
+}
+
+// WithFilterPut discards PUT events from the watcher.
+func WithFilterPut() OpOption {
+ return func(op *Op) { op.filterPut = true }
+}
+
+// WithFilterDelete discards DELETE events from the watcher.
+func WithFilterDelete() OpOption {
+ return func(op *Op) { op.filterDelete = true }
+}
+
+// WithPrevKV gets the previous key-value pair before the event happens. If the previous KV is already compacted,
+// nothing will be returned.
+func WithPrevKV() OpOption {
+ return func(op *Op) {
+ op.prevKV = true
+ }
+}
+
+// WithIgnoreValue updates the key using its current value.
+// This option cannot be combined with non-empty values.
+// Returns an error if the key does not exist.
+func WithIgnoreValue() OpOption {
+ return func(op *Op) {
+ op.ignoreValue = true
+ }
+}
+
+// WithIgnoreLease updates the key using its current lease.
+// This option cannot be combined with WithLease.
+// Returns an error if the key does not exist.
+func WithIgnoreLease() OpOption {
+ return func(op *Op) {
+ op.ignoreLease = true
+ }
+}
+
+// LeaseOp represents an Operation that lease can execute.
+type LeaseOp struct {
+ id LeaseID
+
+ // for TimeToLive
+ attachedKeys bool
+}
+
+// LeaseOption configures lease operations.
+type LeaseOption func(*LeaseOp)
+
+func (op *LeaseOp) applyOpts(opts []LeaseOption) {
+ for _, opt := range opts {
+ opt(op)
+ }
+}
+
+// WithAttachedKeys makes TimeToLive list the keys attached to the given lease ID.
+func WithAttachedKeys() LeaseOption {
+ return func(op *LeaseOp) { op.attachedKeys = true }
+}
+
+func toLeaseTimeToLiveRequest(id LeaseID, opts ...LeaseOption) *pb.LeaseTimeToLiveRequest {
+ ret := &LeaseOp{id: id}
+ ret.applyOpts(opts)
+ return &pb.LeaseTimeToLiveRequest{ID: int64(id), Keys: ret.attachedKeys}
+}
+
+// WithFragment enables receiving raw (fragmented) watch responses.
+// Fragmentation is disabled by default. If fragmentation is enabled,
+// the etcd watch server splits a watch response before sending it to
+// clients when the total size of the watch events exceeds the
+// server-side request limit. The default server-side request limit is
+// 1.5 MiB; it can be configured with the "--max-request-bytes" flag,
+// plus a 512-byte gRPC overhead.
+// See "etcdserver/api/v3rpc/watch.go" for more details.
+func WithFragment() OpOption {
+ return func(op *Op) { op.fragment = true }
+}
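
These OpOptions compose freely; for example, a bounded, newest-first scan of a prefix (the prefix and limit are chosen for illustration; cli, ctx, and an imported fmt package are assumed):

    // Fetch at most 3 keys under "config/", most recently modified first.
    resp, err := cli.Get(ctx, "config/",
        clientv3.WithPrefix(),
        clientv3.WithSort(clientv3.SortByModRevision, clientv3.SortDescend),
        clientv3.WithLimit(3),
    )
    if err != nil {
        // handle error
    }
    for _, kv := range resp.Kvs {
        fmt.Printf("%s = %s (mod rev %d)\n", kv.Key, kv.Value, kv.ModRevision)
    }
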
diff --git a/vendor/go.etcd.io/etcd/clientv3/options.go b/vendor/go.etcd.io/etcd/clientv3/options.go
new file mode 100644
index 0000000..700714c
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/options.go
@@ -0,0 +1,65 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "math"
+ "time"
+
+ "google.golang.org/grpc"
+)
+
+var (
+ // client-side retry handling for request failures where the data was not written to the
+ // wire or where the server indicates it did not process the data. The gRPC default is
+ // "FailFast(true)", but for etcd we default to "FailFast(false)" to minimize client
+ // request error responses due to transient failures.
+ defaultFailFast = grpc.FailFast(false)
+
+ // client-side request send limit, gRPC default is math.MaxInt32
+ // Make sure that "client-side send limit < server-side default send/recv limit"
+ // Same value as "embed.DefaultMaxRequestBytes" plus gRPC overhead bytes
+ defaultMaxCallSendMsgSize = grpc.MaxCallSendMsgSize(2 * 1024 * 1024)
+
+ // client-side response receive limit, gRPC default is 4MB
+ // Make sure that "client-side receive limit >= server-side default send/recv limit"
+ // because range response can easily exceed request send limits
+ // Default to math.MaxInt32; writes exceeding the server-side send limit fail anyway
+ defaultMaxCallRecvMsgSize = grpc.MaxCallRecvMsgSize(math.MaxInt32)
+
+ // client-side non-streaming retry limit, only applied to requests where the server responds
+ // with an error code clearly indicating it was unable to process the request, such as codes.Unavailable.
+ // If set to 0, retry is disabled.
+ defaultUnaryMaxRetries uint = 100
+
+ // client-side streaming retry limit, only applied to requests where the server responds
+ // with an error code clearly indicating it was unable to process the request, such as codes.Unavailable.
+ // If set to 0, retry is disabled.
+ defaultStreamMaxRetries = ^uint(0) // max uint
+
+ // client-side retry backoff wait between requests.
+ defaultBackoffWaitBetween = 25 * time.Millisecond
+
+ // client-side retry backoff default jitter fraction.
+ defaultBackoffJitterFraction = 0.10
+)
+
+// defaultCallOpts defines a list of default "gRPC.CallOption".
+// Some options are exposed to "clientv3.Config".
+// Defaults will be overridden by the settings in "clientv3.Config".
+var defaultCallOpts = []grpc.CallOption{defaultFailFast, defaultMaxCallSendMsgSize, defaultMaxCallRecvMsgSize}
+
+// MaxLeaseTTL is the maximum lease TTL value
+const MaxLeaseTTL = 9000000000
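
These call-option defaults can be raised per client. A sketch, assuming the MaxCallSendMsgSize and MaxCallRecvMsgSize fields that this vendored clientv3.Config exposes for the purpose (field names and sizes are assumptions, not shown in this change):

    cli, err := clientv3.New(clientv3.Config{
        Endpoints: []string{"localhost:2379"},
        // Allow requests up to 8 MiB instead of the 2 MiB default above.
        MaxCallSendMsgSize: 8 * 1024 * 1024,
        // Keep receives effectively unbounded, matching the default.
        MaxCallRecvMsgSize: math.MaxInt32,
    })
    if err != nil {
        // handle error
    }
    defer cli.Close()
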
diff --git a/vendor/go.etcd.io/etcd/clientv3/retry.go b/vendor/go.etcd.io/etcd/clientv3/retry.go
new file mode 100644
index 0000000..6baa52e
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/retry.go
@@ -0,0 +1,294 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+type retryPolicy uint8
+
+const (
+ repeatable retryPolicy = iota
+ nonRepeatable
+)
+
+func (rp retryPolicy) String() string {
+ switch rp {
+ case repeatable:
+ return "repeatable"
+ case nonRepeatable:
+ return "nonRepeatable"
+ default:
+ return "UNKNOWN"
+ }
+}
+
+// isSafeRetryImmutableRPC returns "true" when an immutable request is safe for retry.
+//
+// immutable requests (e.g. Get) should be retried unless it's
+// an obvious server-side error (e.g. rpctypes.ErrRequestTooLarge).
+//
+// Returning "false" means retry should stop, since client cannot
+// handle itself even with retries.
+func isSafeRetryImmutableRPC(err error) bool {
+ eErr := rpctypes.Error(err)
+ if serverErr, ok := eErr.(rpctypes.EtcdError); ok && serverErr.Code() != codes.Unavailable {
+ // interrupted by non-transient server-side or gRPC-side error
+ // client cannot handle itself (e.g. rpctypes.ErrCompacted)
+ return false
+ }
+ // only retry if unavailable
+ ev, ok := status.FromError(err)
+ if !ok {
+ // all errors from RPCs are typed "grpc/status.(*statusError)"
+ // (ref. https://github.com/grpc/grpc-go/pull/1782)
+ //
+ // if the error type is not "grpc/status.(*statusError)",
+ // it could be from "Dial"
+ // TODO: do not retry for now
+ // ref. https://github.com/grpc/grpc-go/issues/1581
+ return false
+ }
+ return ev.Code() == codes.Unavailable
+}
+
+// isSafeRetryMutableRPC returns "true" when a mutable request is safe for retry.
+//
+// mutable requests (e.g. Put, Delete, Txn) should only be retried
+// when the status code is codes.Unavailable and the initial connection
+// has not yet been established (no endpoint is up).
+//
+// Returning "false" means retry should stop, otherwise it violates
+// write-at-most-once semantics.
+func isSafeRetryMutableRPC(err error) bool {
+ if ev, ok := status.FromError(err); ok && ev.Code() != codes.Unavailable {
+ // not safe for mutable RPCs
+ // e.g. interrupted by non-transient error that client cannot handle itself,
+ // or transient error while the connection has already been established
+ return false
+ }
+ desc := rpctypes.ErrorDesc(err)
+ return desc == "there is no address available" || desc == "there is no connection available"
+}
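For illustration, a minimal sketch of how the two predicates above diverge on the same transient error (the error value is fabricated for the example, not a real etcd response):

    err := status.Error(codes.Unavailable, "transport is closing")
    // immutable RPCs (e.g. Get): codes.Unavailable is transient, safe to re-send
    _ = isSafeRetryImmutableRPC(err) // true
    // mutable RPCs (e.g. Put): the connection may already be established, so only
    // the "no address/connection available" descriptions are treated as safe
    _ = isSafeRetryMutableRPC(err) // false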
+
+type retryKVClient struct {
+ kc pb.KVClient
+}
+
+// RetryKVClient implements a KVClient.
+func RetryKVClient(c *Client) pb.KVClient {
+ return &retryKVClient{
+ kc: pb.NewKVClient(c.conn),
+ }
+}
+
+func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) {
+ return rkv.kc.Range(ctx, in, append(opts, withRetryPolicy(repeatable))...)
+}
+
+func (rkv *retryKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) {
+ return rkv.kc.Put(ctx, in, opts...)
+}
+
+func (rkv *retryKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) {
+ return rkv.kc.DeleteRange(ctx, in, opts...)
+}
+
+func (rkv *retryKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) {
+ return rkv.kc.Txn(ctx, in, opts...)
+}
+
+func (rkv *retryKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) {
+ return rkv.kc.Compact(ctx, in, opts...)
+}
+
+type retryLeaseClient struct {
+ lc pb.LeaseClient
+}
+
+// RetryLeaseClient implements a LeaseClient.
+func RetryLeaseClient(c *Client) pb.LeaseClient {
+ return &retryLeaseClient{
+ lc: pb.NewLeaseClient(c.conn),
+ }
+}
+
+func (rlc *retryLeaseClient) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (resp *pb.LeaseTimeToLiveResponse, err error) {
+ return rlc.lc.LeaseTimeToLive(ctx, in, append(opts, withRetryPolicy(repeatable))...)
+}
+
+func (rlc *retryLeaseClient) LeaseLeases(ctx context.Context, in *pb.LeaseLeasesRequest, opts ...grpc.CallOption) (resp *pb.LeaseLeasesResponse, err error) {
+ return rlc.lc.LeaseLeases(ctx, in, append(opts, withRetryPolicy(repeatable))...)
+}
+
+func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) {
+ return rlc.lc.LeaseGrant(ctx, in, append(opts, withRetryPolicy(repeatable))...)
+}
+
+func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (resp *pb.LeaseRevokeResponse, err error) {
+ return rlc.lc.LeaseRevoke(ctx, in, append(opts, withRetryPolicy(repeatable))...)
+}
+
+func (rlc *retryLeaseClient) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (stream pb.Lease_LeaseKeepAliveClient, err error) {
+ return rlc.lc.LeaseKeepAlive(ctx, append(opts, withRetryPolicy(repeatable))...)
+}
+
+type retryClusterClient struct {
+ cc pb.ClusterClient
+}
+
+// RetryClusterClient implements a ClusterClient.
+func RetryClusterClient(c *Client) pb.ClusterClient {
+ return &retryClusterClient{
+ cc: pb.NewClusterClient(c.conn),
+ }
+}
+
+func (rcc *retryClusterClient) MemberList(ctx context.Context, in *pb.MemberListRequest, opts ...grpc.CallOption) (resp *pb.MemberListResponse, err error) {
+ return rcc.cc.MemberList(ctx, in, append(opts, withRetryPolicy(repeatable))...)
+}
+
+func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) {
+ return rcc.cc.MemberAdd(ctx, in, opts...)
+}
+
+func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) {
+ return rcc.cc.MemberRemove(ctx, in, opts...)
+}
+
+func (rcc *retryClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) {
+ return rcc.cc.MemberUpdate(ctx, in, opts...)
+}
+
+type retryMaintenanceClient struct {
+ mc pb.MaintenanceClient
+}
+
+// RetryMaintenanceClient implements a MaintenanceClient.
+func RetryMaintenanceClient(c *Client, conn *grpc.ClientConn) pb.MaintenanceClient {
+ return &retryMaintenanceClient{
+ mc: pb.NewMaintenanceClient(conn),
+ }
+}
+
+func (rmc *retryMaintenanceClient) Alarm(ctx context.Context, in *pb.AlarmRequest, opts ...grpc.CallOption) (resp *pb.AlarmResponse, err error) {
+ return rmc.mc.Alarm(ctx, in, append(opts, withRetryPolicy(repeatable))...)
+}
+
+func (rmc *retryMaintenanceClient) Status(ctx context.Context, in *pb.StatusRequest, opts ...grpc.CallOption) (resp *pb.StatusResponse, err error) {
+ return rmc.mc.Status(ctx, in, append(opts, withRetryPolicy(repeatable))...)
+}
+
+func (rmc *retryMaintenanceClient) Hash(ctx context.Context, in *pb.HashRequest, opts ...grpc.CallOption) (resp *pb.HashResponse, err error) {
+ return rmc.mc.Hash(ctx, in, append(opts, withRetryPolicy(repeatable))...)
+}
+
+func (rmc *retryMaintenanceClient) HashKV(ctx context.Context, in *pb.HashKVRequest, opts ...grpc.CallOption) (resp *pb.HashKVResponse, err error) {
+ return rmc.mc.HashKV(ctx, in, append(opts, withRetryPolicy(repeatable))...)
+}
+
+func (rmc *retryMaintenanceClient) Snapshot(ctx context.Context, in *pb.SnapshotRequest, opts ...grpc.CallOption) (stream pb.Maintenance_SnapshotClient, err error) {
+ return rmc.mc.Snapshot(ctx, in, append(opts, withRetryPolicy(repeatable))...)
+}
+
+func (rmc *retryMaintenanceClient) MoveLeader(ctx context.Context, in *pb.MoveLeaderRequest, opts ...grpc.CallOption) (resp *pb.MoveLeaderResponse, err error) {
+ return rmc.mc.MoveLeader(ctx, in, append(opts, withRetryPolicy(repeatable))...)
+}
+
+func (rmc *retryMaintenanceClient) Defragment(ctx context.Context, in *pb.DefragmentRequest, opts ...grpc.CallOption) (resp *pb.DefragmentResponse, err error) {
+ return rmc.mc.Defragment(ctx, in, opts...)
+}
+
+type retryAuthClient struct {
+ ac pb.AuthClient
+}
+
+// RetryAuthClient implements an AuthClient.
+func RetryAuthClient(c *Client) pb.AuthClient {
+ return &retryAuthClient{
+ ac: pb.NewAuthClient(c.conn),
+ }
+}
+
+func (rac *retryAuthClient) UserList(ctx context.Context, in *pb.AuthUserListRequest, opts ...grpc.CallOption) (resp *pb.AuthUserListResponse, err error) {
+ return rac.ac.UserList(ctx, in, append(opts, withRetryPolicy(repeatable))...)
+}
+
+func (rac *retryAuthClient) UserGet(ctx context.Context, in *pb.AuthUserGetRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGetResponse, err error) {
+ return rac.ac.UserGet(ctx, in, append(opts, withRetryPolicy(repeatable))...)
+}
+
+func (rac *retryAuthClient) RoleGet(ctx context.Context, in *pb.AuthRoleGetRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGetResponse, err error) {
+ return rac.ac.RoleGet(ctx, in, append(opts, withRetryPolicy(repeatable))...)
+}
+
+func (rac *retryAuthClient) RoleList(ctx context.Context, in *pb.AuthRoleListRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleListResponse, err error) {
+ return rac.ac.RoleList(ctx, in, append(opts, withRetryPolicy(repeatable))...)
+}
+
+func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) {
+ return rac.ac.AuthEnable(ctx, in, opts...)
+}
+
+func (rac *retryAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableRequest, opts ...grpc.CallOption) (resp *pb.AuthDisableResponse, err error) {
+ return rac.ac.AuthDisable(ctx, in, opts...)
+}
+
+func (rac *retryAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddRequest, opts ...grpc.CallOption) (resp *pb.AuthUserAddResponse, err error) {
+ return rac.ac.UserAdd(ctx, in, opts...)
+}
+
+func (rac *retryAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthUserDeleteResponse, err error) {
+ return rac.ac.UserDelete(ctx, in, opts...)
+}
+
+func (rac *retryAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthUserChangePasswordRequest, opts ...grpc.CallOption) (resp *pb.AuthUserChangePasswordResponse, err error) {
+ return rac.ac.UserChangePassword(ctx, in, opts...)
+}
+
+func (rac *retryAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGrantRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGrantRoleResponse, err error) {
+ return rac.ac.UserGrantRole(ctx, in, opts...)
+}
+
+func (rac *retryAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserRevokeRoleResponse, err error) {
+ return rac.ac.UserRevokeRole(ctx, in, opts...)
+}
+
+func (rac *retryAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleAddResponse, err error) {
+ return rac.ac.RoleAdd(ctx, in, opts...)
+}
+
+func (rac *retryAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleDeleteResponse, err error) {
+ return rac.ac.RoleDelete(ctx, in, opts...)
+}
+
+func (rac *retryAuthClient) RoleGrantPermission(ctx context.Context, in *pb.AuthRoleGrantPermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGrantPermissionResponse, err error) {
+ return rac.ac.RoleGrantPermission(ctx, in, opts...)
+}
+
+func (rac *retryAuthClient) RoleRevokePermission(ctx context.Context, in *pb.AuthRoleRevokePermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleRevokePermissionResponse, err error) {
+ return rac.ac.RoleRevokePermission(ctx, in, opts...)
+}
+
+func (rac *retryAuthClient) Authenticate(ctx context.Context, in *pb.AuthenticateRequest, opts ...grpc.CallOption) (resp *pb.AuthenticateResponse, err error) {
+ return rac.ac.Authenticate(ctx, in, opts...)
+}
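For orientation, a hedged sketch of how these wrappers are obtained and used; "cli" stands for an already-constructed *Client, and the imports match the ones at the top of this file:

    kv := RetryKVClient(cli)
    // Range carries the repeatable policy and may be retried on transient errors;
    // Put, DeleteRange, and Txn are forwarded as-is to preserve at-most-once writes.
    resp, err := kv.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})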
diff --git a/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go b/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go
new file mode 100644
index 0000000..f3c5057
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go
@@ -0,0 +1,392 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Based on github.com/grpc-ecosystem/go-grpc-middleware/retry, but modified to support the more
+// fine-grained error checking required by the write-at-most-once retry semantics of etcd.
+
+package clientv3
+
+import (
+ "context"
+ "io"
+ "sync"
+ "time"
+
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ "go.uber.org/zap"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
+)
+
+// unaryClientInterceptor returns a new retrying unary client interceptor.
+//
+// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
+// changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions).
+func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.UnaryClientInterceptor {
+ intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
+ return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+ ctx = withVersion(ctx)
+ grpcOpts, retryOpts := filterCallOptions(opts)
+ callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
+ // short-circuit for simplicity and to avoid allocations.
+ if callOpts.max == 0 {
+ return invoker(ctx, method, req, reply, cc, grpcOpts...)
+ }
+ var lastErr error
+ for attempt := uint(0); attempt < callOpts.max; attempt++ {
+ if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil {
+ return err
+ }
+ logger.Debug(
+ "retrying of unary invoker",
+ zap.String("target", cc.Target()),
+ zap.Uint("attempt", attempt),
+ )
+ lastErr = invoker(ctx, method, req, reply, cc, grpcOpts...)
+ if lastErr == nil {
+ return nil
+ }
+ logger.Warn(
+ "retrying of unary invoker failed",
+ zap.String("target", cc.Target()),
+ zap.Uint("attempt", attempt),
+ zap.Error(lastErr),
+ )
+ if isContextError(lastErr) {
+ if ctx.Err() != nil {
+ // it's the context deadline or cancellation.
+ return lastErr
+ }
+ // it's the callCtx deadline or cancellation, in which case try again.
+ continue
+ }
+ if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken {
+ gterr := c.getToken(ctx)
+ if gterr != nil {
+ logger.Warn(
+ "retrying of unary invoker failed to fetch new auth token",
+ zap.String("target", cc.Target()),
+ zap.Error(gterr),
+ )
+ return gterr // lastErr must be invalid auth token
+ }
+ continue
+ }
+ if !isSafeRetry(c.lg, lastErr, callOpts) {
+ return lastErr
+ }
+ }
+ return lastErr
+ }
+}
+
+// streamClientInterceptor returns a new retrying stream client interceptor for server side streaming calls.
+//
+// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
+// changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions).
+//
+// Retry logic is available *only for ServerStreams*, i.e. 1:n streams, as the internal logic needs
+// to buffer the messages sent by the client. If retry is enabled on any other streams (ClientStreams,
+// BidiStreams), the retry interceptor will fail the call.
+func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.StreamClientInterceptor {
+ intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
+ return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+ ctx = withVersion(ctx)
+ grpcOpts, retryOpts := filterCallOptions(opts)
+ callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
+ // short-circuit for simplicity and to avoid allocations.
+ if callOpts.max == 0 {
+ return streamer(ctx, desc, cc, method, grpcOpts...)
+ }
+ if desc.ClientStreams {
+ return nil, status.Errorf(codes.Unimplemented, "clientv3/retry_interceptor: cannot retry on ClientStreams, set Disable()")
+ }
+ newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...)
+ if err != nil {
+ logger.Error("streamer failed to create ClientStream", zap.Error(err))
+ return nil, err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
+ }
+ retryingStreamer := &serverStreamingRetryingStream{
+ client: c,
+ ClientStream: newStreamer,
+ callOpts: callOpts,
+ ctx: ctx,
+ streamerCall: func(ctx context.Context) (grpc.ClientStream, error) {
+ return streamer(ctx, desc, cc, method, grpcOpts...)
+ },
+ }
+ return retryingStreamer, nil
+ }
+}
+
+// serverStreamingRetryingStream is an implementation of grpc.ClientStream that acts as a
+// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
+// a new ClientStream according to the retry policy.
+type serverStreamingRetryingStream struct {
+ grpc.ClientStream
+ client *Client
+ bufferedSends []interface{} // single message that the client can send
+ receivedGood bool // indicates whether any prior receives were successful
+ wasClosedSend bool // indicates that CloseSend was called
+ ctx context.Context
+ callOpts *options
+ streamerCall func(ctx context.Context) (grpc.ClientStream, error)
+ mu sync.RWMutex
+}
+
+func (s *serverStreamingRetryingStream) setStream(clientStream grpc.ClientStream) {
+ s.mu.Lock()
+ s.ClientStream = clientStream
+ s.mu.Unlock()
+}
+
+func (s *serverStreamingRetryingStream) getStream() grpc.ClientStream {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ return s.ClientStream
+}
+
+func (s *serverStreamingRetryingStream) SendMsg(m interface{}) error {
+ s.mu.Lock()
+ s.bufferedSends = append(s.bufferedSends, m)
+ s.mu.Unlock()
+ return s.getStream().SendMsg(m)
+}
+
+func (s *serverStreamingRetryingStream) CloseSend() error {
+ s.mu.Lock()
+ s.wasClosedSend = true
+ s.mu.Unlock()
+ return s.getStream().CloseSend()
+}
+
+func (s *serverStreamingRetryingStream) Header() (metadata.MD, error) {
+ return s.getStream().Header()
+}
+
+func (s *serverStreamingRetryingStream) Trailer() metadata.MD {
+ return s.getStream().Trailer()
+}
+
+func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error {
+ attemptRetry, lastErr := s.receiveMsgAndIndicateRetry(m)
+ if !attemptRetry {
+ return lastErr // success or hard failure
+ }
+
+ // We start off from attempt 1, because the zeroth attempt was already made on the normal SendMsg().
+ for attempt := uint(1); attempt < s.callOpts.max; attempt++ {
+ if err := waitRetryBackoff(s.ctx, attempt, s.callOpts); err != nil {
+ return err
+ }
+ newStream, err := s.reestablishStreamAndResendBuffer(s.ctx)
+ if err != nil {
+ s.client.lg.Error("failed reestablishStreamAndResendBuffer", zap.Error(err))
+ return err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
+ }
+ s.setStream(newStream)
+
+ s.client.lg.Warn("retrying RecvMsg", zap.Error(lastErr))
+ attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m)
+ if !attemptRetry {
+ return lastErr
+ }
+ }
+ return lastErr
+}
+
+func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}) (bool, error) {
+ s.mu.RLock()
+ wasGood := s.receivedGood
+ s.mu.RUnlock()
+ err := s.getStream().RecvMsg(m)
+ if err == nil || err == io.EOF {
+ s.mu.Lock()
+ s.receivedGood = true
+ s.mu.Unlock()
+ return false, err
+ } else if wasGood {
+ // previous RecvMsg in the stream succeeded, no retry logic should interfere
+ return false, err
+ }
+ if isContextError(err) {
+ if s.ctx.Err() != nil {
+ return false, err
+ }
+ // it's the callCtx deadline or cancellation, in which case try again.
+ return true, err
+ }
+ if s.callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
+ gterr := s.client.getToken(s.ctx)
+ if gterr != nil {
+ s.client.lg.Warn("retry failed to fetch new auth token", zap.Error(gterr))
+ return false, err // return the original error for simplicity
+ }
+ return true, err
+ }
+ return isSafeRetry(s.client.lg, err, s.callOpts), err
+}
+
+func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx context.Context) (grpc.ClientStream, error) {
+ s.mu.RLock()
+ bufferedSends := s.bufferedSends
+ s.mu.RUnlock()
+ newStream, err := s.streamerCall(callCtx)
+ if err != nil {
+ return nil, err
+ }
+ for _, msg := range bufferedSends {
+ if err := newStream.SendMsg(msg); err != nil {
+ return nil, err
+ }
+ }
+ if err := newStream.CloseSend(); err != nil {
+ return nil, err
+ }
+ return newStream, nil
+}
+
+func waitRetryBackoff(ctx context.Context, attempt uint, callOpts *options) error {
+ waitTime := time.Duration(0)
+ if attempt > 0 {
+ waitTime = callOpts.backoffFunc(attempt)
+ }
+ if waitTime > 0 {
+ timer := time.NewTimer(waitTime)
+ select {
+ case <-ctx.Done():
+ timer.Stop()
+ return contextErrToGrpcErr(ctx.Err())
+ case <-timer.C:
+ }
+ }
+ return nil
+}
+
+// isSafeRetry returns "true" if the request is safe to retry with the given error.
+func isSafeRetry(lg *zap.Logger, err error, callOpts *options) bool {
+ if isContextError(err) {
+ return false
+ }
+ switch callOpts.retryPolicy {
+ case repeatable:
+ return isSafeRetryImmutableRPC(err)
+ case nonRepeatable:
+ return isSafeRetryMutableRPC(err)
+ default:
+ lg.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String()))
+ return false
+ }
+}
+
+func isContextError(err error) bool {
+ return grpc.Code(err) == codes.DeadlineExceeded || grpc.Code(err) == codes.Canceled
+}
+
+func contextErrToGrpcErr(err error) error {
+ switch err {
+ case context.DeadlineExceeded:
+ return status.Errorf(codes.DeadlineExceeded, err.Error())
+ case context.Canceled:
+ return status.Errorf(codes.Canceled, err.Error())
+ default:
+ return status.Errorf(codes.Unknown, err.Error())
+ }
+}
+
+var (
+ defaultOptions = &options{
+ retryPolicy: nonRepeatable,
+ max: 0, // disable
+ backoffFunc: backoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10),
+ retryAuth: true,
+ }
+)
+
+// backoffFunc denotes a family of functions that control the backoff duration between call retries.
+//
+// They are called with an identifier of the attempt, and should return the duration the client
+// should wait before the next call. If the returned duration is longer than the
+// `context.Context.Deadline` of the request, the deadline takes precedence and the wait will be
+// interrupted before proceeding with the next iteration.
+type backoffFunc func(attempt uint) time.Duration
+
+// withRetryPolicy sets the retry policy of this call.
+func withRetryPolicy(rp retryPolicy) retryOption {
+ return retryOption{applyFunc: func(o *options) {
+ o.retryPolicy = rp
+ }}
+}
+
+// withMax sets the maximum number of retries on this call, or this interceptor.
+func withMax(maxRetries uint) retryOption {
+ return retryOption{applyFunc: func(o *options) {
+ o.max = maxRetries
+ }}
+}
+
+// withBackoff sets the backoffFunc used to control the time between retries.
+func withBackoff(bf backoffFunc) retryOption {
+ return retryOption{applyFunc: func(o *options) {
+ o.backoffFunc = bf
+ }}
+}
+
+type options struct {
+ retryPolicy retryPolicy
+ max uint
+ backoffFunc backoffFunc
+ retryAuth bool
+}
+
+// retryOption is a grpc.CallOption that is local to clientv3's retry interceptor.
+type retryOption struct {
+ grpc.EmptyCallOption // make sure we implement private after() and before() fields so we don't panic.
+ applyFunc func(opt *options)
+}
+
+func reuseOrNewWithCallOptions(opt *options, retryOptions []retryOption) *options {
+ if len(retryOptions) == 0 {
+ return opt
+ }
+ optCopy := &options{}
+ *optCopy = *opt
+ for _, f := range retryOptions {
+ f.applyFunc(optCopy)
+ }
+ return optCopy
+}
+
+func filterCallOptions(callOptions []grpc.CallOption) (grpcOptions []grpc.CallOption, retryOptions []retryOption) {
+ for _, opt := range callOptions {
+ if co, ok := opt.(retryOption); ok {
+ retryOptions = append(retryOptions, co)
+ } else {
+ grpcOptions = append(grpcOptions, opt)
+ }
+ }
+ return grpcOptions, retryOptions
+}
+
+// backoffLinearWithJitter waits a set period of time, allowing for jitter (fractional adjustment).
+//
+// For example, waitBetween=1s and jitter=0.10 can generate waits between 900ms and 1100ms.
+func backoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64) backoffFunc {
+ return func(attempt uint) time.Duration {
+ return jitterUp(waitBetween, jitterFraction)
+ }
+}
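To make the backoff policy concrete, a self-contained sketch using the package defaults (25ms base wait, 0.10 jitter fraction); the jitterUp helper mirrors the one defined in utils.go below:

    package main

    import (
        "fmt"
        "math/rand"
        "time"
    )

    // jitterUp mirrors the helper used by backoffLinearWithJitter above.
    func jitterUp(d time.Duration, jitter float64) time.Duration {
        return time.Duration(float64(d) * (1 + jitter*(rand.Float64()*2-1)))
    }

    func main() {
        for attempt := uint(1); attempt <= 3; attempt++ {
            // each wait falls in [22.5ms, 27.5ms], independent of the attempt number
            fmt.Println(attempt, jitterUp(25*time.Millisecond, 0.10))
        }
    }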
diff --git a/vendor/go.etcd.io/etcd/clientv3/sort.go b/vendor/go.etcd.io/etcd/clientv3/sort.go
new file mode 100644
index 0000000..2bb9d9a
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/sort.go
@@ -0,0 +1,37 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+type SortTarget int
+type SortOrder int
+
+const (
+ SortNone SortOrder = iota
+ SortAscend
+ SortDescend
+)
+
+const (
+ SortByKey SortTarget = iota
+ SortByVersion
+ SortByCreateRevision
+ SortByModRevision
+ SortByValue
+)
+
+type SortOption struct {
+ Target SortTarget
+ Order SortOrder
+}
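These constants are normally supplied through the WithSort option on a Get rather than constructed directly; a hedged usage sketch, assuming an existing *Client named cli:

    resp, err := cli.Get(context.TODO(), "foo",
        clientv3.WithPrefix(),
        clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
    // on success, resp.Kvs is ordered by key, highest first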
diff --git a/vendor/go.etcd.io/etcd/clientv3/txn.go b/vendor/go.etcd.io/etcd/clientv3/txn.go
new file mode 100644
index 0000000..c3c2d24
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/txn.go
@@ -0,0 +1,151 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+ "sync"
+
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+)
+
+// Txn is the interface that wraps mini-transactions.
+//
+// Txn(context.TODO()).If(
+// Compare(Value(k1), ">", v1),
+// Compare(Version(k1), "=", 2)
+// ).Then(
+// OpPut(k2,v2), OpPut(k3,v3)
+// ).Else(
+// OpPut(k4,v4), OpPut(k5,v5)
+// ).Commit()
+//
+type Txn interface {
+ // If takes a list of comparisons. If all comparisons passed in succeed,
+ // the operations passed into Then() will be executed. Otherwise the
+ // operations passed into Else() will be executed.
+ If(cs ...Cmp) Txn
+
+ // Then takes a list of operations. The Ops list will be executed if the
+ // comparisons passed into If() succeed.
+ Then(ops ...Op) Txn
+
+ // Else takes a list of operations. The Ops list will be executed if the
+ // comparisons passed into If() fail.
+ Else(ops ...Op) Txn
+
+ // Commit tries to commit the transaction.
+ Commit() (*TxnResponse, error)
+}
+
+type txn struct {
+ kv *kv
+ ctx context.Context
+
+ mu sync.Mutex
+ cif bool
+ cthen bool
+ celse bool
+
+ isWrite bool
+
+ cmps []*pb.Compare
+
+ sus []*pb.RequestOp
+ fas []*pb.RequestOp
+
+ callOpts []grpc.CallOption
+}
+
+func (txn *txn) If(cs ...Cmp) Txn {
+ txn.mu.Lock()
+ defer txn.mu.Unlock()
+
+ if txn.cif {
+ panic("cannot call If twice!")
+ }
+
+ if txn.cthen {
+ panic("cannot call If after Then!")
+ }
+
+ if txn.celse {
+ panic("cannot call If after Else!")
+ }
+
+ txn.cif = true
+
+ for i := range cs {
+ txn.cmps = append(txn.cmps, (*pb.Compare)(&cs[i]))
+ }
+
+ return txn
+}
+
+func (txn *txn) Then(ops ...Op) Txn {
+ txn.mu.Lock()
+ defer txn.mu.Unlock()
+
+ if txn.cthen {
+ panic("cannot call Then twice!")
+ }
+ if txn.celse {
+ panic("cannot call Then after Else!")
+ }
+
+ txn.cthen = true
+
+ for _, op := range ops {
+ txn.isWrite = txn.isWrite || op.isWrite()
+ txn.sus = append(txn.sus, op.toRequestOp())
+ }
+
+ return txn
+}
+
+func (txn *txn) Else(ops ...Op) Txn {
+ txn.mu.Lock()
+ defer txn.mu.Unlock()
+
+ if txn.celse {
+ panic("cannot call Else twice!")
+ }
+
+ txn.celse = true
+
+ for _, op := range ops {
+ txn.isWrite = txn.isWrite || op.isWrite()
+ txn.fas = append(txn.fas, op.toRequestOp())
+ }
+
+ return txn
+}
+
+func (txn *txn) Commit() (*TxnResponse, error) {
+ txn.mu.Lock()
+ defer txn.mu.Unlock()
+
+ r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
+
+ resp, err := txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...)
+ if err != nil {
+ return nil, toErr(txn.ctx, err)
+ }
+ return (*TxnResponse)(resp), nil
+}
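A concrete compare-and-swap in the shape of the interface's doc comment above, assuming an existing *Client named cli:

    resp, err := cli.Txn(context.TODO()).
        If(clientv3.Compare(clientv3.Value("k1"), "=", "v1")).
        Then(clientv3.OpPut("k1", "v2")).
        Else(clientv3.OpGet("k1")).
        Commit()
    if err == nil && resp.Succeeded {
        // the comparison held, so the Put in Then() was applied atomically
    }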
diff --git a/vendor/go.etcd.io/etcd/clientv3/utils.go b/vendor/go.etcd.io/etcd/clientv3/utils.go
new file mode 100644
index 0000000..b998c41
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/utils.go
@@ -0,0 +1,49 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "math/rand"
+ "reflect"
+ "runtime"
+ "strings"
+ "time"
+)
+
+// jitterUp adds random jitter to the duration.
+//
+// This adds or subtracts time from the duration within a given jitter fraction.
+// For example, for 10s and jitter 0.1, it will return a time within [9s, 11s].
+//
+// Reference: https://godoc.org/github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils
+func jitterUp(duration time.Duration, jitter float64) time.Duration {
+ multiplier := jitter * (rand.Float64()*2 - 1)
+ return time.Duration(float64(duration) * (1 + multiplier))
+}
+
+// isOpFuncCalled checks whether the named option function is among the given op options.
+func isOpFuncCalled(op string, opts []OpOption) bool {
+ for _, opt := range opts {
+ v := reflect.ValueOf(opt)
+ if v.Kind() == reflect.Func {
+ if opFunc := runtime.FuncForPC(v.Pointer()); opFunc != nil {
+ if strings.Contains(opFunc.Name(), op) {
+ return true
+ }
+ }
+ }
+ }
+ return false
+}
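A hedged sketch of what isOpFuncCalled detects: whether an option built by a named constructor appears in a slice of OpOption. It relies on the runtime name of the returned closure containing the constructor's name, which holds for options like WithPrefix defined in this package:

    opts := []OpOption{WithPrefix()}
    _ = isOpFuncCalled("WithPrefix", opts) // true: the closure's name contains "WithPrefix"
    _ = isOpFuncCalled("WithLimit", opts)  // false: no such option in the slice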
diff --git a/vendor/go.etcd.io/etcd/clientv3/watch.go b/vendor/go.etcd.io/etcd/clientv3/watch.go
new file mode 100644
index 0000000..4a3b8cc
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/watch.go
@@ -0,0 +1,987 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ mvccpb "github.com/coreos/etcd/mvcc/mvccpb"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
+)
+
+const (
+ EventTypeDelete = mvccpb.DELETE
+ EventTypePut = mvccpb.PUT
+
+ closeSendErrTimeout = 250 * time.Millisecond
+)
+
+type Event mvccpb.Event
+
+type WatchChan <-chan WatchResponse
+
+type Watcher interface {
+ // Watch watches on a key or prefix. The watched events will be returned
+ // through the returned channel. If revisions waiting to be sent over the
+ // watch are compacted, then the watch will be canceled by the server, the
+ // client will post a compacted error watch response, and the channel will close.
+ // If the context "ctx" is canceled or timed out, returned "WatchChan" is closed,
+ // and "WatchResponse" from this closed channel has zero events and nil "Err()".
+ // The context "ctx" MUST be canceled, as soon as watcher is no longer being used,
+ // to release the associated resources.
+ //
+ // If the context is "context.Background/TODO", the returned "WatchChan" will
+ // not be closed and will block until an event is triggered, except when the server
+ // returns a non-recoverable error (e.g. ErrCompacted).
+ // For example, when context passed with "WithRequireLeader" and the
+ // connected server has no leader (e.g. due to network partition),
+ // error "etcdserver: no leader" (ErrNoLeader) will be returned,
+ // and then "WatchChan" is closed with non-nil "Err()".
+ // In order to prevent a watch stream being stuck in a partitioned node,
+ // make sure to wrap context with "WithRequireLeader".
+ //
+ // Otherwise, as long as the context has not been canceled or timed out,
+ // watch will retry on other recoverable errors forever until reconnected.
+ //
+ // TODO: explicitly set context error in the last "WatchResponse" message and close channel?
+ // Currently, client contexts are overwritten with "valCtx" that never closes.
+ // TODO(v3.4): configure watch retry policy, limit maximum retry number
+ // (see https://github.com/etcd-io/etcd/issues/8980)
+ Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
+
+ // RequestProgress requests a progress notify response be sent in all watch channels.
+ RequestProgress(ctx context.Context) error
+
+ // Close closes the watcher and cancels all watch requests.
+ Close() error
+}
+
+type WatchResponse struct {
+ Header pb.ResponseHeader
+ Events []*Event
+
+ // CompactRevision is the minimum revision the watcher may receive.
+ CompactRevision int64
+
+ // Canceled is used to indicate watch failure.
+ // If the watch failed and the stream is about to close, the channel sends,
+ // before it is closed, a final response that has Canceled set to true and a non-nil Err().
+ Canceled bool
+
+ // Created is used to indicate the creation of the watcher.
+ Created bool
+
+ closeErr error
+
+ // cancelReason is the reason for canceling the watch
+ cancelReason string
+}
+
+// IsCreate returns true if the event indicates that the key is newly created.
+func (e *Event) IsCreate() bool {
+ return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
+}
+
+// IsModify returns true if the event indicates that a new value was put on an existing key.
+func (e *Event) IsModify() bool {
+ return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
+}
+
+// Err is the error value if this WatchResponse holds an error.
+func (wr *WatchResponse) Err() error {
+ switch {
+ case wr.closeErr != nil:
+ return v3rpc.Error(wr.closeErr)
+ case wr.CompactRevision != 0:
+ return v3rpc.ErrCompacted
+ case wr.Canceled:
+ if len(wr.cancelReason) != 0 {
+ return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason))
+ }
+ return v3rpc.ErrFutureRev
+ }
+ return nil
+}
+
+// IsProgressNotify returns true if the WatchResponse is a progress notification.
+func (wr *WatchResponse) IsProgressNotify() bool {
+ return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0
+}
+
+// watcher implements the Watcher interface
+type watcher struct {
+ remote pb.WatchClient
+ callOpts []grpc.CallOption
+
+ // mu protects the grpc streams map
+ mu sync.RWMutex
+
+ // streams holds all the active grpc streams keyed by ctx value.
+ streams map[string]*watchGrpcStream
+}
+
+// watchGrpcStream tracks all watch resources attached to a single grpc stream.
+type watchGrpcStream struct {
+ owner *watcher
+ remote pb.WatchClient
+ callOpts []grpc.CallOption
+
+ // ctx controls internal remote.Watch requests
+ ctx context.Context
+ // ctxKey is the key used when looking up this stream's context
+ ctxKey string
+ cancel context.CancelFunc
+
+ // substreams holds all active watchers on this grpc stream
+ substreams map[int64]*watcherStream
+ // resuming holds all resuming watchers on this grpc stream
+ resuming []*watcherStream
+
+ // reqc sends a watch request from Watch() to the main goroutine
+ reqc chan watchStreamRequest
+ // respc receives data from the watch client
+ respc chan *pb.WatchResponse
+ // donec closes to broadcast shutdown
+ donec chan struct{}
+ // errc transmits errors from grpc Recv to the watch stream reconnect logic
+ errc chan error
+ // closingc gets the watcherStream of closing watchers
+ closingc chan *watcherStream
+ // wg is Done when all substream goroutines have exited
+ wg sync.WaitGroup
+
+ // resumec closes to signal that all substreams should begin resuming
+ resumec chan struct{}
+ // closeErr is the error that closed the watch stream
+ closeErr error
+}
+
+// watchStreamRequest is a union of the supported watch request operation types
+type watchStreamRequest interface {
+ toPB() *pb.WatchRequest
+}
+
+// watchRequest is issued by the subscriber to start a new watcher
+type watchRequest struct {
+ ctx context.Context
+ key string
+ end string
+ rev int64
+
+ // send created notification event if this field is true
+ createdNotify bool
+ // progressNotify is for progress updates
+ progressNotify bool
+ // fragment is disabled by default; if true, watch events are split
+ // when their total size exceeds the "--max-request-bytes" flag value
+ // plus 512 bytes
+ fragment bool
+
+ // filters is the list of events to filter out
+ filters []pb.WatchCreateRequest_FilterType
+ // get the previous key-value pair before the event happens
+ prevKV bool
+ // retc receives a chan WatchResponse once the watcher is established
+ retc chan chan WatchResponse
+}
+
+// progressRequest is issued by the subscriber to request watch progress
+type progressRequest struct {
+}
+
+// watcherStream represents a registered watcher
+type watcherStream struct {
+ // initReq is the request that initiated this stream
+ initReq watchRequest
+
+ // outc publishes watch responses to subscriber
+ outc chan WatchResponse
+ // recvc buffers watch responses before publishing
+ recvc chan *WatchResponse
+ // donec closes when the watcherStream goroutine stops.
+ donec chan struct{}
+ // closing is set to true when stream should be scheduled to shutdown.
+ closing bool
+ // id is the registered watch id on the grpc stream
+ id int64
+
+ // buf holds all events received from etcd but not yet consumed by the client
+ buf []*WatchResponse
+}
+
+func NewWatcher(c *Client) Watcher {
+ return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
+}
+
+func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
+ w := &watcher{
+ remote: wc,
+ streams: make(map[string]*watchGrpcStream),
+ }
+ if c != nil {
+ w.callOpts = c.callOpts
+ }
+ return w
+}
+
+// never closes
+var valCtxCh = make(chan struct{})
+var zeroTime = time.Unix(0, 0)
+
+// ctx with only the values; never Done
+type valCtx struct{ context.Context }
+
+func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
+func (vc *valCtx) Done() <-chan struct{} { return valCtxCh }
+func (vc *valCtx) Err() error { return nil }
+
+func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
+ ctx, cancel := context.WithCancel(&valCtx{inctx})
+ wgs := &watchGrpcStream{
+ owner: w,
+ remote: w.remote,
+ callOpts: w.callOpts,
+ ctx: ctx,
+ ctxKey: streamKeyFromCtx(inctx),
+ cancel: cancel,
+ substreams: make(map[int64]*watcherStream),
+ respc: make(chan *pb.WatchResponse),
+ reqc: make(chan watchStreamRequest),
+ donec: make(chan struct{}),
+ errc: make(chan error, 1),
+ closingc: make(chan *watcherStream),
+ resumec: make(chan struct{}),
+ }
+ go wgs.run()
+ return wgs
+}
+
+// Watch posts a watch request to run() and waits for a new watcher channel
+func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
+ ow := opWatch(key, opts...)
+
+ var filters []pb.WatchCreateRequest_FilterType
+ if ow.filterPut {
+ filters = append(filters, pb.WatchCreateRequest_NOPUT)
+ }
+ if ow.filterDelete {
+ filters = append(filters, pb.WatchCreateRequest_NODELETE)
+ }
+
+ wr := &watchRequest{
+ ctx: ctx,
+ createdNotify: ow.createdNotify,
+ key: string(ow.key),
+ end: string(ow.end),
+ rev: ow.rev,
+ progressNotify: ow.progressNotify,
+ fragment: ow.fragment,
+ filters: filters,
+ prevKV: ow.prevKV,
+ retc: make(chan chan WatchResponse, 1),
+ }
+
+ ok := false
+ ctxKey := streamKeyFromCtx(ctx)
+
+ // find or allocate appropriate grpc watch stream
+ w.mu.Lock()
+ if w.streams == nil {
+ // closed
+ w.mu.Unlock()
+ ch := make(chan WatchResponse)
+ close(ch)
+ return ch
+ }
+ wgs := w.streams[ctxKey]
+ if wgs == nil {
+ wgs = w.newWatcherGrpcStream(ctx)
+ w.streams[ctxKey] = wgs
+ }
+ donec := wgs.donec
+ reqc := wgs.reqc
+ w.mu.Unlock()
+
+ // couldn't create channel; return closed channel
+ closeCh := make(chan WatchResponse, 1)
+
+ // submit request
+ select {
+ case reqc <- wr:
+ ok = true
+ case <-wr.ctx.Done():
+ case <-donec:
+ if wgs.closeErr != nil {
+ closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
+ break
+ }
+ // retry; may have dropped stream from no ctxs
+ return w.Watch(ctx, key, opts...)
+ }
+
+ // receive channel
+ if ok {
+ select {
+ case ret := <-wr.retc:
+ return ret
+ case <-ctx.Done():
+ case <-donec:
+ if wgs.closeErr != nil {
+ closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
+ break
+ }
+ // retry; may have dropped stream from no ctxs
+ return w.Watch(ctx, key, opts...)
+ }
+ }
+
+ close(closeCh)
+ return closeCh
+}
+
+func (w *watcher) Close() (err error) {
+ w.mu.Lock()
+ streams := w.streams
+ w.streams = nil
+ w.mu.Unlock()
+ for _, wgs := range streams {
+ if werr := wgs.close(); werr != nil {
+ err = werr
+ }
+ }
+ // Consider context.Canceled as a successful close
+ if err == context.Canceled {
+ err = nil
+ }
+ return err
+}
+
+// RequestProgress requests a progress notify response be sent in all watch channels.
+func (w *watcher) RequestProgress(ctx context.Context) (err error) {
+ ctxKey := streamKeyFromCtx(ctx)
+
+ w.mu.Lock()
+ if w.streams == nil {
+ w.mu.Unlock()
+ return fmt.Errorf("no stream found for context")
+ }
+ wgs := w.streams[ctxKey]
+ if wgs == nil {
+ wgs = w.newWatcherGrpcStream(ctx)
+ w.streams[ctxKey] = wgs
+ }
+ donec := wgs.donec
+ reqc := wgs.reqc
+ w.mu.Unlock()
+
+ pr := &progressRequest{}
+
+ select {
+ case reqc <- pr:
+ return nil
+ case <-ctx.Done():
+ if err == nil {
+ return ctx.Err()
+ }
+ return err
+ case <-donec:
+ if wgs.closeErr != nil {
+ return wgs.closeErr
+ }
+ // retry; may have dropped stream from no ctxs
+ return w.RequestProgress(ctx)
+ }
+}
+
+func (w *watchGrpcStream) close() (err error) {
+ w.cancel()
+ <-w.donec
+ select {
+ case err = <-w.errc:
+ default:
+ }
+ return toErr(w.ctx, err)
+}
+
+func (w *watcher) closeStream(wgs *watchGrpcStream) {
+ w.mu.Lock()
+ close(wgs.donec)
+ wgs.cancel()
+ if w.streams != nil {
+ delete(w.streams, wgs.ctxKey)
+ }
+ w.mu.Unlock()
+}
+
+func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
+ // check watch ID for backward compatibility (<= v3.3)
+ if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
+ w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
+ // failed; no channel
+ close(ws.recvc)
+ return
+ }
+ ws.id = resp.WatchId
+ w.substreams[ws.id] = ws
+}
+
+func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
+ select {
+ case ws.outc <- *resp:
+ case <-ws.initReq.ctx.Done():
+ case <-time.After(closeSendErrTimeout):
+ }
+ close(ws.outc)
+}
+
+func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
+ // send channel response in case stream was never established
+ select {
+ case ws.initReq.retc <- ws.outc:
+ default:
+ }
+ // close subscriber's channel
+ if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
+ go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
+ } else if ws.outc != nil {
+ close(ws.outc)
+ }
+ if ws.id != -1 {
+ delete(w.substreams, ws.id)
+ return
+ }
+ for i := range w.resuming {
+ if w.resuming[i] == ws {
+ w.resuming[i] = nil
+ return
+ }
+ }
+}
+
+// run is the root of the goroutines for managing a watcher client
+func (w *watchGrpcStream) run() {
+ var wc pb.Watch_WatchClient
+ var closeErr error
+
+ // substreams marked to close but whose goroutines are still running; needed to
+ // avoid double-closing recvc on grpc stream teardown
+ closing := make(map[*watcherStream]struct{})
+
+ defer func() {
+ w.closeErr = closeErr
+ // shutdown substreams and resuming substreams
+ for _, ws := range w.substreams {
+ if _, ok := closing[ws]; !ok {
+ close(ws.recvc)
+ closing[ws] = struct{}{}
+ }
+ }
+ for _, ws := range w.resuming {
+ if _, ok := closing[ws]; ws != nil && !ok {
+ close(ws.recvc)
+ closing[ws] = struct{}{}
+ }
+ }
+ w.joinSubstreams()
+ for range closing {
+ w.closeSubstream(<-w.closingc)
+ }
+ w.wg.Wait()
+ w.owner.closeStream(w)
+ }()
+
+ // start a stream with the etcd grpc server
+ if wc, closeErr = w.newWatchClient(); closeErr != nil {
+ return
+ }
+
+ cancelSet := make(map[int64]struct{})
+
+ var cur *pb.WatchResponse
+ for {
+ select {
+ // Watch() requested
+ case req := <-w.reqc:
+ switch wreq := req.(type) {
+ case *watchRequest:
+ outc := make(chan WatchResponse, 1)
+ // TODO: pass custom watch ID?
+ ws := &watcherStream{
+ initReq: *wreq,
+ id: -1,
+ outc: outc,
+ // unbuffered so resumes won't cause repeat events
+ recvc: make(chan *WatchResponse),
+ }
+
+ ws.donec = make(chan struct{})
+ w.wg.Add(1)
+ go w.serveSubstream(ws, w.resumec)
+
+ // queue up for watcher creation/resume
+ w.resuming = append(w.resuming, ws)
+ if len(w.resuming) == 1 {
+ // head of resume queue, can register a new watcher
+ wc.Send(ws.initReq.toPB())
+ }
+ case *progressRequest:
+ wc.Send(wreq.toPB())
+ }
+
+ // new events from the watch client
+ case pbresp := <-w.respc:
+ if cur == nil || pbresp.Created || pbresp.Canceled {
+ cur = pbresp
+ } else if cur != nil && cur.WatchId == pbresp.WatchId {
+ // merge new events
+ cur.Events = append(cur.Events, pbresp.Events...)
+ // update "Fragment" field; last response with "Fragment" == false
+ cur.Fragment = pbresp.Fragment
+ }
+
+ switch {
+ case pbresp.Created:
+ // response to head of queue creation
+ if ws := w.resuming[0]; ws != nil {
+ w.addSubstream(pbresp, ws)
+ w.dispatchEvent(pbresp)
+ w.resuming[0] = nil
+ }
+
+ if ws := w.nextResume(); ws != nil {
+ wc.Send(ws.initReq.toPB())
+ }
+
+ // reset for next iteration
+ cur = nil
+
+ case pbresp.Canceled && pbresp.CompactRevision == 0:
+ delete(cancelSet, pbresp.WatchId)
+ if ws, ok := w.substreams[pbresp.WatchId]; ok {
+ // signal to stream goroutine to update closingc
+ close(ws.recvc)
+ closing[ws] = struct{}{}
+ }
+
+ // reset for next iteration
+ cur = nil
+
+ case cur.Fragment:
+ // watch response events are still fragmented
+ // continue to fetch next fragmented event arrival
+ continue
+
+ default:
+ // dispatch to appropriate watch stream
+ ok := w.dispatchEvent(cur)
+
+ // reset for next iteration
+ cur = nil
+
+ if ok {
+ break
+ }
+
+ // watch response on unexpected watch id; cancel id
+ if _, ok := cancelSet[pbresp.WatchId]; ok {
+ break
+ }
+
+ cancelSet[pbresp.WatchId] = struct{}{}
+ cr := &pb.WatchRequest_CancelRequest{
+ CancelRequest: &pb.WatchCancelRequest{
+ WatchId: pbresp.WatchId,
+ },
+ }
+ req := &pb.WatchRequest{RequestUnion: cr}
+ wc.Send(req)
+ }
+
+ // watch client failed on Recv; spawn another if possible
+ case err := <-w.errc:
+ if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
+ closeErr = err
+ return
+ }
+ if wc, closeErr = w.newWatchClient(); closeErr != nil {
+ return
+ }
+ if ws := w.nextResume(); ws != nil {
+ wc.Send(ws.initReq.toPB())
+ }
+ cancelSet = make(map[int64]struct{})
+
+ case <-w.ctx.Done():
+ return
+
+ case ws := <-w.closingc:
+ w.closeSubstream(ws)
+ delete(closing, ws)
+ // no more watchers on this stream, shutdown
+ if len(w.substreams)+len(w.resuming) == 0 {
+ return
+ }
+ }
+ }
+}
+
+// nextResume chooses the next resuming watcher to register with the grpc stream. Abandoned
+// streams are marked as nil in the queue since the head must wait for its inflight registration.
+func (w *watchGrpcStream) nextResume() *watcherStream {
+ for len(w.resuming) != 0 {
+ if w.resuming[0] != nil {
+ return w.resuming[0]
+ }
+ w.resuming = w.resuming[1:]
+ }
+ return nil
+}
+
+// dispatchEvent sends a WatchResponse to the appropriate watcher stream
+func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
+ events := make([]*Event, len(pbresp.Events))
+ for i, ev := range pbresp.Events {
+ events[i] = (*Event)(ev)
+ }
+ // TODO: return watch ID?
+ wr := &WatchResponse{
+ Header: *pbresp.Header,
+ Events: events,
+ CompactRevision: pbresp.CompactRevision,
+ Created: pbresp.Created,
+ Canceled: pbresp.Canceled,
+ cancelReason: pbresp.CancelReason,
+ }
+
+ // watch IDs are zero-indexed, so requested progress-notify watch responses are assigned a
+ // watch ID of -1 to indicate they should be broadcast.
+ if wr.IsProgressNotify() && pbresp.WatchId == -1 {
+ return w.broadcastResponse(wr)
+ }
+
+ return w.unicastResponse(wr, pbresp.WatchId)
+}
+
+// broadcastResponse sends a watch response to all watch substreams.
+func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
+ for _, ws := range w.substreams {
+ select {
+ case ws.recvc <- wr:
+ case <-ws.donec:
+ }
+ }
+ return true
+}
+
+// unicastResponse sends a watch response to a specific watch substream.
+func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
+ ws, ok := w.substreams[watchId]
+ if !ok {
+ return false
+ }
+ select {
+ case ws.recvc <- wr:
+ case <-ws.donec:
+ return false
+ }
+ return true
+}
+
+// serveWatchClient forwards messages from the grpc stream to run()
+func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
+ for {
+ resp, err := wc.Recv()
+ if err != nil {
+ select {
+ case w.errc <- err:
+ case <-w.donec:
+ }
+ return
+ }
+ select {
+ case w.respc <- resp:
+ case <-w.donec:
+ return
+ }
+ }
+}
+
+// serveSubstream forwards watch responses from run() to the subscriber
+func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
+ if ws.closing {
+ panic("created substream goroutine but substream is closing")
+ }
+
+ // nextRev is the minimum expected next revision
+ nextRev := ws.initReq.rev
+ resuming := false
+ defer func() {
+ if !resuming {
+ ws.closing = true
+ }
+ close(ws.donec)
+ if !resuming {
+ w.closingc <- ws
+ }
+ w.wg.Done()
+ }()
+
+ emptyWr := &WatchResponse{}
+ for {
+ curWr := emptyWr
+ outc := ws.outc
+
+ if len(ws.buf) > 0 {
+ curWr = ws.buf[0]
+ } else {
+ outc = nil
+ }
+ select {
+ case outc <- *curWr:
+ if ws.buf[0].Err() != nil {
+ return
+ }
+ ws.buf[0] = nil
+ ws.buf = ws.buf[1:]
+ case wr, ok := <-ws.recvc:
+ if !ok {
+ // shutdown from closeSubstream
+ return
+ }
+
+ if wr.Created {
+ if ws.initReq.retc != nil {
+ ws.initReq.retc <- ws.outc
+ // to prevent next write from taking the slot in buffered channel
+ // and posting duplicate create events
+ ws.initReq.retc = nil
+
+ // send first creation event only if requested
+ if ws.initReq.createdNotify {
+ ws.outc <- *wr
+ }
+ // once the watch channel is returned, a current revision
+ // watch must resume at the store revision. This is necessary
+ // for the following case to work as expected:
+ // wch := m1.Watch("a")
+ // m2.Put("a", "b")
+ // <-wch
+ // If the revision is only bound on the first observed event,
+ // if wch is disconnected before the Put is issued, then reconnects
+ // after it is committed, it'll miss the Put.
+ if ws.initReq.rev == 0 {
+ nextRev = wr.Header.Revision
+ }
+ }
+ } else {
+ // current progress of watch; <= store revision
+ nextRev = wr.Header.Revision
+ }
+
+ if len(wr.Events) > 0 {
+ nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
+ }
+ ws.initReq.rev = nextRev
+
+ // created event is already sent above,
+ // watcher should not post duplicate events
+ if wr.Created {
+ continue
+ }
+
+ // TODO pause channel if buffer gets too large
+ ws.buf = append(ws.buf, wr)
+ case <-w.ctx.Done():
+ return
+ case <-ws.initReq.ctx.Done():
+ return
+ case <-resumec:
+ resuming = true
+ return
+ }
+ }
+ // lazily send cancel message if events on missing id
+}
+
+func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
+ // mark all substreams as resuming
+ close(w.resumec)
+ w.resumec = make(chan struct{})
+ w.joinSubstreams()
+ for _, ws := range w.substreams {
+ ws.id = -1
+ w.resuming = append(w.resuming, ws)
+ }
+ // strip out nils, if any
+ var resuming []*watcherStream
+ for _, ws := range w.resuming {
+ if ws != nil {
+ resuming = append(resuming, ws)
+ }
+ }
+ w.resuming = resuming
+ w.substreams = make(map[int64]*watcherStream)
+
+ // connect to grpc stream while accepting watcher cancellation
+ stopc := make(chan struct{})
+ donec := w.waitCancelSubstreams(stopc)
+ wc, err := w.openWatchClient()
+ close(stopc)
+ <-donec
+
+ // serve all non-closing streams, even if there's a client error
+ // so that the teardown path can shutdown the streams as expected.
+ for _, ws := range w.resuming {
+ if ws.closing {
+ continue
+ }
+ ws.donec = make(chan struct{})
+ w.wg.Add(1)
+ go w.serveSubstream(ws, w.resumec)
+ }
+
+ if err != nil {
+ return nil, v3rpc.Error(err)
+ }
+
+ // receive data from new grpc stream
+ go w.serveWatchClient(wc)
+ return wc, nil
+}
+
+func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
+ var wg sync.WaitGroup
+ wg.Add(len(w.resuming))
+ donec := make(chan struct{})
+ for i := range w.resuming {
+ go func(ws *watcherStream) {
+ defer wg.Done()
+ if ws.closing {
+ if ws.initReq.ctx.Err() != nil && ws.outc != nil {
+ close(ws.outc)
+ ws.outc = nil
+ }
+ return
+ }
+ select {
+ case <-ws.initReq.ctx.Done():
+ // closed ws will be removed from resuming
+ ws.closing = true
+ close(ws.outc)
+ ws.outc = nil
+ w.wg.Add(1)
+ go func() {
+ defer w.wg.Done()
+ w.closingc <- ws
+ }()
+ case <-stopc:
+ }
+ }(w.resuming[i])
+ }
+ go func() {
+ defer close(donec)
+ wg.Wait()
+ }()
+ return donec
+}
+
+// joinSubstreams waits for all substream goroutines to complete.
+func (w *watchGrpcStream) joinSubstreams() {
+ for _, ws := range w.substreams {
+ <-ws.donec
+ }
+ for _, ws := range w.resuming {
+ if ws != nil {
+ <-ws.donec
+ }
+ }
+}
+
+var maxBackoff = 100 * time.Millisecond
+
+// openWatchClient retries opening a watch client until success or halt.
+// It retries manually in case "ws==nil && err==nil".
+// TODO: remove FailFast=false
+func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
+ backoff := time.Millisecond
+ for {
+ select {
+ case <-w.ctx.Done():
+ if err == nil {
+ return nil, w.ctx.Err()
+ }
+ return nil, err
+ default:
+ }
+ if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
+ break
+ }
+ if isHaltErr(w.ctx, err) {
+ return nil, v3rpc.Error(err)
+ }
+ if isUnavailableErr(w.ctx, err) {
+ // retry, but backoff
+ if backoff < maxBackoff {
+ // 25% backoff factor
+ backoff = backoff + backoff/4
+ if backoff > maxBackoff {
+ backoff = maxBackoff
+ }
+ }
+ time.Sleep(backoff)
+ }
+ }
+ return ws, nil
+}
+
+// toPB converts an internal watch request structure to its protobuf WatchRequest structure.
+func (wr *watchRequest) toPB() *pb.WatchRequest {
+ req := &pb.WatchCreateRequest{
+ StartRevision: wr.rev,
+ Key: []byte(wr.key),
+ RangeEnd: []byte(wr.end),
+ ProgressNotify: wr.progressNotify,
+ Filters: wr.filters,
+ PrevKv: wr.prevKV,
+ Fragment: wr.fragment,
+ }
+ cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
+ return &pb.WatchRequest{RequestUnion: cr}
+}
+
+// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
+func (pr *progressRequest) toPB() *pb.WatchRequest {
+ req := &pb.WatchProgressRequest{}
+ cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
+ return &pb.WatchRequest{RequestUnion: cr}
+}
+
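+// streamKeyFromCtx derives a grouping key from the outgoing gRPC metadata
+// attached to ctx; watchers whose contexts render the same key share a
+// single watch stream.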
+func streamKeyFromCtx(ctx context.Context) string {
+ if md, ok := metadata.FromOutgoingContext(ctx); ok {
+ return fmt.Sprintf("%+v", md)
+ }
+ return ""
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/doc.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/doc.go
new file mode 100644
index 0000000..f72c6a6
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package rpctypes has types and values shared by the etcd server and client for v3 RPC interaction.
+package rpctypes
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/error.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/error.go
new file mode 100644
index 0000000..bc1ad7b
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/error.go
@@ -0,0 +1,217 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rpctypes
+
+import (
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// server-side errors
+var (
+ ErrGRPCEmptyKey = status.New(codes.InvalidArgument, "etcdserver: key is not provided").Err()
+ ErrGRPCKeyNotFound = status.New(codes.InvalidArgument, "etcdserver: key not found").Err()
+ ErrGRPCValueProvided = status.New(codes.InvalidArgument, "etcdserver: value is provided").Err()
+ ErrGRPCLeaseProvided = status.New(codes.InvalidArgument, "etcdserver: lease is provided").Err()
+ ErrGRPCTooManyOps = status.New(codes.InvalidArgument, "etcdserver: too many operations in txn request").Err()
+ ErrGRPCDuplicateKey = status.New(codes.InvalidArgument, "etcdserver: duplicate key given in txn request").Err()
+ ErrGRPCCompacted = status.New(codes.OutOfRange, "etcdserver: mvcc: required revision has been compacted").Err()
+ ErrGRPCFutureRev = status.New(codes.OutOfRange, "etcdserver: mvcc: required revision is a future revision").Err()
+ ErrGRPCNoSpace = status.New(codes.ResourceExhausted, "etcdserver: mvcc: database space exceeded").Err()
+
+ ErrGRPCLeaseNotFound = status.New(codes.NotFound, "etcdserver: requested lease not found").Err()
+ ErrGRPCLeaseExist = status.New(codes.FailedPrecondition, "etcdserver: lease already exists").Err()
+ ErrGRPCLeaseTTLTooLarge = status.New(codes.OutOfRange, "etcdserver: too large lease TTL").Err()
+
+ ErrGRPCMemberExist = status.New(codes.FailedPrecondition, "etcdserver: member ID already exist").Err()
+ ErrGRPCPeerURLExist = status.New(codes.FailedPrecondition, "etcdserver: Peer URLs already exists").Err()
+ ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err()
+ ErrGRPCMemberBadURLs = status.New(codes.InvalidArgument, "etcdserver: given member URLs are invalid").Err()
+ ErrGRPCMemberNotFound = status.New(codes.NotFound, "etcdserver: member not found").Err()
+
+ ErrGRPCRequestTooLarge = status.New(codes.InvalidArgument, "etcdserver: request is too large").Err()
+ ErrGRPCRequestTooManyRequests = status.New(codes.ResourceExhausted, "etcdserver: too many requests").Err()
+
+ ErrGRPCRootUserNotExist = status.New(codes.FailedPrecondition, "etcdserver: root user does not exist").Err()
+ ErrGRPCRootRoleNotExist = status.New(codes.FailedPrecondition, "etcdserver: root user does not have root role").Err()
+ ErrGRPCUserAlreadyExist = status.New(codes.FailedPrecondition, "etcdserver: user name already exists").Err()
+ ErrGRPCUserEmpty = status.New(codes.InvalidArgument, "etcdserver: user name is empty").Err()
+ ErrGRPCUserNotFound = status.New(codes.FailedPrecondition, "etcdserver: user name not found").Err()
+ ErrGRPCRoleAlreadyExist = status.New(codes.FailedPrecondition, "etcdserver: role name already exists").Err()
+ ErrGRPCRoleNotFound = status.New(codes.FailedPrecondition, "etcdserver: role name not found").Err()
+ ErrGRPCAuthFailed = status.New(codes.InvalidArgument, "etcdserver: authentication failed, invalid user ID or password").Err()
+ ErrGRPCPermissionDenied = status.New(codes.PermissionDenied, "etcdserver: permission denied").Err()
+ ErrGRPCRoleNotGranted = status.New(codes.FailedPrecondition, "etcdserver: role is not granted to the user").Err()
+ ErrGRPCPermissionNotGranted = status.New(codes.FailedPrecondition, "etcdserver: permission is not granted to the role").Err()
+ ErrGRPCAuthNotEnabled = status.New(codes.FailedPrecondition, "etcdserver: authentication is not enabled").Err()
+ ErrGRPCInvalidAuthToken = status.New(codes.Unauthenticated, "etcdserver: invalid auth token").Err()
+ ErrGRPCInvalidAuthMgmt = status.New(codes.InvalidArgument, "etcdserver: invalid auth management").Err()
+
+ ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err()
+ ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err()
+ ErrGRPCLeaderChanged = status.New(codes.Unavailable, "etcdserver: leader changed").Err()
+ ErrGRPCNotCapable = status.New(codes.Unavailable, "etcdserver: not capable").Err()
+ ErrGRPCStopped = status.New(codes.Unavailable, "etcdserver: server stopped").Err()
+ ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err()
+ ErrGRPCTimeoutDueToLeaderFail = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure").Err()
+ ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err()
+ ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err()
+ ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err()
+
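+ // errStringToError maps the canonical server-side error descriptions back
+ // to their error values. The strings must match what the server sends
+ // byte-for-byte, so wording oddities above (e.g. "member ID already
+ // exist") are preserved intentionally.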
+ errStringToError = map[string]error{
+ ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey,
+ ErrorDesc(ErrGRPCKeyNotFound): ErrGRPCKeyNotFound,
+ ErrorDesc(ErrGRPCValueProvided): ErrGRPCValueProvided,
+ ErrorDesc(ErrGRPCLeaseProvided): ErrGRPCLeaseProvided,
+
+ ErrorDesc(ErrGRPCTooManyOps): ErrGRPCTooManyOps,
+ ErrorDesc(ErrGRPCDuplicateKey): ErrGRPCDuplicateKey,
+ ErrorDesc(ErrGRPCCompacted): ErrGRPCCompacted,
+ ErrorDesc(ErrGRPCFutureRev): ErrGRPCFutureRev,
+ ErrorDesc(ErrGRPCNoSpace): ErrGRPCNoSpace,
+
+ ErrorDesc(ErrGRPCLeaseNotFound): ErrGRPCLeaseNotFound,
+ ErrorDesc(ErrGRPCLeaseExist): ErrGRPCLeaseExist,
+ ErrorDesc(ErrGRPCLeaseTTLTooLarge): ErrGRPCLeaseTTLTooLarge,
+
+ ErrorDesc(ErrGRPCMemberExist): ErrGRPCMemberExist,
+ ErrorDesc(ErrGRPCPeerURLExist): ErrGRPCPeerURLExist,
+ ErrorDesc(ErrGRPCMemberNotEnoughStarted): ErrGRPCMemberNotEnoughStarted,
+ ErrorDesc(ErrGRPCMemberBadURLs): ErrGRPCMemberBadURLs,
+ ErrorDesc(ErrGRPCMemberNotFound): ErrGRPCMemberNotFound,
+
+ ErrorDesc(ErrGRPCRequestTooLarge): ErrGRPCRequestTooLarge,
+ ErrorDesc(ErrGRPCRequestTooManyRequests): ErrGRPCRequestTooManyRequests,
+
+ ErrorDesc(ErrGRPCRootUserNotExist): ErrGRPCRootUserNotExist,
+ ErrorDesc(ErrGRPCRootRoleNotExist): ErrGRPCRootRoleNotExist,
+ ErrorDesc(ErrGRPCUserAlreadyExist): ErrGRPCUserAlreadyExist,
+ ErrorDesc(ErrGRPCUserEmpty): ErrGRPCUserEmpty,
+ ErrorDesc(ErrGRPCUserNotFound): ErrGRPCUserNotFound,
+ ErrorDesc(ErrGRPCRoleAlreadyExist): ErrGRPCRoleAlreadyExist,
+ ErrorDesc(ErrGRPCRoleNotFound): ErrGRPCRoleNotFound,
+ ErrorDesc(ErrGRPCAuthFailed): ErrGRPCAuthFailed,
+ ErrorDesc(ErrGRPCPermissionDenied): ErrGRPCPermissionDenied,
+ ErrorDesc(ErrGRPCRoleNotGranted): ErrGRPCRoleNotGranted,
+ ErrorDesc(ErrGRPCPermissionNotGranted): ErrGRPCPermissionNotGranted,
+ ErrorDesc(ErrGRPCAuthNotEnabled): ErrGRPCAuthNotEnabled,
+ ErrorDesc(ErrGRPCInvalidAuthToken): ErrGRPCInvalidAuthToken,
+ ErrorDesc(ErrGRPCInvalidAuthMgmt): ErrGRPCInvalidAuthMgmt,
+
+ ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
+ ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader,
+ ErrorDesc(ErrGRPCLeaderChanged): ErrGRPCLeaderChanged,
+ ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable,
+ ErrorDesc(ErrGRPCStopped): ErrGRPCStopped,
+ ErrorDesc(ErrGRPCTimeout): ErrGRPCTimeout,
+ ErrorDesc(ErrGRPCTimeoutDueToLeaderFail): ErrGRPCTimeoutDueToLeaderFail,
+ ErrorDesc(ErrGRPCTimeoutDueToConnectionLost): ErrGRPCTimeoutDueToConnectionLost,
+ ErrorDesc(ErrGRPCUnhealthy): ErrGRPCUnhealthy,
+ ErrorDesc(ErrGRPCCorrupt): ErrGRPCCorrupt,
+ }
+)
+
+// client-side errors
+var (
+ ErrEmptyKey = Error(ErrGRPCEmptyKey)
+ ErrKeyNotFound = Error(ErrGRPCKeyNotFound)
+ ErrValueProvided = Error(ErrGRPCValueProvided)
+ ErrLeaseProvided = Error(ErrGRPCLeaseProvided)
+ ErrTooManyOps = Error(ErrGRPCTooManyOps)
+ ErrDuplicateKey = Error(ErrGRPCDuplicateKey)
+ ErrCompacted = Error(ErrGRPCCompacted)
+ ErrFutureRev = Error(ErrGRPCFutureRev)
+ ErrNoSpace = Error(ErrGRPCNoSpace)
+
+ ErrLeaseNotFound = Error(ErrGRPCLeaseNotFound)
+ ErrLeaseExist = Error(ErrGRPCLeaseExist)
+ ErrLeaseTTLTooLarge = Error(ErrGRPCLeaseTTLTooLarge)
+
+ ErrMemberExist = Error(ErrGRPCMemberExist)
+ ErrPeerURLExist = Error(ErrGRPCPeerURLExist)
+ ErrMemberNotEnoughStarted = Error(ErrGRPCMemberNotEnoughStarted)
+ ErrMemberBadURLs = Error(ErrGRPCMemberBadURLs)
+ ErrMemberNotFound = Error(ErrGRPCMemberNotFound)
+
+ ErrRequestTooLarge = Error(ErrGRPCRequestTooLarge)
+ ErrTooManyRequests = Error(ErrGRPCRequestTooManyRequests)
+
+ ErrRootUserNotExist = Error(ErrGRPCRootUserNotExist)
+ ErrRootRoleNotExist = Error(ErrGRPCRootRoleNotExist)
+ ErrUserAlreadyExist = Error(ErrGRPCUserAlreadyExist)
+ ErrUserEmpty = Error(ErrGRPCUserEmpty)
+ ErrUserNotFound = Error(ErrGRPCUserNotFound)
+ ErrRoleAlreadyExist = Error(ErrGRPCRoleAlreadyExist)
+ ErrRoleNotFound = Error(ErrGRPCRoleNotFound)
+ ErrAuthFailed = Error(ErrGRPCAuthFailed)
+ ErrPermissionDenied = Error(ErrGRPCPermissionDenied)
+ ErrRoleNotGranted = Error(ErrGRPCRoleNotGranted)
+ ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted)
+ ErrAuthNotEnabled = Error(ErrGRPCAuthNotEnabled)
+ ErrInvalidAuthToken = Error(ErrGRPCInvalidAuthToken)
+ ErrInvalidAuthMgmt = Error(ErrGRPCInvalidAuthMgmt)
+
+ ErrNoLeader = Error(ErrGRPCNoLeader)
+ ErrNotLeader = Error(ErrGRPCNotLeader)
+ ErrLeaderChanged = Error(ErrGRPCLeaderChanged)
+ ErrNotCapable = Error(ErrGRPCNotCapable)
+ ErrStopped = Error(ErrGRPCStopped)
+ ErrTimeout = Error(ErrGRPCTimeout)
+ ErrTimeoutDueToLeaderFail = Error(ErrGRPCTimeoutDueToLeaderFail)
+ ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost)
+ ErrUnhealthy = Error(ErrGRPCUnhealthy)
+ ErrCorrupt = Error(ErrGRPCCorrupt)
+)
+
+// EtcdError defines gRPC server errors.
+// (https://github.com/grpc/grpc-go/blob/master/rpc_util.go#L319-L323)
+type EtcdError struct {
+ code codes.Code
+ desc string
+}
+
+// Code returns grpc/codes.Code.
+// TODO: define clientv3/codes.Code.
+func (e EtcdError) Code() codes.Code {
+ return e.code
+}
+
+func (e EtcdError) Error() string {
+ return e.desc
+}
+
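+// Error converts a gRPC error returned by the etcd server into the
+// corresponding client-side EtcdError; unrecognized errors are returned
+// unchanged.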
+func Error(err error) error {
+ if err == nil {
+ return nil
+ }
+ verr, ok := errStringToError[ErrorDesc(err)]
+ if !ok { // not a known etcd server error
+ return err
+ }
+ ev, ok := status.FromError(verr)
+ var desc string
+ if ok {
+ desc = ev.Message()
+ } else {
+ desc = verr.Error()
+ }
+ return EtcdError{code: ev.Code(), desc: desc}
+}
+
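+// ErrorDesc returns the description of err: its gRPC status message when it
+// carries one, otherwise err.Error().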
+func ErrorDesc(err error) string {
+ if s, ok := status.FromError(err); ok {
+ return s.Message()
+ }
+ return err.Error()
+}
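+
+// A minimal usage sketch, editorial and not part of upstream etcd: a caller
+// normalizes a raw gRPC error and compares it against the exported
+// client-side values above, for example
+//
+//	if rpctypes.Error(err) == rpctypes.ErrCompacted {
+//		// the requested revision was compacted; retry from a newer revision
+//	}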
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/md.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/md.go
new file mode 100644
index 0000000..90b8b83
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/md.go
@@ -0,0 +1,22 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rpctypes
+
+var (
+ MetadataRequireLeaderKey = "hasleader"
+ MetadataHasLeader = "true"
+
+ MetadataClientAPIVersionKey = "client-api-version"
+)
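+
+// Usage sketch, editorial and not part of upstream etcd: a client opts into
+// leader-required semantics by attaching this key to a request's outgoing
+// gRPC metadata (clientv3.WithRequireLeader does the equivalent), after
+// which a member without a leader fails the RPC instead of serving it:
+//
+//	ctx = metadata.AppendToOutgoingContext(ctx,
+//		rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)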
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/metadatafields.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/metadatafields.go
new file mode 100644
index 0000000..8f8ac60
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/metadatafields.go
@@ -0,0 +1,20 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rpctypes
+
+var (
+ TokenFieldNameGRPC = "token"
+ TokenFieldNameSwagger = "authorization"
+)
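+
+// Usage sketch, editorial and not part of upstream etcd: an authenticated
+// client presents its auth token under TokenFieldNameGRPC in each request's
+// metadata, while the gRPC gateway accepts it as an "authorization" header:
+//
+//	ctx = metadata.AppendToOutgoingContext(ctx,
+//		rpctypes.TokenFieldNameGRPC, token)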