[VOL-1386] This commit adds "dep" as the package management tool
for voltha-go.
Change-Id: I52bc4911dd00a441756ec7c30f46d45091f3f90e
diff --git a/vendor/go.etcd.io/etcd/Documentation/README.md b/vendor/go.etcd.io/etcd/Documentation/README.md
new file mode 120000
index 0000000..8828313
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/Documentation/README.md
@@ -0,0 +1 @@
+docs.md
\ No newline at end of file
diff --git a/vendor/go.etcd.io/etcd/LICENSE b/vendor/go.etcd.io/etcd/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/vendor/go.etcd.io/etcd/NOTICE b/vendor/go.etcd.io/etcd/NOTICE
new file mode 100644
index 0000000..b39ddfa
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/NOTICE
@@ -0,0 +1,5 @@
+CoreOS Project
+Copyright 2014 CoreOS, Inc
+
+This product includes software developed at CoreOS, Inc.
+(http://www.coreos.com/).
diff --git a/vendor/go.etcd.io/etcd/clientv3/README.md b/vendor/go.etcd.io/etcd/clientv3/README.md
new file mode 100644
index 0000000..376bfba
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/README.md
@@ -0,0 +1,85 @@
+# etcd/clientv3
+
+[![Godoc](https://img.shields.io/badge/go-documentation-blue.svg?style=flat-square)](https://godoc.org/github.com/coreos/etcd/clientv3)
+
+`etcd/clientv3` is the official Go etcd client for v3.
+
+## Install
+
+```bash
+go get github.com/coreos/etcd/clientv3
+```
+
+## Get started
+
+Create client using `clientv3.New`:
+
+```go
+cli, err := clientv3.New(clientv3.Config{
+ Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
+ DialTimeout: 5 * time.Second,
+})
+if err != nil {
+ // handle error!
+}
+defer cli.Close()
+```
+
+etcd v3 uses [`gRPC`](http://www.grpc.io) for remote procedure calls. And `clientv3` uses
+[`grpc-go`](https://github.com/grpc/grpc-go) to connect to etcd. Make sure to close the client after using it.
+If the client is not closed, the connection will have leaky goroutines. To specify client request timeout,
+pass `context.WithTimeout` to APIs:
+
+```go
+ctx, cancel := context.WithTimeout(context.Background(), timeout)
+resp, err := cli.Put(ctx, "sample_key", "sample_value")
+cancel()
+if err != nil {
+ // handle error!
+}
+// use the response
+```
+
+etcd uses `cmd/vendor` directory to store external dependencies, which are
+to be compiled into etcd release binaries. `client` can be imported without
+vendoring. For full compatibility, it is recommended to vendor builds using
+etcd's vendored packages, using tools like godep, as in
+[vendor directories](https://golang.org/cmd/go/#hdr-Vendor_Directories).
+For more detail, please read [Go vendor design](https://golang.org/s/go15vendor).
+
+## Error Handling
+
+etcd client returns 2 types of errors:
+
+1. context error: canceled or deadline exceeded.
+2. gRPC error: see [api/v3rpc/rpctypes](https://godoc.org/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes).
+
+Here is the example code to handle client errors:
+
+```go
+resp, err := cli.Put(ctx, "", "")
+if err != nil {
+ switch err {
+ case context.Canceled:
+ log.Fatalf("ctx is canceled by another routine: %v", err)
+ case context.DeadlineExceeded:
+ log.Fatalf("ctx is attached with a deadline is exceeded: %v", err)
+ case rpctypes.ErrEmptyKey:
+ log.Fatalf("client-side error: %v", err)
+ default:
+ log.Fatalf("bad cluster endpoints, which are not etcd servers: %v", err)
+ }
+}
+```
+
+## Metrics
+
+The etcd client optionally exposes RPC metrics through [go-grpc-prometheus](https://github.com/grpc-ecosystem/go-grpc-prometheus). See the [examples](https://github.com/coreos/etcd/blob/master/clientv3/example_metrics_test.go).
+
+## Namespacing
+
+The [namespace](https://godoc.org/github.com/coreos/etcd/clientv3/namespace) package provides `clientv3` interface wrappers to transparently isolate client requests to a user-defined prefix.
+
+## Examples
+
+More code examples can be found at [GoDoc](https://godoc.org/github.com/coreos/etcd/clientv3).
diff --git a/vendor/go.etcd.io/etcd/clientv3/auth.go b/vendor/go.etcd.io/etcd/clientv3/auth.go
new file mode 100644
index 0000000..7545bb6
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/auth.go
@@ -0,0 +1,233 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+ "fmt"
+ "strings"
+
+ "github.com/coreos/etcd/auth/authpb"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+)
+
+type (
+	AuthEnableResponse               pb.AuthEnableResponse // each response type here is declared over the pb message of the same name
+	AuthDisableResponse              pb.AuthDisableResponse
+	AuthenticateResponse             pb.AuthenticateResponse
+	AuthUserAddResponse              pb.AuthUserAddResponse
+	AuthUserDeleteResponse           pb.AuthUserDeleteResponse
+	AuthUserChangePasswordResponse   pb.AuthUserChangePasswordResponse
+	AuthUserGrantRoleResponse        pb.AuthUserGrantRoleResponse
+	AuthUserGetResponse              pb.AuthUserGetResponse
+	AuthUserRevokeRoleResponse       pb.AuthUserRevokeRoleResponse
+	AuthRoleAddResponse              pb.AuthRoleAddResponse
+	AuthRoleGrantPermissionResponse  pb.AuthRoleGrantPermissionResponse
+	AuthRoleGetResponse              pb.AuthRoleGetResponse
+	AuthRoleRevokePermissionResponse pb.AuthRoleRevokePermissionResponse
+	AuthRoleDeleteResponse           pb.AuthRoleDeleteResponse
+	AuthUserListResponse             pb.AuthUserListResponse
+	AuthRoleListResponse             pb.AuthRoleListResponse
+
+	PermissionType authpb.Permission_Type // converted back to authpb.Permission_Type by RoleGrantPermission
+	Permission     authpb.Permission
+)
+
+const (
+	PermRead      = authpb.READ // NOTE(review): presumably typed authpb.Permission_Type rather than PermissionType — confirm before passing to RoleGrantPermission
+	PermWrite     = authpb.WRITE
+	PermReadWrite = authpb.READWRITE
+)
+
+type Auth interface {
+	// AuthEnable enables auth of an etcd cluster.
+	AuthEnable(ctx context.Context) (*AuthEnableResponse, error)
+
+	// AuthDisable disables auth of an etcd cluster.
+	AuthDisable(ctx context.Context) (*AuthDisableResponse, error)
+
+	// UserAdd adds a new user to an etcd cluster.
+	UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error)
+
+	// UserDelete deletes a user from an etcd cluster.
+	UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error)
+
+	// UserChangePassword changes the password of a user.
+	UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error)
+
+	// UserGrantRole grants a role to a user.
+	UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error)
+
+	// UserGet gets detailed information about a user.
+	UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error)
+
+	// UserList gets a list of all users.
+	UserList(ctx context.Context) (*AuthUserListResponse, error)
+
+	// UserRevokeRole revokes a role from a user.
+	UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error)
+
+	// RoleAdd adds a new role to an etcd cluster.
+	RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error)
+
+	// RoleGrantPermission grants a permission to a role.
+	RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error)
+
+	// RoleGet gets detailed information about a role.
+	RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error)
+
+	// RoleList gets a list of all roles.
+	RoleList(ctx context.Context) (*AuthRoleListResponse, error)
+
+	// RoleRevokePermission revokes a permission from a role.
+	RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error)
+
+	// RoleDelete deletes a role.
+	RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error)
+}
+
+type auth struct {
+	remote   pb.AuthClient     // stub every Auth method delegates to; set by NewAuth from RetryAuthClient
+	callOpts []grpc.CallOption // appended to every RPC; copied from the Client when one is given
+}
+
+func NewAuth(c *Client) Auth {
+	var opts []grpc.CallOption // stays nil when no client is supplied
+	if c != nil {
+		opts = c.callOpts
+	}
+	return &auth{remote: RetryAuthClient(c), callOpts: opts}
+}
+
+func (a *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
+	out, err := a.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, a.callOpts...)
+	return (*AuthEnableResponse)(out), toErr(ctx, err)
+}
+
+func (a *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
+	out, err := a.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, a.callOpts...)
+	return (*AuthDisableResponse)(out), toErr(ctx, err)
+}
+
+func (a *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
+	out, err := a.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, a.callOpts...)
+	return (*AuthUserAddResponse)(out), toErr(ctx, err)
+}
+
+func (a *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
+	out, err := a.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, a.callOpts...)
+	return (*AuthUserDeleteResponse)(out), toErr(ctx, err)
+}
+
+func (a *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
+	out, err := a.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, a.callOpts...)
+	return (*AuthUserChangePasswordResponse)(out), toErr(ctx, err)
+}
+
+func (a *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
+	out, err := a.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, a.callOpts...)
+	return (*AuthUserGrantRoleResponse)(out), toErr(ctx, err)
+}
+
+func (a *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) {
+	out, err := a.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, a.callOpts...)
+	return (*AuthUserGetResponse)(out), toErr(ctx, err)
+}
+
+func (a *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) {
+	out, err := a.remote.UserList(ctx, &pb.AuthUserListRequest{}, a.callOpts...)
+	return (*AuthUserListResponse)(out), toErr(ctx, err)
+}
+
+func (a *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
+	out, err := a.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, a.callOpts...)
+	return (*AuthUserRevokeRoleResponse)(out), toErr(ctx, err)
+}
+
+func (a *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
+	out, err := a.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, a.callOpts...)
+	return (*AuthRoleAddResponse)(out), toErr(ctx, err)
+}
+
+func (a *auth) RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error) {
+	req := &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: &authpb.Permission{
+		Key:      []byte(key),
+		RangeEnd: []byte(rangeEnd),
+		PermType: authpb.Permission_Type(permType),
+	}}
+	out, err := a.remote.RoleGrantPermission(ctx, req, a.callOpts...)
+	return (*AuthRoleGrantPermissionResponse)(out), toErr(ctx, err)
+}
+
+func (a *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) {
+	out, err := a.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, a.callOpts...)
+	return (*AuthRoleGetResponse)(out), toErr(ctx, err)
+}
+
+func (a *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
+	out, err := a.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, a.callOpts...)
+	return (*AuthRoleListResponse)(out), toErr(ctx, err)
+}
+
+func (a *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
+	out, err := a.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, a.callOpts...)
+	return (*AuthRoleRevokePermissionResponse)(out), toErr(ctx, err)
+}
+
+func (a *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
+	out, err := a.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, a.callOpts...)
+	return (*AuthRoleDeleteResponse)(out), toErr(ctx, err)
+}
+
+// StrToPermissionType looks s up, upper-cased, among the authpb permission names.
+func StrToPermissionType(s string) (PermissionType, error) {
+	if code, known := authpb.Permission_Type_value[strings.ToUpper(s)]; known {
+		return PermissionType(code), nil
+	}
+	return PermissionType(-1), fmt.Errorf("invalid permission type: %s", s)
+}
+
+type authenticator struct {
+	conn     *grpc.ClientConn  // conn in-use; released by close
+	remote   pb.AuthClient     // auth stub created on conn
+	callOpts []grpc.CallOption // appended to the Authenticate RPC
+}
+
+func (a *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) {
+	out, err := a.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, a.callOpts...)
+	return (*AuthenticateResponse)(out), toErr(ctx, err)
+}
+
+func (a *authenticator) close() {
+	_ = a.conn.Close() // the Close error is discarded
+}
+
+// newAuthenticator dials endpoint with opts and returns an authenticator bound to that connection.
+func newAuthenticator(endpoint string, opts []grpc.DialOption, c *Client) (*authenticator, error) {
+	cc, dialErr := grpc.Dial(endpoint, opts...)
+	if dialErr != nil {
+		return nil, dialErr
+	}
+	a := &authenticator{
+		conn:   cc,
+		remote: pb.NewAuthClient(cc),
+	}
+	if c != nil {
+		a.callOpts = c.callOpts
+	}
+	return a, nil
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/client.go b/vendor/go.etcd.io/etcd/clientv3/client.go
new file mode 100644
index 0000000..7132807
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/client.go
@@ -0,0 +1,576 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+ "crypto/tls"
+ "errors"
+ "fmt"
+ "net"
+ "net/url"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/keepalive"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
+)
+
+var (
+	ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints") // returned by New when Config.Endpoints is empty
+	ErrOldCluster           = errors.New("etcdclient: old cluster version")
+)
+
+// Client provides and manages an etcd v3 client session.
+type Client struct {
+	Cluster
+	KV
+	Lease   // Close calls Lease.Close unconditionally
+	Watcher // Close calls Watcher.Close unconditionally
+	Auth
+	Maintenance
+
+	conn     *grpc.ClientConn // closed by Close when non-nil
+	dialerrc chan error       // the dialer built in dialSetupOpts offers dial failures here without blocking
+
+	cfg      Config
+	creds    *credentials.TransportCredentials // base transport credentials; processCreds may replace them per scheme
+	balancer *healthBalancer                   // consulted by SetEndpoints and by the dialer in dialSetupOpts
+	mu       *sync.Mutex                       // held by SetEndpoints while it replaces cfg.Endpoints
+
+	ctx    context.Context    // returned by Ctx
+	cancel context.CancelFunc // called by Close
+
+	// Username is a user name for authentication.
+	Username string
+	// Password is a password for authentication.
+	Password string
+	// tokenCred is an instance of WithPerRPCCredentials()'s argument
+	tokenCred *authTokenCredential
+
+	callOpts []grpc.CallOption // copied into the wrappers built by NewAuth and newAuthenticator
+}
+
+// New creates a new etcdv3 client from a given configuration; cfg must list
+// at least one endpoint, otherwise ErrNoAvailableEndpoints is returned.
+func New(cfg Config) (*Client, error) {
+	if len(cfg.Endpoints) < 1 {
+		return nil, ErrNoAvailableEndpoints
+	}
+	return newClient(&cfg)
+}
+
+// NewCtxClient returns a client that holds only a cancelable context derived
+// from ctx and no underlying grpc connection. It suits embedded cases that
+// override the service interface implementations and need no connection management.
+func NewCtxClient(ctx context.Context) *Client {
+	child, stop := context.WithCancel(ctx)
+	return &Client{ctx: child, cancel: stop}
+}
+
+// NewFromURL creates a new etcdv3 client whose only configured endpoint is url.
+func NewFromURL(url string) (*Client, error) {
+	return New(Config{Endpoints: []string{url}})
+}
+
+// Close cancels the client context, closes the Watcher and the Lease, then the connection if there is one.
+func (c *Client) Close() error {
+	c.cancel()
+	c.Watcher.Close()
+	c.Lease.Close()
+	if c.conn == nil {
+		return c.ctx.Err()
+	}
+	return toErr(c.ctx, c.conn.Close())
+}
+
+// Ctx returns the context for "out of band" messages (e.g., a "clean up" message sent after another context is canceled); it is canceled on client Close().
+func (c *Client) Ctx() context.Context {
+	return c.ctx
+}
+
+// Endpoints lists the registered endpoints for the client. The result is a
+// copy, so callers cannot change the client's own endpoint list through it.
+func (c *Client) Endpoints() (eps []string) {
+	registered := c.cfg.Endpoints
+	eps = append(make([]string, 0, len(registered)), registered...)
+	return eps
+}
+
+// SetEndpoints updates client's endpoints.
+func (c *Client) SetEndpoints(eps ...string) {
+	c.mu.Lock()
+	c.cfg.Endpoints = eps
+	c.mu.Unlock()
+	c.balancer.updateAddrs(eps...)
+
+	// updating notifyCh can trigger new connections; an update is needed when
+	// all connections are down or addrs does not include pinAddr.
+	c.balancer.mu.RLock()
+	pinStillListed := hasAddr(c.balancer.addrs, c.balancer.pinAddr)
+	c.balancer.mu.RUnlock()
+	if pinStillListed {
+		return
+	}
+	select {
+	case c.balancer.updateAddrsC <- notifyNext:
+	case <-c.balancer.stopc:
+	}
+}
+
+// Sync sets the client's endpoints to the client URLs of the members returned by MemberList.
+func (c *Client) Sync(ctx context.Context) error {
+	list, err := c.MemberList(ctx)
+	if err != nil {
+		return err
+	}
+	var urls []string
+	for _, member := range list.Members {
+		urls = append(urls, member.ClientURLs...)
+	}
+	c.SetEndpoints(urls...)
+	return nil
+}
+
+// autoSync calls Sync every AutoSyncInterval until the client context is done; a zero interval disables it.
+func (c *Client) autoSync() {
+	if c.cfg.AutoSyncInterval == 0 {
+		return
+	}
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		case <-time.After(c.cfg.AutoSyncInterval):
+			syncCtx, stop := context.WithTimeout(c.ctx, 5*time.Second)
+			syncErr := c.Sync(syncCtx)
+			stop()
+			if syncErr != nil && syncErr != c.ctx.Err() {
+				logger.Println("Auto sync endpoints failed:", syncErr)
+			}
+		}
+	}
+}
+
+type authTokenCredential struct {
+	token   string        // sent as the "token" metadata value; access it only while holding tokenMu
+	tokenMu *sync.RWMutex // guards token
+}
+
+func (cred *authTokenCredential) RequireTransportSecurity() bool {
+	return false // the token may be sent over connections without transport security
+}
+
+// GetRequestMetadata returns the current token under the "token" key. The pointer receiver
+// matters: a value receiver copies token before RLock is taken, racing with getToken's write.
+func (cred *authTokenCredential) GetRequestMetadata(ctx context.Context, s ...string) (map[string]string, error) {
+	cred.tokenMu.RLock()
+	defer cred.tokenMu.RUnlock()
+	return map[string]string{"token": cred.token}, nil
+}
+
+// parseEndpoint splits endpoint into the network to dial (proto), the address
+// to dial (host) and the URL scheme. An endpoint that does not parse as a URL
+// or has no "://" is taken as a plain tcp address with an empty scheme.
+func parseEndpoint(endpoint string) (proto string, host string, scheme string) {
+	parsed, perr := url.Parse(endpoint)
+	if perr != nil || !strings.Contains(endpoint, "://") {
+		return "tcp", endpoint, ""
+	}
+
+	// the scheme:// prefix is stripped since grpc dials by host
+	switch parsed.Scheme {
+	case "http", "https":
+		return "tcp", parsed.Host, parsed.Scheme
+	case "unix", "unixs":
+		return "unix", parsed.Host + parsed.Path, parsed.Scheme
+	default:
+		// any other scheme: proto and host come back empty and only the
+		// scheme itself is reported
+		return "", "", parsed.Scheme
+	}
+}
+
+// processCreds selects transport credentials for scheme: "unix" keeps c.creds,
+// "https"/"unixs" keep c.creds or fall back to empty-config TLS, others get nil.
+func (c *Client) processCreds(scheme string) (creds *credentials.TransportCredentials) {
+	switch scheme {
+	case "unix":
+		return c.creds
+	case "https", "unixs":
+		if c.creds != nil {
+			return c.creds
+		}
+		tlsCfg := &tls.Config{}
+		emptyTLS := credentials.NewTLS(tlsCfg)
+		return &emptyTLS
+	default:
+		// "http" and any unrecognized scheme
+		return nil
+	}
+}
+
+// dialSetupOpts gives the dial opts prior to any authentication
+func (c *Client) dialSetupOpts(endpoint string, dopts ...grpc.DialOption) (opts []grpc.DialOption) {
+	if c.cfg.DialTimeout > 0 {
+		opts = []grpc.DialOption{grpc.WithTimeout(c.cfg.DialTimeout)}
+	}
+	if c.cfg.DialKeepAliveTime > 0 { // keepalive is configured only when a keepalive time is set
+		params := keepalive.ClientParameters{
+			Time:    c.cfg.DialKeepAliveTime,
+			Timeout: c.cfg.DialKeepAliveTimeout,
+		}
+		opts = append(opts, grpc.WithKeepaliveParams(params))
+	}
+	opts = append(opts, dopts...) // caller-supplied options follow the timeout/keepalive options
+
+	f := func(host string, t time.Duration) (net.Conn, error) {
+		proto, host, _ := parseEndpoint(c.balancer.endpoint(host)) // host is overwritten with the parsed dial address
+		if host == "" && endpoint != "" {
+			// dialing an endpoint not in the balancer; use
+			// endpoint passed into dial
+			proto, host, _ = parseEndpoint(endpoint)
+		}
+		if proto == "" {
+			return nil, fmt.Errorf("unknown scheme for %q", host) // NOTE(review): parseEndpoint returns an empty host along with an empty proto, so %q prints ""
+		}
+		select { // do not start a dial once the client context is done
+		case <-c.ctx.Done():
+			return nil, c.ctx.Err()
+		default:
+		}
+		dialer := &net.Dialer{Timeout: t}
+		conn, err := dialer.DialContext(c.ctx, proto, host)
+		if err != nil {
+			select { // offer the error on dialerrc; drop it when the send would block
+			case c.dialerrc <- err:
+			default:
+			}
+		}
+		return conn, err
+	}
+	opts = append(opts, grpc.WithDialer(f))
+
+	creds := c.creds
+	if _, _, scheme := parseEndpoint(endpoint); len(scheme) != 0 { // an explicit scheme decides the credentials
+		creds = c.processCreds(scheme)
+	}
+	if creds != nil {
+		opts = append(opts, grpc.WithTransportCredentials(*creds))
+	} else {
+		opts = append(opts, grpc.WithInsecure())
+	}
+
+	return opts
+}
+
+// Dial connects to a single endpoint using the client's config and returns the grpc connection.
+func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
+	return c.dial(endpoint)
+}
+
+// getToken authenticates against each configured endpoint in turn and stores
+// the first token obtained; when every endpoint fails, the last error is returned.
+func (c *Client) getToken(ctx context.Context) error {
+	var err error // return last error in a case of fail
+	for _, endpoint := range c.cfg.Endpoints {
+		var auth *authenticator
+		// use dial options without dopts to avoid reusing the client balancer
+		auth, err = newAuthenticator(getHost(endpoint), c.dialSetupOpts(endpoint), c)
+		if err != nil {
+			continue
+		}
+
+		var resp *AuthenticateResponse
+		resp, err = auth.authenticate(ctx, c.Username, c.Password)
+		// Close now rather than defer: a defer inside the loop would keep the
+		// connection of every endpoint tried open until getToken returns.
+		auth.close()
+		if err != nil {
+			continue
+		}
+
+		c.tokenCred.tokenMu.Lock()
+		c.tokenCred.token = resp.Token
+		c.tokenCred.tokenMu.Unlock()
+		return nil
+	}
+
+	return err
+}
+
+// dial connects to endpoint with the options from dialSetupOpts plus dopts.
+// When both Username and Password are set it first fetches an auth token and,
+// on success, attaches it as per-RPC credentials. The options from
+// c.cfg.DialOptions are appended last.
+func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
+ opts := c.dialSetupOpts(endpoint, dopts...)
+ host := getHost(endpoint)
+ if c.Username != "" && c.Password != "" {
+ c.tokenCred = &authTokenCredential{
+ tokenMu: &sync.RWMutex{},
+ }
+
+ // bound the token fetch by DialTimeout when one is configured
+ ctx := c.ctx
+ if c.cfg.DialTimeout > 0 {
+ cctx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout)
+ defer cancel()
+ ctx = cctx
+ }
+
+ err := c.getToken(ctx)
+ if err != nil {
+ // ErrAuthNotEnabled is tolerated: dialing continues without
+ // per-RPC credentials
+ if toErr(ctx, err) != rpctypes.ErrAuthNotEnabled {
+ // the token-fetch context ended while the client context did
+ // not: report the failure as a deadline error
+ if err == ctx.Err() && ctx.Err() != c.ctx.Err() {
+ err = context.DeadlineExceeded
+ }
+ return nil, err
+ }
+ } else {
+ opts = append(opts, grpc.WithPerRPCCredentials(c.tokenCred))
+ }
+ }
+
+ opts = append(opts, c.cfg.DialOptions...)
+
+ conn, err := grpc.DialContext(c.ctx, host, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return conn, nil
+}
+
+// WithRequireLeader returns a context whose outgoing metadata carries the
+// require-leader key, so that requests made with it only succeed while the
+// cluster has a leader.
+func WithRequireLeader(ctx context.Context) context.Context {
+	return metadata.NewOutgoingContext(
+		ctx,
+		metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader),
+	)
+}
+
+// newClient builds a Client from cfg: it validates the configuration, dials
+// the first endpoint through a health balancer, waits up to DialTimeout (when
+// set) for the first connection, wires up the API wrappers, and starts the
+// auto-sync goroutine. A nil cfg is treated as an empty Config.
+//
+// It returns an error when the message size limits are inconsistent, when no
+// endpoint is configured, when dialing fails or times out, or when
+// RejectOldCluster is set and the version check fails.
+func newClient(cfg *Config) (*Client, error) {
+	if cfg == nil {
+		cfg = &Config{}
+	}
+	// Validate before the context and the balancer exist, so the error
+	// returns below have no cancel func or goroutine to release.
+	if cfg.MaxCallRecvMsgSize > 0 && cfg.MaxCallSendMsgSize > cfg.MaxCallRecvMsgSize {
+		return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", cfg.MaxCallRecvMsgSize, cfg.MaxCallSendMsgSize)
+	}
+	if len(cfg.Endpoints) == 0 {
+		// Endpoints[0] is dialed below; report an error rather than panic.
+		return nil, fmt.Errorf("at least one endpoint is required in client config")
+	}
+
+	var creds *credentials.TransportCredentials
+	if cfg.TLS != nil {
+		c := credentials.NewTLS(cfg.TLS)
+		creds = &c
+	}
+
+	// use a temporary skeleton client to bootstrap first connection
+	baseCtx := context.TODO()
+	if cfg.Context != nil {
+		baseCtx = cfg.Context
+	}
+
+	ctx, cancel := context.WithCancel(baseCtx)
+	client := &Client{
+		conn:     nil,
+		dialerrc: make(chan error, 1),
+		cfg:      *cfg,
+		creds:    creds,
+		ctx:      ctx,
+		cancel:   cancel,
+		mu:       new(sync.Mutex),
+		callOpts: defaultCallOpts,
+	}
+	if cfg.Username != "" && cfg.Password != "" {
+		client.Username = cfg.Username
+		client.Password = cfg.Password
+	}
+	if cfg.MaxCallSendMsgSize > 0 || cfg.MaxCallRecvMsgSize > 0 {
+		// start from the defaults and override only the limits that were set
+		callOpts := []grpc.CallOption{
+			defaultFailFast,
+			defaultMaxCallSendMsgSize,
+			defaultMaxCallRecvMsgSize,
+		}
+		if cfg.MaxCallSendMsgSize > 0 {
+			callOpts[1] = grpc.MaxCallSendMsgSize(cfg.MaxCallSendMsgSize)
+		}
+		if cfg.MaxCallRecvMsgSize > 0 {
+			callOpts[2] = grpc.MaxCallRecvMsgSize(cfg.MaxCallRecvMsgSize)
+		}
+		client.callOpts = callOpts
+	}
+
+	client.balancer = newHealthBalancer(cfg.Endpoints, cfg.DialTimeout, func(ep string) (bool, error) {
+		return grpcHealthCheck(client, ep)
+	})
+
+	// use Endpoints[0] so that for https:// without any tls config given, then
+	// grpc will assume the certificate server name is the endpoint host.
+	conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
+	if err != nil {
+		client.cancel()
+		client.balancer.Close()
+		return nil, err
+	}
+	client.conn = conn
+
+	// wait for a connection
+	if cfg.DialTimeout > 0 {
+		hasConn := false
+		waitc := time.After(cfg.DialTimeout)
+		select {
+		case <-client.balancer.ready():
+			hasConn = true
+		case <-ctx.Done():
+		case <-waitc:
+		}
+		if !hasConn {
+			// prefer a recorded dial error over the generic deadline error
+			err := context.DeadlineExceeded
+			select {
+			case err = <-client.dialerrc:
+			default:
+			}
+			client.cancel()
+			client.balancer.Close()
+			conn.Close()
+			return nil, err
+		}
+	}
+
+	client.Cluster = NewCluster(client)
+	client.KV = NewKV(client)
+	client.Lease = NewLease(client)
+	client.Watcher = NewWatcher(client)
+	client.Auth = NewAuth(client)
+	client.Maintenance = NewMaintenance(client)
+
+	if cfg.RejectOldCluster {
+		if err := client.checkVersion(); err != nil {
+			client.Close()
+			return nil, err
+		}
+	}
+
+	go client.autoSync()
+	return client, nil
+}
+
+// checkVersion queries the status of every configured endpoint concurrently
+// and succeeds as soon as one of them reports version 3.2 or newer. If none
+// does, it returns the last error received (ErrOldCluster for an endpoint
+// whose version is too old). Requests are bounded by DialTimeout when set.
+func (c *Client) checkVersion() (err error) {
+	var wg sync.WaitGroup
+	errc := make(chan error, len(c.cfg.Endpoints))
+	ctx, cancel := context.WithCancel(c.ctx)
+	if c.cfg.DialTimeout > 0 {
+		// Release the cancelable context before replacing it; overwriting
+		// cancel without calling it would leak that context until c.ctx ends.
+		cancel()
+		ctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
+	}
+	wg.Add(len(c.cfg.Endpoints))
+	for _, ep := range c.cfg.Endpoints {
+		// if cluster is current, any endpoint gives a recent version
+		go func(e string) {
+			defer wg.Done()
+			resp, rerr := c.Status(ctx, e)
+			if rerr != nil {
+				errc <- rerr
+				return
+			}
+			vs := strings.Split(resp.Version, ".")
+			maj, min := 0, 0
+			if len(vs) >= 2 {
+				maj, _ = strconv.Atoi(vs[0])
+				// a minor version that fails to parse is reported as this
+				// endpoint's error unless ErrOldCluster replaces it below
+				min, rerr = strconv.Atoi(vs[1])
+			}
+			if maj < 3 || (maj == 3 && min < 2) {
+				rerr = ErrOldCluster
+			}
+			errc <- rerr
+		}(ep)
+	}
+	// wait for success
+	for i := 0; i < len(c.cfg.Endpoints); i++ {
+		if err = <-errc; err == nil {
+			break
+		}
+	}
+	// stop the remaining requests, then wait for every goroutine to finish;
+	// errc is buffered for all endpoints so their sends never block
+	cancel()
+	wg.Wait()
+	return err
+}
+
+// ActiveConnection exposes the gRPC connection the client currently holds.
+func (c *Client) ActiveConnection() *grpc.ClientConn {
+	return c.conn
+}
+
+// isHaltErr reports whether the given context and error mean that no forward
+// progress is possible, even after reconnecting. A finished context always
+// halts; a nil error never does.
+func isHaltErr(ctx context.Context, err error) bool {
+	switch {
+	case ctx != nil && ctx.Err() != nil:
+		return true
+	case err == nil:
+		return false
+	}
+	ev, _ := status.FromError(err)
+	switch ev.Code() {
+	case codes.Unavailable, codes.Internal:
+		// Unavailable codes mean the system will be right back.
+		// (e.g., can't connect, lost leader)
+		// Treat Internal codes as if something failed, leaving the
+		// system in an inconsistent state, but retrying could make progress.
+		// (e.g., failed in middle of send, corrupted frame)
+		// TODO: are permanent Internal errors possible from grpc?
+		return false
+	default:
+		return true
+	}
+}
+
+// isUnavailableErr reports whether err carries the gRPC Unavailable code.
+// It is always false once ctx has finished or when err is nil.
+func isUnavailableErr(ctx context.Context, err error) bool {
+	ctxDone := ctx != nil && ctx.Err() != nil
+	if ctxDone || err == nil {
+		return false
+	}
+	// Unavailable codes mean the system will be right back.
+	// (e.g., can't connect, lost leader)
+	st, _ := status.FromError(err)
+	return st.Code() == codes.Unavailable
+}
+
+// toErr normalizes an RPC error for callers. A nil error stays nil and an
+// error that rpctypes.Error turns into an EtcdError is returned as such.
+// Otherwise the gRPC status code decides: DeadlineExceeded and Canceled are
+// replaced by ctx.Err() when the context has finished, FailedPrecondition
+// becomes grpc.ErrClientConnClosing, and every other code is passed through.
+func toErr(ctx context.Context, err error) error {
+	if err == nil {
+		return nil
+	}
+	err = rpctypes.Error(err)
+	if _, isEtcdErr := err.(rpctypes.EtcdError); isEtcdErr {
+		return err
+	}
+	st, _ := status.FromError(err)
+	switch st.Code() {
+	case codes.DeadlineExceeded, codes.Canceled:
+		if ctxErr := ctx.Err(); ctxErr != nil {
+			err = ctxErr
+		}
+	case codes.FailedPrecondition:
+		err = grpc.ErrClientConnClosing
+	}
+	return err
+}
+
+// canceledByCaller reports whether err is context.Canceled or
+// context.DeadlineExceeded while stopCtx itself has already finished.
+func canceledByCaller(stopCtx context.Context, err error) bool {
+	if stopCtx.Err() == nil || err == nil {
+		return false
+	}
+
+	switch err {
+	case context.Canceled, context.DeadlineExceeded:
+		return true
+	}
+	return false
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/cluster.go b/vendor/go.etcd.io/etcd/clientv3/cluster.go
new file mode 100644
index 0000000..785672b
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/cluster.go
@@ -0,0 +1,114 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ "github.com/coreos/etcd/pkg/types"
+
+ "google.golang.org/grpc"
+)
+
+// Client-side names for the cluster membership messages of the etcdserverpb
+// package; each type is declared with the corresponding pb type as its
+// underlying type.
+type (
+ Member pb.Member
+ MemberListResponse pb.MemberListResponse
+ MemberAddResponse pb.MemberAddResponse
+ MemberRemoveResponse pb.MemberRemoveResponse
+ MemberUpdateResponse pb.MemberUpdateResponse
+)
+
+// Cluster is the set of cluster membership operations offered by the client.
+type Cluster interface {
+ // MemberList lists the current cluster membership.
+ MemberList(ctx context.Context) (*MemberListResponse, error)
+
+ // MemberAdd adds a new member into the cluster.
+ MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)
+
+ // MemberRemove removes an existing member from the cluster.
+ MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error)
+
+ // MemberUpdate updates the peer addresses of the member.
+ MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error)
+}
+
+// cluster implements Cluster on top of a pb.ClusterClient.
+type cluster struct {
+ // remote is the RPC client every membership call is sent through.
+ remote pb.ClusterClient
+ // callOpts are the call options passed along with every RPC.
+ callOpts []grpc.CallOption
+}
+
+// NewCluster returns a Cluster that talks through RetryClusterClient(c).
+// The client's call options are adopted when c is non-nil.
+func NewCluster(c *Client) Cluster {
+	remote := RetryClusterClient(c)
+	if c == nil {
+		return &cluster{remote: remote}
+	}
+	return &cluster{remote: remote, callOpts: c.callOpts}
+}
+
+// NewClusterFromClusterClient returns a Cluster that sends its RPCs through
+// the given remote. The client's call options are adopted when c is non-nil.
+func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster {
+	if c == nil {
+		return &cluster{remote: remote}
+	}
+	return &cluster{remote: remote, callOpts: c.callOpts}
+}
+
+// MemberAdd asks the cluster to add a member with the given peer addresses.
+// The addresses are validated locally first; RPC errors are passed through
+// toErr before being returned.
+func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
+	// fail-fast before panic in rafthttp
+	_, err := types.NewURLs(peerAddrs)
+	if err != nil {
+		return nil, err
+	}
+
+	req := &pb.MemberAddRequest{PeerURLs: peerAddrs}
+	resp, err := c.remote.MemberAdd(ctx, req, c.callOpts...)
+	if err == nil {
+		return (*MemberAddResponse)(resp), nil
+	}
+	return nil, toErr(ctx, err)
+}
+
+// MemberRemove asks the cluster to remove the member with the given id.
+// RPC errors are passed through toErr before being returned.
+func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) {
+	req := &pb.MemberRemoveRequest{ID: id}
+	resp, err := c.remote.MemberRemove(ctx, req, c.callOpts...)
+	if err == nil {
+		return (*MemberRemoveResponse)(resp), nil
+	}
+	return nil, toErr(ctx, err)
+}
+
+// MemberUpdate replaces the peer addresses of the member with the given id.
+// The addresses are validated locally first; RPC errors are passed through
+// toErr before being returned.
+func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) {
+	// fail-fast before panic in rafthttp
+	_, err := types.NewURLs(peerAddrs)
+	if err != nil {
+		return nil, err
+	}
+
+	// it is safe to retry on update.
+	req := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs}
+	resp, err := c.remote.MemberUpdate(ctx, req, c.callOpts...)
+	if err != nil {
+		return nil, toErr(ctx, err)
+	}
+	return (*MemberUpdateResponse)(resp), nil
+}
+
+// MemberList fetches the current cluster membership. RPC errors are passed
+// through toErr before being returned.
+func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
+	// it is safe to retry on list.
+	req := &pb.MemberListRequest{}
+	resp, err := c.remote.MemberList(ctx, req, c.callOpts...)
+	if err != nil {
+		return nil, toErr(ctx, err)
+	}
+	return (*MemberListResponse)(resp), nil
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/compact_op.go b/vendor/go.etcd.io/etcd/clientv3/compact_op.go
new file mode 100644
index 0000000..41e80c1
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/compact_op.go
@@ -0,0 +1,51 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+// CompactOp represents a compact operation.
+type CompactOp struct {
+ // revision is sent as the Revision of the compaction request.
+ revision int64
+ // physical is sent as the Physical flag of the compaction request;
+ // WithCompactPhysical sets it.
+ physical bool
+}
+
+// CompactOption configures compact operation. It is a function that mutates
+// the CompactOp it is given.
+type CompactOption func(*CompactOp)
+
+// applyCompactOpts runs every option against op, in slice order.
+func (op *CompactOp) applyCompactOpts(opts []CompactOption) {
+	for i := range opts {
+		opts[i](op)
+	}
+}
+
+// OpCompact builds a CompactOp for revision rev and then applies the given
+// options to it.
+func OpCompact(rev int64, opts ...CompactOption) CompactOp {
+	op := CompactOp{revision: rev}
+	(&op).applyCompactOpts(opts)
+	return op
+}
+
+// toRequest converts the operation into the protobuf compaction request.
+func (op CompactOp) toRequest() *pb.CompactionRequest {
+	req := new(pb.CompactionRequest)
+	req.Revision = op.revision
+	req.Physical = op.physical
+	return req
+}
+
+// WithCompactPhysical makes Compact wait until all compacted entries are
+// removed from the etcd server's storage. It does so by setting the physical
+// flag of the operation.
+func WithCompactPhysical() CompactOption {
+	setPhysical := func(op *CompactOp) {
+		op.physical = true
+	}
+	return setPhysical
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/compare.go b/vendor/go.etcd.io/etcd/clientv3/compare.go
new file mode 100644
index 0000000..b5f0a25
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/compare.go
@@ -0,0 +1,140 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+// CompareTarget is the integer type of the Compare* constants below.
+type CompareTarget int
+
+// CompareResult is an integer type; no values of it are declared in this file.
+type CompareResult int
+
+// CompareTarget values, numbered from 0 in declaration order.
+const (
+ CompareVersion CompareTarget = iota
+ CompareCreated
+ CompareModified
+ CompareValue
+)
+
+// Cmp is a comparison; its underlying type is pb.Compare. Values are started
+// with Value, Version, CreateRevision, ModRevision or LeaseValue and completed
+// with Compare.
+type Cmp pb.Compare
+
+func Compare(cmp Cmp, result string, v interface{}) Cmp {
+ var r pb.Compare_CompareResult
+
+ switch result {
+ case "=":
+ r = pb.Compare_EQUAL
+ case "!=":
+ r = pb.Compare_NOT_EQUAL
+ case ">":
+ r = pb.Compare_GREATER
+ case "<":
+ r = pb.Compare_LESS
+ default:
+ panic("Unknown result op")
+ }
+
+ cmp.Result = r
+ switch cmp.Target {
+ case pb.Compare_VALUE:
+ val, ok := v.(string)
+ if !ok {
+ panic("bad compare value")
+ }
+ cmp.TargetUnion = &pb.Compare_Value{Value: []byte(val)}
+ case pb.Compare_VERSION:
+ cmp.TargetUnion = &pb.Compare_Version{Version: mustInt64(v)}
+ case pb.Compare_CREATE:
+ cmp.TargetUnion = &pb.Compare_CreateRevision{CreateRevision: mustInt64(v)}
+ case pb.Compare_MOD:
+ cmp.TargetUnion = &pb.Compare_ModRevision{ModRevision: mustInt64(v)}
+ case pb.Compare_LEASE:
+ cmp.TargetUnion = &pb.Compare_Lease{Lease: mustInt64orLeaseID(v)}
+ default:
+ panic("Unknown compare type")
+ }
+ return cmp
+}
+
+// Value starts a comparison that targets the value stored under key.
+func Value(key string) Cmp {
+	target := pb.Compare_VALUE
+	return Cmp{Target: target, Key: []byte(key)}
+}
+
+// Version starts a comparison that targets the version of key.
+func Version(key string) Cmp {
+	target := pb.Compare_VERSION
+	return Cmp{Target: target, Key: []byte(key)}
+}
+
+// CreateRevision starts a comparison that targets the create revision of key.
+func CreateRevision(key string) Cmp {
+	target := pb.Compare_CREATE
+	return Cmp{Target: target, Key: []byte(key)}
+}
+
+// ModRevision starts a comparison that targets the mod revision of key.
+func ModRevision(key string) Cmp {
+	target := pb.Compare_MOD
+	return Cmp{Target: target, Key: []byte(key)}
+}
+
+// LeaseValue compares a key's LeaseID to a value of your choosing. The empty
+// LeaseID is 0, otherwise known as `NoLease`.
+func LeaseValue(key string) Cmp {
+	target := pb.Compare_LEASE
+	return Cmp{Target: target, Key: []byte(key)}
+}
+
+// KeyBytes returns the byte slice holding the comparison key.
+func (cmp *Cmp) KeyBytes() []byte {
+	return cmp.Key
+}
+
+// WithKeyBytes replaces the comparison key with the given byte slice.
+func (cmp *Cmp) WithKeyBytes(key []byte) {
+	cmp.Key = key
+}
+
+// ValueBytes returns the byte slice holding the comparison value, or nil when
+// the comparison does not carry a value target.
+func (cmp *Cmp) ValueBytes() []byte {
+	tu, isValue := cmp.TargetUnion.(*pb.Compare_Value)
+	if !isValue {
+		return nil
+	}
+	return tu.Value
+}
+
+// WithValueBytes sets the byte slice for the comparison's value.
+// The type assertion is unchecked, so this panics when the comparison's
+// TargetUnion is not a *pb.Compare_Value.
+func (cmp *Cmp) WithValueBytes(v []byte) { cmp.TargetUnion.(*pb.Compare_Value).Value = v }
+
+// WithRange returns a copy of the comparison that scans the range [key, end).
+func (cmp Cmp) WithRange(end string) Cmp {
+	ranged := cmp
+	ranged.RangeEnd = []byte(end)
+	return ranged
+}
+
+// WithPrefix returns a copy of the comparison that scans all keys prefixed by
+// the key; the range end is computed by getPrefix.
+func (cmp Cmp) WithPrefix() Cmp {
+	prefixed := cmp
+	prefixed.RangeEnd = getPrefix(cmp.Key)
+	return prefixed
+}
+
+// mustInt64 panics if val isn't an int or int64. It returns an int64 otherwise.
+func mustInt64(val interface{}) int64 {
+ if v, ok := val.(int64); ok {
+ return v
+ }
+ if v, ok := val.(int); ok {
+ return int64(v)
+ }
+ panic("bad value")
+}
+
+// mustInt64orLeaseID converts val to an int64. A LeaseID is converted
+// directly; everything else is delegated to mustInt64, which panics unless
+// val is an int or int64.
+func mustInt64orLeaseID(val interface{}) int64 {
+	id, isLease := val.(LeaseID)
+	if !isLease {
+		return mustInt64(val)
+	}
+	return int64(id)
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/config.go b/vendor/go.etcd.io/etcd/clientv3/config.go
new file mode 100644
index 0000000..79d6e2a
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/config.go
@@ -0,0 +1,75 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+ "crypto/tls"
+ "time"
+
+ "google.golang.org/grpc"
+)
+
+// Config holds the settings a Client is constructed from.
+type Config struct {
+ // Endpoints is a list of URLs.
+ Endpoints []string `json:"endpoints"`
+
+ // AutoSyncInterval is the interval to update endpoints with its latest members.
+ // 0 disables auto-sync. By default auto-sync is disabled.
+ AutoSyncInterval time.Duration `json:"auto-sync-interval"`
+
+ // DialTimeout is the timeout for failing to establish a connection.
+ DialTimeout time.Duration `json:"dial-timeout"`
+
+ // DialKeepAliveTime is the time after which client pings the server to see if
+ // transport is alive.
+ DialKeepAliveTime time.Duration `json:"dial-keep-alive-time"`
+
+ // DialKeepAliveTimeout is the time that the client waits for a response for the
+ // keep-alive probe. If the response is not received in this time, the connection is closed.
+ DialKeepAliveTimeout time.Duration `json:"dial-keep-alive-timeout"`
+
+ // MaxCallSendMsgSize is the client-side request send limit in bytes.
+ // If 0, it defaults to 2.0 MiB (2 * 1024 * 1024).
+ // Make sure that "MaxCallSendMsgSize" < server-side default send/recv limit.
+ // ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes").
+ MaxCallSendMsgSize int
+
+ // MaxCallRecvMsgSize is the client-side response receive limit.
+ // If 0, it defaults to "math.MaxInt32", because range response can
+ // easily exceed request send limits.
+ // Make sure that "MaxCallRecvMsgSize" >= server-side default send/recv limit.
+ // ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes").
+ MaxCallRecvMsgSize int
+
+ // TLS holds the client secure credentials, if any.
+ TLS *tls.Config
+
+ // Username is a user name for authentication.
+ Username string `json:"username"`
+
+ // Password is a password for authentication.
+ Password string `json:"password"`
+
+ // RejectOldCluster when set will refuse to create a client against an outdated cluster.
+ RejectOldCluster bool `json:"reject-old-cluster"`
+
+ // DialOptions is a list of dial options for the grpc client (e.g., for interceptors).
+ DialOptions []grpc.DialOption
+
+ // Context is the default client context; it can be used to cancel grpc dial out and
+ // other operations that do not have an explicit context.
+ Context context.Context
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/doc.go b/vendor/go.etcd.io/etcd/clientv3/doc.go
new file mode 100644
index 0000000..717fbe4
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/doc.go
@@ -0,0 +1,97 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package clientv3 implements the official Go etcd client for v3.
+//
+// Create client using `clientv3.New`:
+//
+// // expect dial time-out on ipv4 blackhole
+// _, err := clientv3.New(clientv3.Config{
+// Endpoints: []string{"http://254.0.0.1:12345"},
+// DialTimeout: 2 * time.Second,
+// })
+//
+// // etcd clientv3 >= v3.2.10, grpc/grpc-go >= v1.7.3
+// if err == context.DeadlineExceeded {
+// // handle errors
+// }
+//
+// // etcd clientv3 <= v3.2.9, grpc/grpc-go <= v1.2.1
+// if err == grpc.ErrClientConnTimeout {
+// // handle errors
+// }
+//
+// cli, err := clientv3.New(clientv3.Config{
+// Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
+// DialTimeout: 5 * time.Second,
+// })
+// if err != nil {
+// // handle error!
+// }
+// defer cli.Close()
+//
+// Make sure to close the client after using it. If the client is not closed, the
+// connection will have leaky goroutines.
+//
+// To specify a client request timeout, wrap the context with context.WithTimeout:
+//
+// ctx, cancel := context.WithTimeout(context.Background(), timeout)
+// resp, err := kvc.Put(ctx, "sample_key", "sample_value")
+// cancel()
+// if err != nil {
+// // handle error!
+// }
+// // use the response
+//
+// The Client has internal state (watchers and leases), so Clients should be reused instead of created as needed.
+// Clients are safe for concurrent use by multiple goroutines.
+//
+// etcd client returns 3 types of errors:
+//
+// 1. context error: canceled or deadline exceeded.
+// 2. gRPC status error: e.g. when clock drifts in server-side before client's context deadline exceeded.
+// 3. gRPC error: see https://github.com/coreos/etcd/blob/master/etcdserver/api/v3rpc/rpctypes/error.go
+//
+// Here is the example code to handle client errors:
+//
+// resp, err := kvc.Put(ctx, "", "")
+// if err != nil {
+// if err == context.Canceled {
+// // ctx is canceled by another routine
+// } else if err == context.DeadlineExceeded {
+// // ctx is attached with a deadline and it exceeded
+// } else if ev, ok := status.FromError(err); ok {
+// code := ev.Code()
+// if code == codes.DeadlineExceeded {
+// // server-side context might have timed-out first (due to clock skew)
+// // while original client-side context is not timed-out yet
+// }
+// } else if verr, ok := err.(*v3rpc.ErrEmptyKey); ok {
+// // process (verr.Errors)
+// } else {
+// // bad cluster endpoints, which are not etcd servers
+// }
+// }
+//
+// go func() { cli.Close() }()
+// _, err := kvc.Get(ctx, "a")
+// if err != nil {
+// if err == context.Canceled {
+// // grpc balancer calls 'Get' with an inflight client.Close
+// } else if err == grpc.ErrClientConnClosing {
+// // grpc balancer calls 'Get' after client.Close.
+// }
+// }
+//
+package clientv3
diff --git a/vendor/go.etcd.io/etcd/clientv3/health_balancer.go b/vendor/go.etcd.io/etcd/clientv3/health_balancer.go
new file mode 100644
index 0000000..5918cba
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/health_balancer.go
@@ -0,0 +1,609 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+ "errors"
+ "net/url"
+ "strings"
+ "sync"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
+ "google.golang.org/grpc/status"
+)
+
+const (
+ // minHealthRetryDuration is the lower bound newHealthBalancer applies to
+ // the health check timeout.
+ minHealthRetryDuration = 3 * time.Second
+ // unknownService is an error text naming the gRPC health service; it is
+ // not referenced in the part of the file shown here.
+ unknownService = "unknown service grpc.health.v1.Health"
+)
+
+// ErrNoAddrAvilable is returned by Get() when the balancer does not have
+// any active connection to endpoints at the time.
+// This error is returned only when opts.BlockingWait is true.
+// It is a gRPC status error with code Unavailable.
+var ErrNoAddrAvilable = status.Error(codes.Unavailable, "there is no address available")
+
+// healthCheckFunc checks the health of the endpoint ep.
+type healthCheckFunc func(ep string) (bool, error)
+
+// notifyMsg selects how notifyAddrs publishes addresses to grpc.
+type notifyMsg int
+
+const (
+ // notifyReset publishes the current live addresses.
+ notifyReset notifyMsg = iota
+ // notifyNext first publishes an empty address list and then the current
+ // live addresses.
+ notifyNext
+)
+
+// healthBalancer does the bare minimum to expose multiple eps
+// to the grpc reconnection code path
+type healthBalancer struct {
+ // addrs are the client's endpoint addresses for grpc
+ addrs []grpc.Address
+
+ // eps holds the raw endpoints from the client
+ eps []string
+
+ // notifyCh notifies grpc of the set of addresses for connecting
+ notifyCh chan []grpc.Address
+
+ // readyc closes once the first connection is up
+ readyc chan struct{}
+ // readyOnce makes sure readyc is closed only once
+ readyOnce sync.Once
+
+ // healthCheck checks an endpoint's health.
+ healthCheck healthCheckFunc
+ // healthCheckTimeout is both the interval at which unhealthy entries are
+ // reviewed and the age after which an entry is dropped.
+ healthCheckTimeout time.Duration
+
+ // unhealthyMu guards unhealthyHostPorts.
+ unhealthyMu sync.RWMutex
+ // unhealthyHostPorts maps a host:port to the time it was marked unhealthy.
+ unhealthyHostPorts map[string]time.Time
+
+ // mu protects all fields below.
+ mu sync.RWMutex
+
+ // upc closes when pinAddr transitions from empty to non-empty or the balancer closes.
+ upc chan struct{}
+
+ // downc closes when grpc calls down() on pinAddr
+ downc chan struct{}
+
+ // stopc is closed to signal updateNotifyLoop should stop.
+ stopc chan struct{}
+ stopOnce sync.Once
+ // wg tracks the goroutine running updateUnhealthy.
+ wg sync.WaitGroup
+
+ // donec closes when all goroutines are exited
+ donec chan struct{}
+
+ // updateAddrsC notifies updateNotifyLoop to update addrs.
+ updateAddrsC chan notifyMsg
+
+ // grpc issues TLS cert checks using the string passed into dial so
+ // that string must be the host. To recover the full scheme://host URL,
+ // have a map from hosts to the original endpoint.
+ hostPort2ep map[string]string
+
+ // pinAddr is the currently pinned address; set to the empty string on
+ // initialization and shutdown.
+ pinAddr string
+
+ // closed is checked by Up so that calls arriving after Close are ignored.
+ closed bool
+}
+
+// newHealthBalancer creates a balancer over eps and starts its two
+// background goroutines: updateNotifyLoop and updateUnhealthy. The health
+// check timeout is raised to minHealthRetryDuration when timeout is smaller.
+func newHealthBalancer(eps []string, timeout time.Duration, hc healthCheckFunc) *healthBalancer {
+	if timeout < minHealthRetryDuration {
+		timeout = minHealthRetryDuration
+	}
+
+	hb := &healthBalancer{
+		addrs:              eps2addrs(eps),
+		eps:                eps,
+		notifyCh:           make(chan []grpc.Address),
+		readyc:             make(chan struct{}),
+		healthCheck:        hc,
+		healthCheckTimeout: timeout,
+		unhealthyHostPorts: make(map[string]time.Time),
+		upc:                make(chan struct{}),
+		stopc:              make(chan struct{}),
+		downc:              make(chan struct{}),
+		donec:              make(chan struct{}),
+		updateAddrsC:       make(chan notifyMsg),
+		hostPort2ep:        getHostPort2ep(eps),
+	}
+
+	// nothing is pinned yet, so the balancer starts in the "down" state
+	close(hb.downc)
+	go hb.updateNotifyLoop()
+
+	hb.wg.Add(1)
+	go func() {
+		defer hb.wg.Done()
+		hb.updateUnhealthy()
+	}()
+	return hb
+}
+
+// Start does nothing and always reports success.
+func (b *healthBalancer) Start(target string, config grpc.BalancerConfig) error {
+	return nil
+}
+
+// ConnectNotify returns the current upc channel, read under the lock.
+func (b *healthBalancer) ConnectNotify() <-chan struct{} {
+	b.mu.Lock()
+	upc := b.upc
+	b.mu.Unlock()
+	return upc
+}
+
+// ready returns the channel that is closed once the first connection is up.
+func (b *healthBalancer) ready() <-chan struct{} {
+	return b.readyc
+}
+
+// endpoint returns the original endpoint recorded for hostPort, or the empty
+// string when hostPort is unknown.
+func (b *healthBalancer) endpoint(hostPort string) string {
+	b.mu.RLock()
+	ep := b.hostPort2ep[hostPort]
+	b.mu.RUnlock()
+	return ep
+}
+
+// pinned returns the currently pinned address, read under the read lock.
+func (b *healthBalancer) pinned() string {
+	b.mu.RLock()
+	addr := b.pinAddr
+	b.mu.RUnlock()
+	return addr
+}
+
+// hostPortError records hostPort as unhealthy as of now. A hostPort the
+// balancer has no endpoint for is considered stale and only logged.
+func (b *healthBalancer) hostPortError(hostPort string, err error) {
+	if b.endpoint(hostPort) != "" {
+		b.unhealthyMu.Lock()
+		b.unhealthyHostPorts[hostPort] = time.Now()
+		b.unhealthyMu.Unlock()
+		logger.Lvl(4).Infof("clientv3/balancer: %q is marked unhealthy (%q)", hostPort, err.Error())
+		return
+	}
+
+	logger.Lvl(4).Infof("clientv3/balancer: %q is stale (skip marking as unhealthy on %q)", hostPort, err.Error())
+}
+
+// removeUnhealthy drops hostPort from the unhealthy set. A hostPort the
+// balancer has no endpoint for is only logged.
+func (b *healthBalancer) removeUnhealthy(hostPort, msg string) {
+	if b.endpoint(hostPort) != "" {
+		b.unhealthyMu.Lock()
+		delete(b.unhealthyHostPorts, hostPort)
+		b.unhealthyMu.Unlock()
+		logger.Lvl(4).Infof("clientv3/balancer: %q is removed from unhealthy (%q)", hostPort, msg)
+		return
+	}
+
+	logger.Lvl(4).Infof("clientv3/balancer: %q was not in unhealthy (%q)", hostPort, msg)
+}
+
+// countUnhealthy returns how many host:ports are currently marked unhealthy.
+func (b *healthBalancer) countUnhealthy() (count int) {
+	b.unhealthyMu.RLock()
+	defer b.unhealthyMu.RUnlock()
+	return len(b.unhealthyHostPorts)
+}
+
+// isUnhealthy reports whether hostPort is currently marked unhealthy.
+func (b *healthBalancer) isUnhealthy(hostPort string) (unhealthy bool) {
+	b.unhealthyMu.RLock()
+	defer b.unhealthyMu.RUnlock()
+	_, marked := b.unhealthyHostPorts[hostPort]
+	return marked
+}
+
+// cleanupUnhealthy forgets every unhealthy entry that was marked longer ago
+// than healthCheckTimeout.
+func (b *healthBalancer) cleanupUnhealthy() {
+	b.unhealthyMu.Lock()
+	defer b.unhealthyMu.Unlock()
+
+	for hostPort, markedAt := range b.unhealthyHostPorts {
+		if time.Since(markedAt) <= b.healthCheckTimeout {
+			continue
+		}
+		delete(b.unhealthyHostPorts, hostPort)
+		logger.Lvl(4).Infof("clientv3/balancer: removed %q from unhealthy after %v", hostPort, b.healthCheckTimeout)
+	}
+}
+
+// liveAddrs returns the addresses to offer to grpc together with the set of
+// their host:ports. When there is a single address, or when none or all of
+// the addresses are unhealthy, every address is returned; otherwise only the
+// addresses not marked unhealthy are.
+func (b *healthBalancer) liveAddrs() ([]grpc.Address, map[string]struct{}) {
+	unhealthyCnt := b.countUnhealthy()
+
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+
+	total := len(b.addrs)
+	if total == 1 || unhealthyCnt == 0 || unhealthyCnt == total {
+		all := make(map[string]struct{}, len(b.hostPort2ep))
+		for hostPort := range b.hostPort2ep {
+			all[hostPort] = struct{}{}
+		}
+		return b.addrs, all
+	}
+
+	live := make([]grpc.Address, 0, total-unhealthyCnt)
+	liveSet := make(map[string]struct{})
+	for _, candidate := range b.addrs {
+		if b.isUnhealthy(candidate.Addr) {
+			continue
+		}
+		live = append(live, candidate)
+		liveSet[candidate.Addr] = struct{}{}
+	}
+	return live, liveSet
+}
+
+// updateUnhealthy runs until stopc is closed. Every healthCheckTimeout it
+// expires old unhealthy entries and, when no address is pinned or the pinned
+// address is unhealthy, sends notifyNext on updateAddrsC.
+func (b *healthBalancer) updateUnhealthy() {
+ for {
+ select {
+ case <-time.After(b.healthCheckTimeout):
+ b.cleanupUnhealthy()
+ pinned := b.pinned()
+ if pinned == "" || b.isUnhealthy(pinned) {
+ // blocks until the message is received or the balancer stops
+ select {
+ case b.updateAddrsC <- notifyNext:
+ case <-b.stopc:
+ return
+ }
+ }
+ case <-b.stopc:
+ return
+ }
+ }
+}
+
+// updateAddrs replaces the balancer's endpoints with eps. When the new
+// host:port to endpoint mapping equals the current one nothing changes;
+// otherwise the mapping, the addresses and the raw endpoints are replaced and
+// the unhealthy set is reset.
+func (b *healthBalancer) updateAddrs(eps ...string) {
+	np := getHostPort2ep(eps)
+
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	changed := len(np) != len(b.hostPort2ep)
+	if !changed {
+		for hostPort, ep := range np {
+			if b.hostPort2ep[hostPort] != ep {
+				changed = true
+				break
+			}
+		}
+	}
+	if !changed {
+		// same endpoints, so no need to update address
+		return
+	}
+
+	b.hostPort2ep = np
+	b.addrs = eps2addrs(eps)
+	b.eps = eps
+
+	b.unhealthyMu.Lock()
+	b.unhealthyHostPorts = make(map[string]time.Time)
+	b.unhealthyMu.Unlock()
+}
+
+// next sends notifyNext on updateAddrsC and then blocks until the downc
+// channel captured on entry is closed. Both steps give up when stopc closes.
+func (b *healthBalancer) next() {
+ // snapshot downc first so the wait below refers to the current connection
+ b.mu.RLock()
+ downc := b.downc
+ b.mu.RUnlock()
+ select {
+ case b.updateAddrsC <- notifyNext:
+ case <-b.stopc:
+ }
+ // wait until disconnect so new RPCs are not issued on old connection
+ select {
+ case <-downc:
+ case <-b.stopc:
+ }
+}
+
+// updateNotifyLoop publishes address lists on notifyCh according to the
+// balancer's state and reacts to messages on updateAddrsC. It runs until
+// stopc is closed and closes donec on exit.
+func (b *healthBalancer) updateNotifyLoop() {
+ defer close(b.donec)
+
+ for {
+ b.mu.RLock()
+ upc, downc, addr := b.upc, b.downc, b.pinAddr
+ b.mu.RUnlock()
+ // downc or upc should be closed
+ // (a channel found closed here is replaced by nil in the local copy)
+ select {
+ case <-downc:
+ downc = nil
+ default:
+ }
+ select {
+ case <-upc:
+ upc = nil
+ default:
+ }
+ switch {
+ case downc == nil && upc == nil:
+ // stale
+ select {
+ case <-b.stopc:
+ return
+ default:
+ }
+ case downc == nil:
+ // downc is closed: publish the live addresses and wait for upc,
+ // an update message or shutdown
+ b.notifyAddrs(notifyReset)
+ select {
+ case <-upc:
+ case msg := <-b.updateAddrsC:
+ b.notifyAddrs(msg)
+ case <-b.stopc:
+ return
+ }
+ case upc == nil:
+ // upc is closed: publish only the pinned address
+ select {
+ // close connections that are not the pinned address
+ case b.notifyCh <- []grpc.Address{{Addr: addr}}:
+ case <-downc:
+ case <-b.stopc:
+ return
+ }
+ select {
+ case <-downc:
+ b.notifyAddrs(notifyReset)
+ case msg := <-b.updateAddrsC:
+ b.notifyAddrs(msg)
+ case <-b.stopc:
+ return
+ }
+ }
+ }
+}
+
+// notifyAddrs sends the current live address list on b.notifyCh. For
+// notifyNext it first sends an empty list. If an address is pinned but is not
+// among the live host:ports, it additionally waits for downc to close after
+// the list has been sent. Every blocking step is abandoned when b.stopc is
+// closed.
+func (b *healthBalancer) notifyAddrs(msg notifyMsg) {
+ if msg == notifyNext {
+ // empty list first, before the fresh list is sent below
+ select {
+ case b.notifyCh <- []grpc.Address{}:
+ case <-b.stopc:
+ return
+ }
+ }
+ b.mu.RLock()
+ pinAddr := b.pinAddr
+ downc := b.downc
+ b.mu.RUnlock()
+ addrs, hostPorts := b.liveAddrs()
+
+ // waitDown is true when the pinned address is missing from the live set
+ var waitDown bool
+ if pinAddr != "" {
+ _, ok := hostPorts[pinAddr]
+ waitDown = !ok
+ }
+
+ select {
+ case b.notifyCh <- addrs:
+ if waitDown {
+ select {
+ case <-downc:
+ case <-b.stopc:
+ }
+ }
+ case <-b.stopc:
+ }
+}
+
+// Up is called when addr becomes connected. It pins addr if mayPin allows it,
+// the balancer is not closed, addr is still in b.addrs, and nothing is pinned
+// yet. The returned function is the "down" callback: for a pinned address it
+// records the error against the host, unpins it and closes downc; in every
+// other case it is a no-op.
+func (b *healthBalancer) Up(addr grpc.Address) func(error) {
+ if !b.mayPin(addr) {
+ return func(err error) {}
+ }
+
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ // gRPC might call Up after it called Close. We add this check
+ // to "fix" it up at application layer. Otherwise, will panic
+ // if b.upc is already closed.
+ if b.closed {
+ return func(err error) {}
+ }
+
+ // gRPC might call Up on a stale address.
+ // Prevent updating pinAddr with a stale address.
+ if !hasAddr(b.addrs, addr.Addr) {
+ return func(err error) {}
+ }
+
+ if b.pinAddr != "" {
+ logger.Lvl(4).Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr)
+ return func(err error) {}
+ }
+
+ // notify waiting Get()s and pin first connected address
+ close(b.upc)
+ b.downc = make(chan struct{})
+ b.pinAddr = addr.Addr
+ logger.Lvl(4).Infof("clientv3/balancer: pin %q", addr.Addr)
+
+ // notify client that a connection is up
+ b.readyOnce.Do(func() { close(b.readyc) })
+
+ return func(err error) {
+ // If connected to a black hole endpoint or a killed server, the gRPC ping
+ // timeout will induce a network I/O error, and retrying until success;
+ // finding healthy endpoint on retry could take several timeouts and redials.
+ // To avoid wasting retries, gray-list unhealthy endpoints.
+ b.hostPortError(addr.Addr, err)
+
+ // swap in a fresh upc, close downc and clear the pin, all under b.mu
+ b.mu.Lock()
+ b.upc = make(chan struct{})
+ close(b.downc)
+ b.pinAddr = ""
+ b.mu.Unlock()
+ logger.Lvl(4).Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error())
+ }
+}
+
+// mayPin reports whether addr is allowed to become the pinned address.
+// Unknown (stale) host:ports are rejected. Addresses not marked unhealthy are
+// accepted, as is any address when there is a single endpoint, no unhealthy
+// endpoint, or every endpoint is unhealthy. Otherwise the address must have
+// failed at least healthCheckTimeout ago and pass a health check.
+func (b *healthBalancer) mayPin(addr grpc.Address) bool {
+	hostPort := addr.Addr
+	if b.endpoint(hostPort) == "" { // stale host:port
+		return false
+	}
+
+	b.unhealthyMu.RLock()
+	numUnhealthy := len(b.unhealthyHostPorts)
+	failedAt, isBad := b.unhealthyHostPorts[hostPort]
+	b.unhealthyMu.RUnlock()
+
+	b.mu.RLock()
+	numAddrs := len(b.addrs)
+	b.mu.RUnlock()
+
+	if !isBad || numAddrs == 1 || numUnhealthy == 0 || numAddrs == numUnhealthy {
+		return true
+	}
+
+	// prevent isolated member's endpoint from being infinitely retried, as follows:
+	//   1. keepalive pings detects GoAway with http2.ErrCodeEnhanceYourCalm
+	//   2. balancer 'Up' unpins with grpc: failed with network I/O error
+	//   3. grpc-healthcheck still SERVING, thus retry to pin
+	// instead, return before grpc-healthcheck if failed within healthcheck timeout
+	elapsed := time.Since(failedAt)
+	if elapsed < b.healthCheckTimeout {
+		logger.Lvl(4).Infof("clientv3/balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, b.healthCheckTimeout)
+		return false
+	}
+
+	healthy, _ := b.healthCheck(hostPort)
+	if !healthy {
+		b.hostPortError(hostPort, errors.New("health check failed"))
+		return false
+	}
+	b.removeUnhealthy(hostPort, "health check success")
+	return true
+}
+
+// Get returns the pinned address for an RPC. With opts.BlockingWait false it
+// answers immediately: ErrClientConnClosing when closed, ErrNoAddrAvilable
+// when nothing is pinned. With BlockingWait true it waits on upc until an
+// address is pinned, the balancer is done/closed, or ctx is done. The
+// returned func is always a no-op on success.
+func (b *healthBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
+ var (
+ addr string
+ closed bool
+ )
+
+ // If opts.BlockingWait is false (for fail-fast RPCs), it should return
+ // an address it has notified via Notify immediately instead of blocking.
+ if !opts.BlockingWait {
+ b.mu.RLock()
+ closed = b.closed
+ addr = b.pinAddr
+ b.mu.RUnlock()
+ if closed {
+ return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
+ }
+ if addr == "" {
+ return grpc.Address{Addr: ""}, nil, ErrNoAddrAvilable
+ }
+ return grpc.Address{Addr: addr}, func() {}, nil
+ }
+
+ for {
+ // re-read upc each round: it is replaced whenever an address is unpinned
+ b.mu.RLock()
+ ch := b.upc
+ b.mu.RUnlock()
+ select {
+ case <-ch:
+ case <-b.donec:
+ return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
+ case <-ctx.Done():
+ return grpc.Address{Addr: ""}, nil, ctx.Err()
+ }
+ b.mu.RLock()
+ closed = b.closed
+ addr = b.pinAddr
+ b.mu.RUnlock()
+ // Close() which sets b.closed = true can be called before Get(), Get() must exit if balancer is closed.
+ if closed {
+ return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
+ }
+ // an empty addr means the pin was already cleared again; wait once more
+ if addr != "" {
+ break
+ }
+ }
+ return grpc.Address{Addr: addr}, func() {}, nil
+}
+
+// Notify returns the receive side of the channel on which the balancer
+// publishes address lists.
+func (b *healthBalancer) Notify() <-chan []grpc.Address {
+	return b.notifyCh
+}
+
+// Close shuts the balancer down: it marks it closed, closes stopc (once),
+// clears the pinned address, closes upc if still open so blocked Get calls
+// return, waits for b.wg and for updateNotifyLoop to finish, and finally
+// closes notifyCh. A second call only waits for donec. It always returns nil.
+func (b *healthBalancer) Close() error {
+ b.mu.Lock()
+ // In case gRPC calls close twice. TODO: remove the checking
+ // when we are sure that gRPC won't call close twice.
+ if b.closed {
+ b.mu.Unlock()
+ <-b.donec
+ return nil
+ }
+ b.closed = true
+ b.stopOnce.Do(func() { close(b.stopc) })
+ b.pinAddr = ""
+
+ // In the case of following scenario:
+ // 1. upc is not closed; no pinned address
+ // 2. client issues an RPC, calling invoke(), which calls Get(), enters for loop, blocks
+ // 3. client.conn.Close() calls balancer.Close(); closed = true
+ // 4. for loop in Get() never exits since ctx is the context passed in by the client and may not be canceled
+ // we must close upc so Get() exits from blocking on upc
+ select {
+ case <-b.upc:
+ default:
+ // terminate all waiting Get()s
+ close(b.upc)
+ }
+
+ // release the lock before waiting
+ b.mu.Unlock()
+ b.wg.Wait()
+
+ // wait for updateNotifyLoop to finish
+ <-b.donec
+ close(b.notifyCh)
+
+ return nil
+}
+
+// grpcHealthCheck dials ep and issues a gRPC health Check with a one second
+// timeout. It returns true when the reported status is SERVING, and also when
+// the call fails with codes.Unavailable and the unknownService message
+// (etcd < v3.3.0). Any other failure is returned as (false, err).
+func grpcHealthCheck(client *Client, ep string) (bool, error) {
+	conn, err := client.dial(ep)
+	if err != nil {
+		return false, err
+	}
+	defer conn.Close()
+
+	healthCli := healthpb.NewHealthClient(conn)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	resp, err := healthCli.Check(ctx, &healthpb.HealthCheckRequest{})
+	cancel()
+
+	if err == nil {
+		return resp.Status == healthpb.HealthCheckResponse_SERVING, nil
+	}
+	st, isStatus := status.FromError(err)
+	if isStatus && st.Code() == codes.Unavailable && st.Message() == unknownService {
+		// etcd < v3.3.0
+		return true, nil
+	}
+	return false, err
+}
+
+// hasAddr reports whether any element of addrs has Addr equal to targetAddr.
+func hasAddr(addrs []grpc.Address, targetAddr string) bool {
+	for i := range addrs {
+		if addrs[i].Addr == targetAddr {
+			return true
+		}
+	}
+	return false
+}
+
+// getHost returns the host part of ep when ep contains "://" and parses as a
+// URL; otherwise it returns ep unchanged.
+func getHost(ep string) string {
+	if !strings.Contains(ep, "://") {
+		return ep
+	}
+	parsed, err := url.Parse(ep)
+	if err != nil {
+		return ep
+	}
+	return parsed.Host
+}
+
+// eps2addrs converts each endpoint in eps to a grpc.Address whose Addr is
+// getHost(endpoint), preserving order.
+func eps2addrs(eps []string) []grpc.Address {
+	out := make([]grpc.Address, 0, len(eps))
+	for _, ep := range eps {
+		out = append(out, grpc.Address{Addr: getHost(ep)})
+	}
+	return out
+}
+
+// getHostPort2ep builds a map from the host value that parseEndpoint returns
+// for each endpoint to the original endpoint string. Later duplicates
+// overwrite earlier ones.
+func getHostPort2ep(eps []string) map[string]string {
+	byHost := make(map[string]string, len(eps))
+	for _, ep := range eps {
+		_, host, _ := parseEndpoint(ep)
+		byHost[host] = ep
+	}
+	return byHost
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/kv.go b/vendor/go.etcd.io/etcd/clientv3/kv.go
new file mode 100644
index 0000000..5a7469b
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/kv.go
@@ -0,0 +1,177 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+)
+
+// Response types returned by the KV operations. Each is declared with the
+// corresponding etcdserverpb message as its underlying type, so values can be
+// converted between the two with a plain type conversion.
+type (
+ CompactResponse pb.CompactionResponse
+ PutResponse pb.PutResponse
+ GetResponse pb.RangeResponse
+ DeleteResponse pb.DeleteRangeResponse
+ TxnResponse pb.TxnResponse
+)
+
+// KV is the client interface for key-value operations: Put, Get, Delete,
+// Compact, single-op Do and transactional Txn.
+type KV interface {
+ // Put puts a key-value pair into etcd.
+ // Note that key,value can be plain bytes array and string is
+ // an immutable representation of that bytes array.
+ // To get a string of bytes, do string([]byte{0x10, 0x20}).
+ Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
+
+ // Get retrieves keys.
+ // By default, Get will return the value for "key", if any.
+ // When passed WithRange(end), Get will return the keys in the range [key, end).
+ // When passed WithFromKey(), Get returns keys greater than or equal to key.
+ // When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
+ // if the required revision is compacted, the request will fail with ErrCompacted.
+ // When passed WithLimit(limit), the number of returned keys is bounded by limit.
+ // When passed WithSort(), the keys will be sorted.
+ Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
+
+ // Delete deletes a key, or optionally using WithRange(end), [key, end).
+ Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
+
+ // Compact compacts etcd KV history before the given rev.
+ Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
+
+ // Do applies a single Op on KV without a transaction.
+ // Do is useful when creating arbitrary operations to be issued at a
+ // later time; the user can range over the operations, calling Do to
+ // execute them. Get/Put/Delete, on the other hand, are best suited
+ // for when the operation should be issued at the time of declaration.
+ Do(ctx context.Context, op Op) (OpResponse, error)
+
+ // Txn creates a transaction.
+ Txn(ctx context.Context) Txn
+}
+
+// OpResponse holds the result of a single Op. Only the field matching the
+// executed operation is set; the others stay nil.
+type OpResponse struct {
+	put *PutResponse
+	get *GetResponse
+	del *DeleteResponse
+	txn *TxnResponse
+}
+
+// Put returns the put response, or nil if the Op was not a put.
+func (r OpResponse) Put() *PutResponse {
+	return r.put
+}
+
+// Get returns the range response, or nil if the Op was not a get.
+func (r OpResponse) Get() *GetResponse {
+	return r.get
+}
+
+// Del returns the delete response, or nil if the Op was not a delete.
+func (r OpResponse) Del() *DeleteResponse {
+	return r.del
+}
+
+// Txn returns the transaction response, or nil if the Op was not a txn.
+func (r OpResponse) Txn() *TxnResponse {
+	return r.txn
+}
+
+func (resp *PutResponse) OpResponse() OpResponse {
+ return OpResponse{put: resp}
+}
+func (resp *GetResponse) OpResponse() OpResponse {
+ return OpResponse{get: resp}
+}
+func (resp *DeleteResponse) OpResponse() OpResponse {
+ return OpResponse{del: resp}
+}
+func (resp *TxnResponse) OpResponse() OpResponse {
+ return OpResponse{txn: resp}
+}
+
+// kv is the KV implementation returned by NewKV and NewKVFromKVClient.
+type kv struct {
+ // remote is the client the RPCs are issued on
+ remote pb.KVClient
+ // callOpts are appended to every RPC made through remote
+ callOpts []grpc.CallOption
+}
+
+// NewKV returns a KV whose remote is RetryKVClient(c). When c is non-nil the
+// client's call options are reused for every RPC.
+func NewKV(c *Client) KV {
+	impl := new(kv)
+	impl.remote = RetryKVClient(c)
+	if c == nil {
+		return impl
+	}
+	impl.callOpts = c.callOpts
+	return impl
+}
+
+// NewKVFromKVClient returns a KV that issues RPCs on the given remote. When c
+// is non-nil the client's call options are reused for every RPC.
+func NewKVFromKVClient(remote pb.KVClient, c *Client) KV {
+	impl := new(kv)
+	impl.remote = remote
+	if c == nil {
+		return impl
+	}
+	impl.callOpts = c.callOpts
+	return impl
+}
+
+// Put builds a put Op from key, val and opts and executes it through Do.
+func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
+	op := OpPut(key, val, opts...)
+	res, err := kv.Do(ctx, op)
+	return res.put, toErr(ctx, err)
+}
+
+// Get builds a get Op from key and opts and executes it through Do.
+func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) {
+	op := OpGet(key, opts...)
+	res, err := kv.Do(ctx, op)
+	return res.get, toErr(ctx, err)
+}
+
+// Delete builds a delete Op from key and opts and executes it through Do.
+func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) {
+	op := OpDelete(key, opts...)
+	res, err := kv.Do(ctx, op)
+	return res.del, toErr(ctx, err)
+}
+
+// Compact issues a Compact RPC for revision rev with the given options. On
+// failure the error is passed through toErr and the response is nil.
+func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) {
+	req := OpCompact(rev, opts...).toRequest()
+	resp, err := kv.remote.Compact(ctx, req, kv.callOpts...)
+	if err == nil {
+		return (*CompactResponse)(resp), nil
+	}
+	return nil, toErr(ctx, err)
+}
+
+// Txn returns a new transaction bound to this kv, ctx and the kv's call options.
+func (kv *kv) Txn(ctx context.Context) Txn {
+	t := new(txn)
+	t.kv = kv
+	t.ctx = ctx
+	t.callOpts = kv.callOpts
+	return t
+}
+
+// Do executes a single Op by dispatching on its type to the matching RPC
+// (Range, Put, DeleteRange or Txn) and wraps the reply in an OpResponse.
+// On RPC failure it returns an empty OpResponse and the error passed through
+// toErr. It panics with "Unknown op" for an unrecognized op type.
+func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
+	var (
+		out OpResponse
+		err error
+	)
+	switch op.t {
+	case tRange:
+		var resp *pb.RangeResponse
+		if resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...); err == nil {
+			out.get = (*GetResponse)(resp)
+		}
+	case tPut:
+		req := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
+		var resp *pb.PutResponse
+		if resp, err = kv.remote.Put(ctx, req, kv.callOpts...); err == nil {
+			out.put = (*PutResponse)(resp)
+		}
+	case tDeleteRange:
+		req := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
+		var resp *pb.DeleteRangeResponse
+		if resp, err = kv.remote.DeleteRange(ctx, req, kv.callOpts...); err == nil {
+			out.del = (*DeleteResponse)(resp)
+		}
+	case tTxn:
+		var resp *pb.TxnResponse
+		if resp, err = kv.remote.Txn(ctx, op.toTxnRequest(), kv.callOpts...); err == nil {
+			out.txn = (*TxnResponse)(resp)
+		}
+	default:
+		panic("Unknown op")
+	}
+	if err != nil {
+		return OpResponse{}, toErr(ctx, err)
+	}
+	return out, nil
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/lease.go b/vendor/go.etcd.io/etcd/clientv3/lease.go
new file mode 100644
index 0000000..3729cf3
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/lease.go
@@ -0,0 +1,588 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+)
+
+type (
+ // LeaseRevokeResponse has the protobuf LeaseRevokeResponse as its underlying type.
+ LeaseRevokeResponse pb.LeaseRevokeResponse
+ // LeaseID identifies a lease; it is converted to and from the int64 IDs used on the wire.
+ LeaseID int64
+)
+
+// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
+// ID, TTL and Error are copied from the server reply by Grant.
+type LeaseGrantResponse struct {
+ *pb.ResponseHeader
+ ID LeaseID
+ TTL int64
+ Error string
+}
+
+// LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse.
+// The keep-alive loop treats TTL as a number of seconds; a TTL <= 0 is
+// handled as an expired lease.
+type LeaseKeepAliveResponse struct {
+ *pb.ResponseHeader
+ ID LeaseID
+ TTL int64
+}
+
+// LeaseTimeToLiveResponse wraps the protobuf message LeaseTimeToLiveResponse.
+type LeaseTimeToLiveResponse struct {
+ *pb.ResponseHeader
+ // ID is the lease ID copied from the server reply.
+ ID LeaseID `json:"id"`
+
+ // TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds. Expired lease will return -1.
+ TTL int64 `json:"ttl"`
+
+ // GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
+ GrantedTTL int64 `json:"granted-ttl"`
+
+ // Keys is the list of keys attached to this lease.
+ Keys [][]byte `json:"keys"`
+}
+
+// LeaseStatus represents a lease status.
+// Only the lease ID is carried at present.
+type LeaseStatus struct {
+ ID LeaseID `json:"id"`
+ // TODO: TTL int64
+}
+
+// LeaseLeasesResponse wraps the protobuf message LeaseLeasesResponse.
+type LeaseLeasesResponse struct {
+ *pb.ResponseHeader
+ // Leases holds one LeaseStatus per lease in the server reply.
+ Leases []LeaseStatus `json:"leases"`
+}
+
+const (
+ // defaultTTL is the assumed lease TTL used for the first keepalive
+ // deadline before the actual TTL is known to the client.
+ // NewLeaseFromLeaseClient substitutes it when the supplied keep-alive
+ // timeout is exactly one second.
+ defaultTTL = 5 * time.Second
+ // NoLease is a lease ID for the absence of a lease.
+ NoLease LeaseID = 0
+
+ // retryConnWait is how long to wait before retrying request due to an error
+ retryConnWait = 500 * time.Millisecond
+)
+
+// LeaseResponseChSize is the size of buffer to store unsent lease responses.
+// It is read by KeepAlive as the capacity of each response channel it creates.
+// WARNING: DO NOT UPDATE.
+// Only for testing purposes.
+var LeaseResponseChSize = 16
+
+// ErrKeepAliveHalted is the error KeepAlive returns once the keep-alive
+// receive loop has stopped; Reason carries the error the loop stopped with.
+//
+// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
+type ErrKeepAliveHalted struct {
+	Reason error
+}
+
+// Error returns a fixed prefix, followed by ": " and the reason's message
+// when Reason is set.
+func (e ErrKeepAliveHalted) Error() string {
+	const prefix = "etcdclient: leases keep alive halted"
+	if e.Reason == nil {
+		return prefix
+	}
+	return prefix + ": " + e.Reason.Error()
+}
+
+// Lease is the client interface for lease operations: granting, revoking,
+// inspecting and keeping leases alive.
+type Lease interface {
+ // Grant creates a new lease.
+ Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
+
+ // Revoke revokes the given lease.
+ Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
+
+ // TimeToLive retrieves the lease information of the given lease ID.
+ TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
+
+ // Leases retrieves all leases.
+ Leases(ctx context.Context) (*LeaseLeasesResponse, error)
+
+ // KeepAlive keeps the given lease alive forever. If the keepalive response
+ // posted to the channel is not consumed immediately, the lease client will
+ // continue sending keep alive requests to the etcd server at least every
+ // second until latest response is consumed.
+ //
+ // The returned "LeaseKeepAliveResponse" channel closes if underlying keep
+ // alive stream is interrupted in some way the client cannot handle itself;
+ // given context "ctx" is canceled or timed out. "LeaseKeepAliveResponse"
+ // from this closed channel is nil.
+ //
+ // If client keep alive loop halts with an unexpected error (e.g. "etcdserver:
+ // no leader") or canceled by the caller (e.g. context.Canceled), the error
+ // is returned. Otherwise, it retries.
+ //
+ // TODO(v4.0): post errors to last keep alive message before closing
+ // (see https://github.com/coreos/etcd/pull/7866)
+ KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
+
+ // KeepAliveOnce renews the lease once. The response corresponds to the
+ // first message from calling KeepAlive. If the response has a recoverable
+ // error, KeepAliveOnce will retry the RPC with a new keep alive message.
+ //
+ // In most of the cases, KeepAlive should be used instead of KeepAliveOnce.
+ KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
+
+ // Close releases all resources Lease keeps for efficient communication
+ // with the etcd server.
+ Close() error
+}
+
+// lessor is the Lease implementation built by NewLeaseFromLeaseClient.
+type lessor struct {
+ mu sync.Mutex // guards all fields
+
+ // donec is closed and loopErr is set when recvKeepAliveLoop stops
+ donec chan struct{}
+ loopErr error
+
+ // remote is the client all lease RPCs are issued on
+ remote pb.LeaseClient
+
+ // stream is the current keep-alive stream and streamCancel cancels its
+ // context; both are replaced by resetRecv
+ stream pb.Lease_LeaseKeepAliveClient
+ streamCancel context.CancelFunc
+
+ // stopCtx is the parent context of every keep-alive stream; stopCancel
+ // cancels it and is invoked by Close
+ stopCtx context.Context
+ stopCancel context.CancelFunc
+
+ // keepAlives holds the active keep-alive state per lease ID
+ keepAlives map[LeaseID]*keepAlive
+
+ // firstKeepAliveTimeout is the timeout for the first keepalive request
+ // before the actual TTL is known to the lease client
+ firstKeepAliveTimeout time.Duration
+
+ // firstKeepAliveOnce ensures stream starts after first KeepAlive call.
+ firstKeepAliveOnce sync.Once
+
+ // callOpts are appended to every RPC made through remote
+ callOpts []grpc.CallOption
+}
+
+// keepAlive multiplexes a keepalive for a lease over multiple channels
+type keepAlive struct {
+ // chs and ctxs are parallel slices: ctxs[i] is the caller context that
+ // was supplied together with chs[i]
+ chs []chan<- *LeaseKeepAliveResponse
+ ctxs []context.Context
+ // deadline is the time the keep alive channels close if no response
+ deadline time.Time
+ // nextKeepAlive is when to send the next keep alive message
+ nextKeepAlive time.Time
+ // donec is closed on lease revoke, expiration, or cancel.
+ donec chan struct{}
+}
+
+// NewLease returns a Lease backed by RetryLeaseClient(c), using the client's
+// dial timeout plus one second as the first keep-alive timeout.
+func NewLease(c *Client) Lease {
+	remote := RetryLeaseClient(c)
+	firstTimeout := c.cfg.DialTimeout + time.Second
+	return NewLeaseFromLeaseClient(remote, c, firstTimeout)
+}
+
+// NewLeaseFromLeaseClient returns a Lease that issues RPCs on remote.
+// A keepAliveTimeout of exactly one second is replaced by defaultTTL. When c
+// is non-nil its call options are reused. The lessor's stop context is a
+// cancelable child of a require-leader background context.
+func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {
+	firstTimeout := keepAliveTimeout
+	if firstTimeout == time.Second {
+		firstTimeout = defaultTTL
+	}
+
+	l := new(lessor)
+	l.donec = make(chan struct{})
+	l.keepAlives = make(map[LeaseID]*keepAlive)
+	l.remote = remote
+	l.firstKeepAliveTimeout = firstTimeout
+	if c != nil {
+		l.callOpts = c.callOpts
+	}
+
+	l.stopCtx, l.stopCancel = context.WithCancel(WithRequireLeader(context.Background()))
+	return l
+}
+
+// Grant issues a LeaseGrant RPC for the given ttl and copies the header, ID,
+// TTL and Error of the reply into a LeaseGrantResponse. RPC errors are passed
+// through toErr.
+func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
+	resp, err := l.remote.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: ttl}, l.callOpts...)
+	if err != nil {
+		return nil, toErr(ctx, err)
+	}
+	return &LeaseGrantResponse{
+		ResponseHeader: resp.GetHeader(),
+		ID:             LeaseID(resp.ID),
+		TTL:            resp.TTL,
+		Error:          resp.Error,
+	}, nil
+}
+
+// Revoke issues a LeaseRevoke RPC for id. RPC errors are passed through toErr.
+func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
+	resp, err := l.remote.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(id)}, l.callOpts...)
+	if err != nil {
+		return nil, toErr(ctx, err)
+	}
+	return (*LeaseRevokeResponse)(resp), nil
+}
+
+// TimeToLive issues a LeaseTimeToLive RPC for id with the given options and
+// copies the reply into a LeaseTimeToLiveResponse. RPC errors are passed
+// through toErr.
+func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
+	req := toLeaseTimeToLiveRequest(id, opts...)
+	resp, err := l.remote.LeaseTimeToLive(ctx, req, l.callOpts...)
+	if err != nil {
+		return nil, toErr(ctx, err)
+	}
+	return &LeaseTimeToLiveResponse{
+		ResponseHeader: resp.GetHeader(),
+		ID:             LeaseID(resp.ID),
+		TTL:            resp.TTL,
+		GrantedTTL:     resp.GrantedTTL,
+		Keys:           resp.Keys,
+	}, nil
+}
+
+// Leases issues a LeaseLeases RPC and converts every returned lease into a
+// LeaseStatus carrying its ID. RPC errors are passed through toErr.
+func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
+	resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, l.callOpts...)
+	if err != nil {
+		return nil, toErr(ctx, err)
+	}
+	statuses := make([]LeaseStatus, len(resp.Leases))
+	for i, ls := range resp.Leases {
+		statuses[i].ID = LeaseID(ls.ID)
+	}
+	return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: statuses}, nil
+}
+
+// KeepAlive registers a new response channel (buffered with
+// LeaseResponseChSize) for lease id and returns it. If the receive loop has
+// already stopped, the channel is returned closed together with
+// ErrKeepAliveHalted. The first successful call starts recvKeepAliveLoop and
+// deadlineLoop; every call starts a goroutine that unregisters the channel
+// when ctx is done.
+func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
+ ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)
+
+ l.mu.Lock()
+ // ensure that recvKeepAliveLoop is still running
+ select {
+ case <-l.donec:
+ err := l.loopErr
+ l.mu.Unlock()
+ close(ch)
+ return ch, ErrKeepAliveHalted{Reason: err}
+ default:
+ }
+ ka, ok := l.keepAlives[id]
+ if !ok {
+ // create fresh keep alive
+ // nextKeepAlive is "now", so the send loop picks it up on its next pass
+ ka = &keepAlive{
+ chs: []chan<- *LeaseKeepAliveResponse{ch},
+ ctxs: []context.Context{ctx},
+ deadline: time.Now().Add(l.firstKeepAliveTimeout),
+ nextKeepAlive: time.Now(),
+ donec: make(chan struct{}),
+ }
+ l.keepAlives[id] = ka
+ } else {
+ // add channel and context to existing keep alive
+ ka.ctxs = append(ka.ctxs, ctx)
+ ka.chs = append(ka.chs, ch)
+ }
+ l.mu.Unlock()
+
+ go l.keepAliveCtxCloser(id, ctx, ka.donec)
+ l.firstKeepAliveOnce.Do(func() {
+ go l.recvKeepAliveLoop()
+ go l.deadlineLoop()
+ })
+
+ return ch, nil
+}
+
+// KeepAliveOnce renews lease id a single time, retrying keepAliveOnce until
+// it succeeds or fails with an error isHaltErr reports as halting. A
+// successful reply with TTL <= 0 is returned together with
+// rpctypes.ErrLeaseNotFound.
+func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
+	for {
+		resp, err := l.keepAliveOnce(ctx, id)
+		switch {
+		case err == nil && resp.TTL <= 0:
+			return resp, rpctypes.ErrLeaseNotFound
+		case err == nil:
+			return resp, nil
+		case isHaltErr(ctx, err):
+			return nil, toErr(ctx, err)
+		}
+		// any other error: try again
+	}
+}
+
+// Close cancels the lessor's stop context and blocks until donec is closed.
+// If KeepAlive was never called (so the loops never started), Close consumes
+// firstKeepAliveOnce itself and closes donec directly. It always returns nil.
+func (l *lessor) Close() error {
+ l.stopCancel()
+ // close for synchronous teardown if stream goroutines never launched
+ l.firstKeepAliveOnce.Do(func() { close(l.donec) })
+ <-l.donec
+ return nil
+}
+
+// keepAliveCtxCloser waits for ctx to be done and then closes and removes the
+// channel registered with ctx for lease id, deleting the keep-alive entry
+// when no channels remain. It returns without doing anything if donec (the
+// keep-alive's own done channel) or l.donec is closed first.
+func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-chan struct{}) {
+ select {
+ case <-donec:
+ return
+ case <-l.donec:
+ return
+ case <-ctx.Done():
+ }
+
+ l.mu.Lock()
+ defer l.mu.Unlock()
+
+ ka, ok := l.keepAlives[id]
+ if !ok {
+ return
+ }
+
+ // close channel and remove context if still associated with keep alive
+ // (only the first entry whose context equals ctx is removed)
+ for i, c := range ka.ctxs {
+ if c == ctx {
+ close(ka.chs[i])
+ ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...)
+ ka.chs = append(ka.chs[:i], ka.chs[i+1:]...)
+ break
+ }
+ }
+ // remove the keep alive if there are no more listeners
+ if len(ka.chs) == 0 {
+ delete(l.keepAlives, id)
+ }
+}
+
+// closeRequireLeader scans keepAlives for ctxs that have require leader
+// and closes the associated channels.
+//
+// For every keep-alive, each channel whose context carries the
+// require-leader metadata value is closed and dropped together with its
+// context. The remaining channels and contexts are kept as parallel slices
+// in their original relative order, so chs[i] always stays paired with the
+// context it was registered with. A keep-alive left with no channels is not
+// removed from the map here.
+func (l *lessor) closeRequireLeader() {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	for _, ka := range l.keepAlives {
+		reqIdxs := 0
+		// find all required leader channels, close, mark as nil
+		for i, ctx := range ka.ctxs {
+			md, ok := metadata.FromOutgoingContext(ctx)
+			if !ok {
+				continue
+			}
+			ks := md[rpctypes.MetadataRequireLeaderKey]
+			if len(ks) < 1 || ks[0] != rpctypes.MetadataHasLeader {
+				continue
+			}
+			close(ka.chs[i])
+			ka.chs[i] = nil
+			reqIdxs++
+		}
+		if reqIdxs == 0 {
+			continue
+		}
+		// remove all channels that required a leader from keepalive
+		newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
+		newCtxs := make([]context.Context, len(newChs))
+		newIdx := 0
+		for i := range ka.chs {
+			if ka.chs[i] == nil {
+				continue
+			}
+			// Take the context from the same source index as the channel.
+			// Reading ka.ctxs[newIdx] would pair the surviving channel with
+			// the context of an entry that was just removed, so a later
+			// cancellation would close the wrong channel.
+			newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[i]
+			newIdx++
+		}
+		ka.chs, ka.ctxs = newChs, newCtxs
+	}
+}
+
+// keepAliveOnce opens a dedicated keep-alive stream on a cancelable child of
+// ctx, sends one request for id, reads one reply and converts it. The stream
+// context is canceled on return. Every error is passed through toErr.
+func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
+	streamCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	stream, err := l.remote.LeaseKeepAlive(streamCtx, l.callOpts...)
+	if err != nil {
+		return nil, toErr(ctx, err)
+	}
+
+	if err := stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)}); err != nil {
+		return nil, toErr(ctx, err)
+	}
+
+	reply, err := stream.Recv()
+	if err != nil {
+		return nil, toErr(ctx, err)
+	}
+
+	return &LeaseKeepAliveResponse{
+		ResponseHeader: reply.GetHeader(),
+		ID:             LeaseID(reply.ID),
+		TTL:            reply.TTL,
+	}, nil
+}
+
+// recvKeepAliveLoop repeatedly opens a keep-alive stream via resetRecv and
+// feeds each received reply to recvKeepAlive. When a stream fails it waits
+// retryConnWait and reconnects, unless the failure is attributed to
+// cancellation of stopCtx, in which case it returns. On return the deferred
+// function closes l.donec, stores the returned error in l.loopErr, and closes
+// and discards all keep-alives.
+func (l *lessor) recvKeepAliveLoop() (gerr error) {
+ defer func() {
+ l.mu.Lock()
+ close(l.donec)
+ l.loopErr = gerr
+ for _, ka := range l.keepAlives {
+ ka.close()
+ }
+ l.keepAlives = make(map[LeaseID]*keepAlive)
+ l.mu.Unlock()
+ }()
+
+ for {
+ stream, err := l.resetRecv()
+ if err != nil {
+ if canceledByCaller(l.stopCtx, err) {
+ return err
+ }
+ } else {
+ for {
+ resp, err := stream.Recv()
+ if err != nil {
+ if canceledByCaller(l.stopCtx, err) {
+ return err
+ }
+
+ // on "no leader", drop only the listeners that asked for a leader
+ if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
+ l.closeRequireLeader()
+ }
+ break
+ }
+
+ l.recvKeepAlive(resp)
+ }
+ }
+
+ // back off before reconnecting, unless the lessor is being closed
+ select {
+ case <-time.After(retryConnWait):
+ continue
+ case <-l.stopCtx.Done():
+ return l.stopCtx.Err()
+ }
+ }
+}
+
+// resetRecv opens a new lease stream and starts sending keep alive requests.
+// The stream runs on a cancelable child of stopCtx. Under l.mu the previous
+// stream's context (if any) is canceled and the new stream and cancel func
+// are stored, then sendKeepAliveLoop is started for the new stream.
+func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
+ sctx, cancel := context.WithCancel(l.stopCtx)
+ stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
+ if err != nil {
+ // release the child context; no stream was created
+ cancel()
+ return nil, err
+ }
+
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ if l.stream != nil && l.streamCancel != nil {
+ l.streamCancel()
+ }
+
+ l.streamCancel = cancel
+ l.stream = stream
+
+ go l.sendKeepAliveLoop(stream)
+ return stream, nil
+}
+
+// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
+// Replies for unknown lease IDs are ignored. A TTL <= 0 removes the
+// keep-alive and closes its channels. Otherwise the deadline is moved to
+// now+TTL, the next send time to now+TTL/3, and the reply is offered to every
+// listener without blocking.
+func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
+ karesp := &LeaseKeepAliveResponse{
+ ResponseHeader: resp.GetHeader(),
+ ID: LeaseID(resp.ID),
+ TTL: resp.TTL,
+ }
+
+ l.mu.Lock()
+ defer l.mu.Unlock()
+
+ ka, ok := l.keepAlives[karesp.ID]
+ if !ok {
+ return
+ }
+
+ if karesp.TTL <= 0 {
+ // lease expired; close all keep alive channels
+ delete(l.keepAlives, karesp.ID)
+ ka.close()
+ return
+ }
+
+ // send update to all channels
+ nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
+ ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
+ for _, ch := range ka.chs {
+ // non-blocking send: the reply is dropped for a listener whose buffer is full
+ select {
+ case ch <- karesp:
+ default:
+ }
+ // still advance in order to rate-limit keep-alive sends
+ ka.nextKeepAlive = nextKeepAlive
+ }
+}
+
+// deadlineLoop reaps any keep alive channels that have not received a response
+// within the lease TTL
+// It wakes up once per second and returns when l.donec is closed.
+func (l *lessor) deadlineLoop() {
+ for {
+ select {
+ case <-time.After(time.Second):
+ case <-l.donec:
+ return
+ }
+ now := time.Now()
+ l.mu.Lock()
+ for id, ka := range l.keepAlives {
+ if ka.deadline.Before(now) {
+ // waited too long for response; lease may be expired
+ ka.close()
+ delete(l.keepAlives, id)
+ }
+ }
+ l.mu.Unlock()
+ }
+}
+
+// sendKeepAliveLoop sends keep alive requests for the lifetime of the given stream.
+// Every 500ms it collects, under l.mu, the lease IDs whose nextKeepAlive has
+// passed and sends one request per ID outside the lock. It returns on the
+// first send error, or when the stream context, l.donec or stopCtx is done.
+func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
+ for {
+ var tosend []LeaseID
+
+ now := time.Now()
+ l.mu.Lock()
+ for id, ka := range l.keepAlives {
+ if ka.nextKeepAlive.Before(now) {
+ tosend = append(tosend, id)
+ }
+ }
+ l.mu.Unlock()
+
+ for _, id := range tosend {
+ r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
+ if err := stream.Send(r); err != nil {
+ // TODO do something with this error?
+ return
+ }
+ }
+
+ select {
+ case <-time.After(500 * time.Millisecond):
+ case <-stream.Context().Done():
+ return
+ case <-l.donec:
+ return
+ case <-l.stopCtx.Done():
+ return
+ }
+ }
+}
+
+// close closes the keep-alive's donec and then every listener channel.
+// It must be called at most once per keepAlive.
+func (ka *keepAlive) close() {
+	close(ka.donec)
+	for i := range ka.chs {
+		close(ka.chs[i])
+	}
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/logger.go b/vendor/go.etcd.io/etcd/clientv3/logger.go
new file mode 100644
index 0000000..782e313
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/logger.go
@@ -0,0 +1,135 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "io/ioutil"
+ "sync"
+
+ "google.golang.org/grpc/grpclog"
+)
+
+// Logger is the logger used by client library.
+// It implements grpclog.LoggerV2 interface.
+type Logger interface {
+ grpclog.LoggerV2
+
+ // Lvl returns logger if logger's verbosity level >= "lvl".
+ // Otherwise, logger that discards all logs.
+ Lvl(lvl int) Logger
+
+ // to satisfy capnslog
+ // (the implementations in this file route these to the Info-level methods
+ // or discard them)
+
+ Print(args ...interface{})
+ Printf(format string, args ...interface{})
+ Println(args ...interface{})
+}
+
+var (
+ // loggerMu is held by SetLogger and GetLogger while they access logger
+ loggerMu sync.RWMutex
+ // logger is the package-wide client logger; replaced by SetLogger
+ logger Logger
+)
+
+// settableLogger is a Logger that forwards every call to the wrapped
+// grpclog.LoggerV2, reading it under mu.
+type settableLogger struct {
+ // l is the wrapped logger all calls are forwarded to
+ l grpclog.LoggerV2
+ // mu is read-locked by get and Lvl while they read l
+ mu sync.RWMutex
+}
+
+// init installs a logger that writes all levels to ioutil.Discard.
+func init() {
+ // disable client side logs by default
+ logger = &settableLogger{}
+ SetLogger(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard))
+}
+
+// SetLogger wraps l with NewLogger, installs the wrapper as the package
+// logger and registers the same wrapper with grpclog, all under loggerMu.
+func SetLogger(l grpclog.LoggerV2) {
+	loggerMu.Lock()
+	defer loggerMu.Unlock()
+
+	logger = NewLogger(l)
+	// override grpclog so that any changes happen with locking
+	grpclog.SetLoggerV2(logger)
+}
+
+// GetLogger returns the package logger, read under loggerMu.
+func GetLogger() Logger {
+	loggerMu.RLock()
+	defer loggerMu.RUnlock()
+	return logger
+}
+
+// NewLogger returns a new Logger with grpclog.LoggerV2.
+// The result is a settableLogger that forwards every call to gl.
+func NewLogger(gl grpclog.LoggerV2) Logger {
+ return &settableLogger{l: gl}
+}
+
+// get returns the wrapped logger, read under s.mu.
+func (s *settableLogger) get() grpclog.LoggerV2 {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return s.l
+}
+
+// implement the grpclog.LoggerV2 interface
+// Every method below forwards to the logger returned by s.get().
+
+func (s *settableLogger) Info(args ...interface{}) { s.get().Info(args...) }
+func (s *settableLogger) Infof(format string, args ...interface{}) { s.get().Infof(format, args...) }
+func (s *settableLogger) Infoln(args ...interface{}) { s.get().Infoln(args...) }
+func (s *settableLogger) Warning(args ...interface{}) { s.get().Warning(args...) }
+func (s *settableLogger) Warningf(format string, args ...interface{}) {
+ s.get().Warningf(format, args...)
+}
+func (s *settableLogger) Warningln(args ...interface{}) { s.get().Warningln(args...) }
+func (s *settableLogger) Error(args ...interface{}) { s.get().Error(args...) }
+func (s *settableLogger) Errorf(format string, args ...interface{}) {
+ s.get().Errorf(format, args...)
+}
+func (s *settableLogger) Errorln(args ...interface{}) { s.get().Errorln(args...) }
+func (s *settableLogger) Fatal(args ...interface{}) { s.get().Fatal(args...) }
+func (s *settableLogger) Fatalf(format string, args ...interface{}) { s.get().Fatalf(format, args...) }
+func (s *settableLogger) Fatalln(args ...interface{}) { s.get().Fatalln(args...) }
+// Print, Printf and Println are routed to the Info-level methods.
+func (s *settableLogger) Print(args ...interface{}) { s.get().Info(args...) }
+func (s *settableLogger) Printf(format string, args ...interface{}) { s.get().Infof(format, args...) }
+func (s *settableLogger) Println(args ...interface{}) { s.get().Infoln(args...) }
+func (s *settableLogger) V(l int) bool { return s.get().V(l) }
+// Lvl returns s itself when the wrapped logger reports V(lvl) as true, and a
+// logger that discards everything otherwise.
+func (s *settableLogger) Lvl(lvl int) Logger {
+	if wrapped := s.get(); !wrapped.V(lvl) {
+		return &noLogger{}
+	}
+	return s
+}
+
+// noLogger is a Logger whose methods all do nothing: every log call is
+// discarded (including the Fatal variants, which return normally here),
+// V always reports false and Lvl returns the receiver.
+type noLogger struct{}
+
+func (*noLogger) Info(args ...interface{}) {}
+func (*noLogger) Infof(format string, args ...interface{}) {}
+func (*noLogger) Infoln(args ...interface{}) {}
+func (*noLogger) Warning(args ...interface{}) {}
+func (*noLogger) Warningf(format string, args ...interface{}) {}
+func (*noLogger) Warningln(args ...interface{}) {}
+func (*noLogger) Error(args ...interface{}) {}
+func (*noLogger) Errorf(format string, args ...interface{}) {}
+func (*noLogger) Errorln(args ...interface{}) {}
+func (*noLogger) Fatal(args ...interface{}) {}
+func (*noLogger) Fatalf(format string, args ...interface{}) {}
+func (*noLogger) Fatalln(args ...interface{}) {}
+func (*noLogger) Print(args ...interface{}) {}
+func (*noLogger) Printf(format string, args ...interface{}) {}
+func (*noLogger) Println(args ...interface{}) {}
+func (*noLogger) V(l int) bool { return false }
+func (ng *noLogger) Lvl(lvl int) Logger { return ng }
diff --git a/vendor/go.etcd.io/etcd/clientv3/maintenance.go b/vendor/go.etcd.io/etcd/clientv3/maintenance.go
new file mode 100644
index 0000000..f60cfbe
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/maintenance.go
@@ -0,0 +1,226 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+ "io"
+
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+)
+
+type ( // defined types giving the pb maintenance messages clientv3 names
+ DefragmentResponse pb.DefragmentResponse
+ AlarmResponse pb.AlarmResponse
+ AlarmMember pb.AlarmMember
+ StatusResponse pb.StatusResponse
+ HashKVResponse pb.HashKVResponse
+ MoveLeaderResponse pb.MoveLeaderResponse
+)
+
+type Maintenance interface {
+ // AlarmList gets all active alarms.
+ AlarmList(ctx context.Context) (*AlarmResponse, error)
+
+ // AlarmDisarm disarms a given alarm.
+ AlarmDisarm(ctx context.Context, m *AlarmMember) (*AlarmResponse, error)
+
+ // Defragment releases wasted space from internal fragmentation on a given etcd member.
+ // Defragment is only needed after deleting a large number of keys, in order to reclaim
+ // the resources.
+ // Defragment is an expensive operation. Users should avoid defragmenting multiple members
+ // at the same time.
+ // To defragment multiple members in the cluster, the user needs to call Defragment multiple
+ // times with different endpoints.
+ Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error)
+
+ // Status gets the status of the endpoint.
+ Status(ctx context.Context, endpoint string) (*StatusResponse, error)
+
+ // HashKV returns a hash of the KV state at the time of the RPC.
+ // If revision is zero, the hash is computed on all keys. If the revision
+ // is non-zero, the hash is computed on all keys at or below the given revision.
+ HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error)
+
+ // Snapshot provides a reader for a point-in-time snapshot of etcd.
+ Snapshot(ctx context.Context) (io.ReadCloser, error)
+
+ // MoveLeader requests the current leader to transfer its leadership to the transferee.
+ // The request must be made to the leader.
+ MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error)
+}
+
+type maintenance struct {
+ dial func(endpoint string) (pb.MaintenanceClient, func(), error) // returns a client for endpoint plus a cleanup func; used by Defragment, Status and HashKV
+ remote pb.MaintenanceClient // client used by the calls that take no endpoint argument
+ callOpts []grpc.CallOption // appended to every RPC issued through this type
+}
+
+func NewMaintenance(c *Client) Maintenance {
+	m := &maintenance{
+		dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
+			cc, dialErr := c.dial(endpoint)
+			if dialErr != nil {
+				return nil, nil, dialErr
+			}
+			closeConn := func() { cc.Close() } // cleanup handed to the caller together with the client
+			return RetryMaintenanceClient(c, cc), closeConn, nil
+		},
+		remote: RetryMaintenanceClient(c, c.conn),
+	}
+	if c != nil { // NOTE(review): c.conn was already read above, so a nil c panics before reaching this check
+		m.callOpts = c.callOpts
+	}
+	return m
+}
+
+func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance {
+	// dial ignores the endpoint: it always yields the supplied client and a cleanup that does nothing.
+	noop := func() {}
+	m := &maintenance{
+		remote: remote,
+		dial:   func(string) (pb.MaintenanceClient, func(), error) { return remote, noop, nil },
+	}
+	if c != nil {
+		m.callOpts = c.callOpts
+	}
+	return m
+}
+
+func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
+	listReq := &pb.AlarmRequest{
+		Action:   pb.AlarmRequest_GET,
+		MemberID: 0,                 // zero selects all members
+		Alarm:    pb.AlarmType_NONE, // NONE selects all alarm types
+	}
+	resp, err := m.remote.Alarm(ctx, listReq, m.callOpts...)
+	if err != nil {
+		return nil, toErr(ctx, err)
+	}
+	return (*AlarmResponse)(resp), nil
+}
+
+func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmResponse, error) {
+	req := &pb.AlarmRequest{
+		Action:   pb.AlarmRequest_DEACTIVATE,
+		MemberID: am.MemberID,
+		Alarm:    am.Alarm,
+	}
+	// Zero member ID plus AlarmType_NONE means "everything": list the alarms and disarm them one by one.
+	if req.MemberID == 0 && req.Alarm == pb.AlarmType_NONE {
+		active, listErr := m.AlarmList(ctx)
+		if listErr != nil {
+			return nil, toErr(ctx, listErr)
+		}
+		disarmed := &AlarmResponse{}
+		for _, alarm := range active.Alarms {
+			one, disarmErr := m.AlarmDisarm(ctx, (*AlarmMember)(alarm))
+			if disarmErr != nil {
+				return nil, toErr(ctx, disarmErr)
+			}
+			disarmed.Alarms = append(disarmed.Alarms, one.Alarms...)
+		}
+		return disarmed, nil
+	}
+	// Otherwise deactivate exactly the alarm described by am.
+	resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
+	if err != nil {
+		return nil, toErr(ctx, err)
+	}
+	return (*AlarmResponse)(resp), nil
+}
+
+func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) {
+	client, release, dialErr := m.dial(endpoint)
+	if dialErr != nil {
+		return nil, toErr(ctx, dialErr)
+	}
+	defer release() // run the cleanup func that dial handed back
+	resp, err := client.Defragment(ctx, &pb.DefragmentRequest{}, m.callOpts...)
+	if err == nil {
+		return (*DefragmentResponse)(resp), nil
+	}
+	return nil, toErr(ctx, err)
+}
+
+func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) {
+	client, release, dialErr := m.dial(endpoint)
+	if dialErr != nil {
+		return nil, toErr(ctx, dialErr)
+	}
+	defer release() // run the cleanup func that dial handed back
+	resp, err := client.Status(ctx, &pb.StatusRequest{}, m.callOpts...)
+	if err == nil {
+		return (*StatusResponse)(resp), nil
+	}
+	return nil, toErr(ctx, err)
+}
+
+func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error) {
+	client, release, dialErr := m.dial(endpoint)
+	if dialErr != nil {
+		return nil, toErr(ctx, dialErr)
+	}
+	defer release() // run the cleanup func that dial handed back
+	resp, err := client.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, m.callOpts...)
+	if err == nil {
+		return (*HashKVResponse)(resp), nil
+	}
+	return nil, toErr(ctx, err)
+}
+
+func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
+	stream, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, m.callOpts...)
+	if err != nil {
+		return nil, toErr(ctx, err)
+	}
+	// Pump stream chunks into a pipe; each Write blocks until the reader consumes it or is closed.
+	reader, writer := io.Pipe()
+	go func() {
+		for {
+			chunk, recvErr := stream.Recv()
+			if recvErr != nil {
+				writer.CloseWithError(recvErr) // later reads from the pipe return recvErr
+				return
+			}
+			if chunk == nil { // recvErr is nil here, so this is the nil/nil case: close cleanly
+				writer.Close()
+				return
+			}
+			if _, writeErr := writer.Write(chunk.Blob); writeErr != nil {
+				writer.CloseWithError(writeErr)
+				return
+			}
+		}
+	}()
+	return &snapshotReadCloser{ctx: ctx, ReadCloser: reader}, nil
+}
+
+type snapshotReadCloser struct {
+	ctx           context.Context // passed to toErr on every Read
+	io.ReadCloser                 // embedded reader; Close is promoted unchanged
+}
+
+func (rc *snapshotReadCloser) Read(p []byte) (n int, err error) {
+	count, readErr := rc.ReadCloser.Read(p)
+	return count, toErr(rc.ctx, readErr)
+}
+
+func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) {
+ resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, m.callOpts...)
+ return (*MoveLeaderResponse)(resp), toErr(ctx, err) // converted response and toErr result are returned together, with no err check in between
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/op.go b/vendor/go.etcd.io/etcd/clientv3/op.go
new file mode 100644
index 0000000..c6ec5bf
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/op.go
@@ -0,0 +1,513 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+type opType int
+
+const (
+ // A default Op has opType 0, which is invalid.
+ tRange opType = iota + 1 // set by OpGet and opWatch
+ tPut // set by OpPut
+ tDeleteRange // set by OpDelete
+ tTxn // set by OpTxn
+)
+
+var (
+ noPrefixEnd = []byte{0} // what getPrefix returns when no byte of the key is below 0xff
+)
+
+// Op represents an Operation that kv can execute.
+type Op struct {
+ t opType // zero value is invalid; set by OpGet, OpPut, OpDelete, OpTxn and opWatch
+ key []byte
+ end []byte // range end; set by WithRange, WithPrefix and WithFromKey
+
+ // for range
+ limit int64
+ sort *SortOption // nil unless WithSort was applied
+ serializable bool
+ keysOnly bool
+ countOnly bool
+ minModRev int64
+ maxModRev int64
+ minCreateRev int64
+ maxCreateRev int64
+
+ // for range, watch
+ rev int64
+
+ // for watch, put, delete
+ prevKV bool
+
+ // for put: reuse the stored value / lease instead of the supplied one
+ ignoreValue bool
+ ignoreLease bool
+
+ // progressNotify is for progress updates.
+ progressNotify bool
+ // createdNotify is for the created event.
+ createdNotify bool
+ // filters for watchers
+ filterPut bool
+ filterDelete bool
+
+ // for put: the value to store and the lease to attach
+ val []byte
+ leaseID LeaseID
+
+ // txn
+ cmps []Cmp
+ thenOps []Op
+ elseOps []Op
+}
+
+// accessors / mutators
+
+func (op Op) IsTxn() bool { return op.t == tTxn } // IsTxn reports whether the Op has opType tTxn
+func (op Op) Txn() ([]Cmp, []Op, []Op) { return op.cmps, op.thenOps, op.elseOps } // Txn returns the comparisons, then-ops and else-ops, in that order
+
+// KeyBytes returns the byte slice holding the Op's key (the slice itself, not a copy).
+func (op Op) KeyBytes() []byte { return op.key }
+
+// WithKeyBytes sets the byte slice for the Op's key.
+func (op *Op) WithKeyBytes(key []byte) { op.key = key }
+
+// RangeBytes returns the byte slice holding the Op's range end, if any.
+func (op Op) RangeBytes() []byte { return op.end }
+
+// Rev returns the requested revision, if any.
+func (op Op) Rev() int64 { return op.rev }
+
+// IsPut returns true iff the operation is a Put.
+func (op Op) IsPut() bool { return op.t == tPut }
+
+// IsGet returns true iff the operation is a Get (opType tRange).
+func (op Op) IsGet() bool { return op.t == tRange }
+
+// IsDelete returns true iff the operation is a Delete.
+func (op Op) IsDelete() bool { return op.t == tDeleteRange }
+
+// IsSerializable reports whether the serializable flag is set.
+func (op Op) IsSerializable() bool { return op.serializable }
+
+// IsKeysOnly reports whether the keysOnly flag is set.
+func (op Op) IsKeysOnly() bool { return op.keysOnly }
+
+// IsCountOnly reports whether the countOnly flag is set.
+func (op Op) IsCountOnly() bool { return op.countOnly }
+
+// MinModRev returns the operation's minimum modify revision (zero unless WithMinModRev was applied).
+func (op Op) MinModRev() int64 { return op.minModRev }
+
+// MaxModRev returns the operation's maximum modify revision (zero unless WithMaxModRev was applied).
+func (op Op) MaxModRev() int64 { return op.maxModRev }
+
+// MinCreateRev returns the operation's minimum create revision (zero unless WithMinCreateRev was applied).
+func (op Op) MinCreateRev() int64 { return op.minCreateRev }
+
+// MaxCreateRev returns the operation's maximum create revision (zero unless WithMaxCreateRev was applied).
+func (op Op) MaxCreateRev() int64 { return op.maxCreateRev }
+
+// WithRangeBytes sets the byte slice for the Op's range end.
+func (op *Op) WithRangeBytes(end []byte) { op.end = end }
+
+// ValueBytes returns the byte slice holding the Op's value, if any.
+func (op Op) ValueBytes() []byte { return op.val }
+
+// WithValueBytes sets the byte slice for the Op's value.
+func (op *Op) WithValueBytes(v []byte) { op.val = v }
+
+func (op Op) toRangeRequest() *pb.RangeRequest {
+	if op.t != tRange {
+		panic("op.t != tRange")
+	}
+	req := &pb.RangeRequest{
+		Key:               op.key,
+		RangeEnd:          op.end,
+		Revision:          op.rev,
+		Limit:             op.limit,
+		Serializable:      op.serializable,
+		KeysOnly:          op.keysOnly,
+		CountOnly:         op.countOnly,
+		MinModRevision:    op.minModRev,
+		MaxModRevision:    op.maxModRev,
+		MinCreateRevision: op.minCreateRev,
+		MaxCreateRevision: op.maxCreateRev,
+	}
+	if sortBy := op.sort; sortBy != nil { // without a sort option the sort fields keep their zero values
+		req.SortTarget = pb.RangeRequest_SortTarget(sortBy.Target)
+		req.SortOrder = pb.RangeRequest_SortOrder(sortBy.Order)
+	}
+	return req
+}
+
+func (op Op) toTxnRequest() *pb.TxnRequest {
+	compares := make([]*pb.Compare, len(op.cmps))
+	for i := range op.cmps {
+		compares[i] = (*pb.Compare)(&op.cmps[i]) // points at the element inside op.cmps; nothing is copied
+	}
+	onSuccess := make([]*pb.RequestOp, len(op.thenOps))
+	for i := range op.thenOps {
+		onSuccess[i] = op.thenOps[i].toRequestOp()
+	}
+	onFailure := make([]*pb.RequestOp, len(op.elseOps))
+	for i := range op.elseOps {
+		onFailure[i] = op.elseOps[i].toRequestOp()
+	}
+	return &pb.TxnRequest{Compare: compares, Success: onSuccess, Failure: onFailure}
+}
+
+func (op Op) toRequestOp() *pb.RequestOp {
+	wrapped := &pb.RequestOp{} // filled with the request variant that matches op.t
+	switch op.t {
+	case tRange:
+		wrapped.Request = &pb.RequestOp_RequestRange{RequestRange: op.toRangeRequest()}
+	case tPut:
+		wrapped.Request = &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}}
+	case tDeleteRange:
+		wrapped.Request = &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}}
+	case tTxn:
+		wrapped.Request = &pb.RequestOp_RequestTxn{RequestTxn: op.toTxnRequest()}
+	default:
+		panic("Unknown Op")
+	}
+	return wrapped
+}
+
+func (op Op) isWrite() bool {
+	// Outside a txn, every opType except tRange counts as a write
+	// (that includes the zero opType).
+	if op.t != tTxn {
+		return op.t != tRange
+	}
+	// A txn is a write as soon as one op on either branch is a write.
+	for _, branch := range [2][]Op{op.thenOps, op.elseOps} {
+		for i := range branch {
+			if branch[i].isWrite() {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+func OpGet(key string, opts ...OpOption) Op {
+	get := Op{t: tRange, key: []byte(key)}
+	get.applyOpts(opts) // unlike OpPut and OpDelete, no option combination is rejected here
+	return get
+}
+
+func OpDelete(key string, opts ...OpOption) Op {
+	del := Op{t: tDeleteRange, key: []byte(key)}
+	del.applyOpts(opts)
+	switch { // options below are rejected for a delete; the first one found set panics
+	case del.leaseID != 0:
+		panic("unexpected lease in delete")
+	case del.limit != 0:
+		panic("unexpected limit in delete")
+	case del.rev != 0:
+		panic("unexpected revision in delete")
+	case del.sort != nil:
+		panic("unexpected sort in delete")
+	case del.serializable:
+		panic("unexpected serializable in delete")
+	case del.countOnly:
+		panic("unexpected countOnly in delete")
+	case del.minModRev != 0, del.maxModRev != 0:
+		panic("unexpected mod revision filter in delete")
+	case del.minCreateRev != 0, del.maxCreateRev != 0:
+		panic("unexpected create revision filter in delete")
+	case del.filterDelete, del.filterPut:
+		panic("unexpected filter in delete")
+	case del.createdNotify:
+		panic("unexpected createdNotify in delete")
+	}
+	return del
+}
+
+func OpPut(key, val string, opts ...OpOption) Op {
+	put := Op{t: tPut, key: []byte(key), val: []byte(val)}
+	put.applyOpts(opts)
+	switch { // options below are rejected for a put; the first one found set panics
+	case put.end != nil:
+		panic("unexpected range in put")
+	case put.limit != 0:
+		panic("unexpected limit in put")
+	case put.rev != 0:
+		panic("unexpected revision in put")
+	case put.sort != nil:
+		panic("unexpected sort in put")
+	case put.serializable:
+		panic("unexpected serializable in put")
+	case put.countOnly:
+		panic("unexpected countOnly in put")
+	case put.minModRev != 0, put.maxModRev != 0:
+		panic("unexpected mod revision filter in put")
+	case put.minCreateRev != 0, put.maxCreateRev != 0:
+		panic("unexpected create revision filter in put")
+	case put.filterDelete, put.filterPut:
+		panic("unexpected filter in put")
+	case put.createdNotify:
+		panic("unexpected createdNotify in put")
+	}
+	return put
+}
+
+func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op {
+ return Op{t: tTxn, cmps: cmps, thenOps: thenOps, elseOps: elseOps} // the slices are stored as given, not copied
+}
+
+func opWatch(key string, opts ...OpOption) Op {
+	watch := Op{t: tRange, key: []byte(key)} // a watch is carried in an Op of type tRange
+	watch.applyOpts(opts)
+	switch {
+	case watch.leaseID != 0:
+		panic("unexpected lease in watch")
+	case watch.limit != 0:
+		panic("unexpected limit in watch")
+	case watch.sort != nil:
+		panic("unexpected sort in watch")
+	case watch.serializable:
+		panic("unexpected serializable in watch")
+	case watch.countOnly:
+		panic("unexpected countOnly in watch")
+	case watch.minModRev != 0, watch.maxModRev != 0:
+		panic("unexpected mod revision filter in watch")
+	case watch.minCreateRev != 0, watch.maxCreateRev != 0:
+		panic("unexpected create revision filter in watch")
+	}
+	return watch
+}
+
+func (op *Op) applyOpts(opts []OpOption) {
+	for i := range opts {
+		opts[i](op) // applied in slice order, so a later option can overwrite an earlier one
+	}
+}
+
+// OpOption configures Operations like Get, Put, Delete.
+type OpOption func(*Op)
+
+// WithLease attaches a lease ID to a key in a 'Put' request.
+func WithLease(leaseID LeaseID) OpOption {
+ return func(op *Op) { op.leaseID = leaseID }
+}
+
+// WithLimit limits the number of results to return from a 'Get' request.
+// If WithLimit is given a 0 limit, it is treated as no limit.
+func WithLimit(n int64) OpOption { return func(op *Op) { op.limit = n } }
+
+// WithRev specifies the store revision for a 'Get' request,
+// or the start revision of a 'Watch' request.
+func WithRev(rev int64) OpOption { return func(op *Op) { op.rev = rev } }
+
+// WithSort sets the sort target and order of a 'Get' request. It must be
+// combined with 'WithRange' and/or 'WithPrefix'.
+// 'target' is what to sort by: key, version, revisions, value.
+// 'order' is one of 'SortNone', 'SortAscend', 'SortDescend'.
+func WithSort(target SortTarget, order SortOrder) OpOption {
+	// Any order other than SortNone makes the server fetch the entire
+	// key-space before it sorts and applies the limit. Keys already come
+	// back in ascending lexicographic order by default, so an explicit
+	// key/ascending sort is downgraded to SortNone.
+	effective := order
+	if target == SortByKey && order == SortAscend {
+		effective = SortNone
+	}
+	return func(op *Op) {
+		op.sort = &SortOption{target, effective}
+	}
+}
+
+// GetPrefixRangeEnd gets the range end of the prefix.
+// 'Get(foo, WithPrefix())' is equal to 'Get(foo, WithRange(GetPrefixRangeEnd(foo)))'.
+func GetPrefixRangeEnd(prefix string) string {
+ return string(getPrefix([]byte(prefix)))
+}
+
+func getPrefix(key []byte) []byte {
+	// Skip trailing 0xff bytes; the byte before them is the one to increment.
+	last := len(key) - 1
+	for last >= 0 && key[last] == 0xff {
+		last--
+	}
+	if last < 0 {
+		// no next prefix exists (empty or all-0xff key): default to WithFromKey policy
+		return noPrefixEnd
+	}
+	end := append(make([]byte, 0, len(key)), key[:last+1]...)
+	end[last]++
+	return end
+}
+
+// WithPrefix makes 'Get', 'Delete', or 'Watch' requests operate on every
+// key that starts with the request key. For example, 'Get(foo, WithPrefix())'
+// can return 'foo1', 'foo2', and so on.
+func WithPrefix() OpOption {
+	return func(op *Op) {
+		if len(op.key) > 0 {
+			op.end = getPrefix(op.key)
+			return
+		}
+		op.key, op.end = []byte{0}, []byte{0} // empty key: key and range end both become a single zero byte
+	}
+}
+
+// WithRange specifies the range of 'Get', 'Delete', 'Watch' requests.
+// For example, a 'Get' request with 'WithRange(end)' returns
+// the keys in the range [key, end).
+// endKey must be lexicographically greater than the start key.
+func WithRange(endKey string) OpOption {
+ return func(op *Op) { op.end = []byte(endKey) }
+}
+
+// WithFromKey specifies the range of 'Get', 'Delete', 'Watch' requests
+// to be equal to or greater than the key in the argument.
+func WithFromKey() OpOption { return WithRange("\x00") }
+
+// WithSerializable makes a 'Get' request serializable. By default,
+// it's linearizable. Serializable requests are the better choice when lower
+// latency is required.
+func WithSerializable() OpOption {
+ return func(op *Op) { op.serializable = true }
+}
+
+// WithKeysOnly makes the 'Get' request return only the keys; the corresponding
+// values will be omitted.
+func WithKeysOnly() OpOption {
+ return func(op *Op) { op.keysOnly = true }
+}
+
+// WithCountOnly makes the 'Get' request return only the count of keys.
+func WithCountOnly() OpOption {
+ return func(op *Op) { op.countOnly = true }
+}
+
+// WithMinModRev filters out keys for Get with modification revisions less than the given revision.
+func WithMinModRev(rev int64) OpOption { return func(op *Op) { op.minModRev = rev } }
+
+// WithMaxModRev filters out keys for Get with modification revisions greater than the given revision.
+func WithMaxModRev(rev int64) OpOption { return func(op *Op) { op.maxModRev = rev } }
+
+// WithMinCreateRev filters out keys for Get with creation revisions less than the given revision.
+func WithMinCreateRev(rev int64) OpOption { return func(op *Op) { op.minCreateRev = rev } }
+
+// WithMaxCreateRev filters out keys for Get with creation revisions greater than the given revision.
+func WithMaxCreateRev(rev int64) OpOption { return func(op *Op) { op.maxCreateRev = rev } }
+
+// WithFirstCreate gets the key with the oldest creation revision in the request range.
+func WithFirstCreate() []OpOption { return withTop(SortByCreateRevision, SortAscend) }
+
+// WithLastCreate gets the key with the latest creation revision in the request range.
+func WithLastCreate() []OpOption { return withTop(SortByCreateRevision, SortDescend) }
+
+// WithFirstKey gets the lexically first key in the request range.
+func WithFirstKey() []OpOption { return withTop(SortByKey, SortAscend) }
+
+// WithLastKey gets the lexically last key in the request range.
+func WithLastKey() []OpOption { return withTop(SortByKey, SortDescend) }
+
+// WithFirstRev gets the key with the oldest modification revision in the request range.
+func WithFirstRev() []OpOption { return withTop(SortByModRevision, SortAscend) }
+
+// WithLastRev gets the key with the latest modification revision in the request range.
+func WithLastRev() []OpOption { return withTop(SortByModRevision, SortDescend) }
+
+// withTop combines WithPrefix, WithSort(target, order) and WithLimit(1), in that order.
+func withTop(target SortTarget, order SortOrder) []OpOption {
+ return []OpOption{WithPrefix(), WithSort(target, order), WithLimit(1)}
+}
+
+// WithProgressNotify makes the watch server send periodic progress updates
+// every 10 minutes when there are no incoming events.
+// Progress updates have zero events in WatchResponse.
+func WithProgressNotify() OpOption {
+ return func(op *Op) {
+ op.progressNotify = true
+ }
+}
+
+// WithCreatedNotify makes the watch server send the created event.
+func WithCreatedNotify() OpOption {
+ return func(op *Op) {
+ op.createdNotify = true
+ }
+}
+
+// WithFilterPut discards PUT events from the watcher.
+func WithFilterPut() OpOption {
+ return func(op *Op) { op.filterPut = true }
+}
+
+// WithFilterDelete discards DELETE events from the watcher.
+func WithFilterDelete() OpOption {
+ return func(op *Op) { op.filterDelete = true }
+}
+
+// WithPrevKV gets the previous key-value pair before the event happens. If the previous KV is already compacted,
+// nothing will be returned.
+func WithPrevKV() OpOption {
+ return func(op *Op) {
+ op.prevKV = true
+ }
+}
+
+// WithIgnoreValue updates the key using its current value.
+// This option cannot be combined with non-empty values.
+// Returns an error if the key does not exist.
+func WithIgnoreValue() OpOption {
+ return func(op *Op) {
+ op.ignoreValue = true
+ }
+}
+
+// WithIgnoreLease updates the key using its current lease.
+// This option cannot be combined with WithLease.
+// Returns an error if the key does not exist.
+func WithIgnoreLease() OpOption {
+ return func(op *Op) {
+ op.ignoreLease = true
+ }
+}
+
+// LeaseOp represents an Operation that lease can execute.
+type LeaseOp struct {
+ id LeaseID
+
+ // for TimeToLive
+ attachedKeys bool // set by WithAttachedKeys
+}
+
+// LeaseOption configures lease operations.
+type LeaseOption func(*LeaseOp)
+
+func (op *LeaseOp) applyOpts(opts []LeaseOption) {
+ for _, opt := range opts {
+ opt(op) // options run in the order they were passed
+ }
+}
+
+// WithAttachedKeys makes TimeToLive list the keys attached to the given lease ID.
+func WithAttachedKeys() LeaseOption {
+ return func(op *LeaseOp) { op.attachedKeys = true }
+}
+
+func toLeaseTimeToLiveRequest(id LeaseID, opts ...LeaseOption) *pb.LeaseTimeToLiveRequest {
+	leaseOp := LeaseOp{id: id}
+	leaseOp.applyOpts(opts) // only attachedKeys is read back from the options
+	return &pb.LeaseTimeToLiveRequest{ID: int64(id), Keys: leaseOp.attachedKeys}
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/options.go b/vendor/go.etcd.io/etcd/clientv3/options.go
new file mode 100644
index 0000000..fa25811
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/options.go
@@ -0,0 +1,49 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "math"
+
+ "google.golang.org/grpc"
+)
+
+var (
+ // Disable gRPC's internal retry logic.
+ // TODO: enable when gRPC retry is stable (FailFast=false)
+ // Reference:
+ // - https://github.com/grpc/grpc-go/issues/1532
+ // - https://github.com/grpc/proposal/blob/master/A6-client-retries.md
+ defaultFailFast = grpc.FailFast(true)
+
+ // client-side request send limit, gRPC default is math.MaxInt32
+ // Make sure that "client-side send limit < server-side default send/recv limit"
+ // Same value as "embed.DefaultMaxRequestBytes" plus gRPC overhead bytes
+ defaultMaxCallSendMsgSize = grpc.MaxCallSendMsgSize(2 * 1024 * 1024)
+
+ // client-side response receive limit, gRPC default is 4MB
+ // Make sure that "client-side receive limit >= server-side default send/recv limit"
+ // because a range response can easily exceed the request send limit
+ // Default to math.MaxInt32; writes exceeding the server-side send limit fail anyway
+ defaultMaxCallRecvMsgSize = grpc.MaxCallRecvMsgSize(math.MaxInt32)
+)
+
+// defaultCallOpts defines a list of default "gRPC.CallOption".
+// Some options are exposed to "clientv3.Config".
+// Defaults will be overridden by the settings in "clientv3.Config".
+var defaultCallOpts = []grpc.CallOption{defaultFailFast, defaultMaxCallSendMsgSize, defaultMaxCallRecvMsgSize}
+
+// MaxLeaseTTL is the maximum lease TTL value.
+const MaxLeaseTTL = 9000000000
diff --git a/vendor/go.etcd.io/etcd/clientv3/ready_wait.go b/vendor/go.etcd.io/etcd/clientv3/ready_wait.go
new file mode 100644
index 0000000..c6ef585
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/ready_wait.go
@@ -0,0 +1,30 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import "context"
+
+// TODO: remove this once "FailFast=false" is fixed.
+// See https://github.com/grpc/grpc-go/issues/1532.
+func readyWait(rpcCtx, clientCtx context.Context, ready <-chan struct{}) error {
+	select {
+	case <-clientCtx.Done():
+		return clientCtx.Err() // clientCtx finished before ready fired
+	case <-rpcCtx.Done():
+		return rpcCtx.Err() // rpcCtx finished before ready fired
+	case <-ready:
+		return nil
+	}
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/retry.go b/vendor/go.etcd.io/etcd/clientv3/retry.go
new file mode 100644
index 0000000..7f89ba6
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/retry.go
@@ -0,0 +1,496 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+type retryPolicy uint8
+
+const (
+ repeatable retryPolicy = iota
+ nonRepeatable
+)
+
+type rpcFunc func(ctx context.Context) error
+type retryRPCFunc func(context.Context, rpcFunc, retryPolicy) error
+type retryStopErrFunc func(error) bool
+
+// immutable requests (e.g. Get) should be retried unless it's
+// an obvious server-side error (e.g. rpctypes.ErrRequestTooLarge).
+//
+// "isRepeatableStopError" returns "true" when an immutable request
+// is interrupted by server-side or gRPC-side error and its status
+// code is not transient (!= codes.Unavailable).
+//
+// Returning "true" means retry should stop, since client cannot
+// handle itself even with retries.
+func isRepeatableStopError(err error) bool {
+ eErr := rpctypes.Error(err)
+ // stop retry on etcd errors, except those whose code is codes.Unavailable
+ if serverErr, ok := eErr.(rpctypes.EtcdError); ok && serverErr.Code() != codes.Unavailable {
+ return true
+ }
+ // otherwise keep retrying only if the gRPC status code is codes.Unavailable
+ ev, _ := status.FromError(err)
+ return ev.Code() != codes.Unavailable
+}
+
+// mutable requests (e.g. Put, Delete, Txn) should only be retried
+// when the status code is codes.Unavailable when initial connection
+// has not been established (no pinned endpoint).
+//
+// "isNonRepeatableStopError" returns "true" when a mutable request
+// is interrupted by non-transient error that client cannot handle itself,
+// or transient error while the connection has already been established
+// (pinned endpoint exists).
+//
+// Returning "true" means retry should stop, otherwise it violates
+// write-at-most-once semantics.
+func isNonRepeatableStopError(err error) bool {
+ ev, _ := status.FromError(err)
+ if ev.Code() != codes.Unavailable {
+ return true
+ }
+ desc := rpctypes.ErrorDesc(err)
+ return desc != "there is no address available" && desc != "there is no connection available"
+}
+
+func (c *Client) newRetryWrapper() retryRPCFunc {
+ return func(rpcCtx context.Context, f rpcFunc, rp retryPolicy) error {
+ var isStop retryStopErrFunc
+ switch rp {
+ case repeatable:
+ isStop = isRepeatableStopError
+ case nonRepeatable:
+ isStop = isNonRepeatableStopError
+ }
+ for {
+ if err := readyWait(rpcCtx, c.ctx, c.balancer.ConnectNotify()); err != nil {
+ return err
+ }
+ pinned := c.balancer.pinned()
+ err := f(rpcCtx)
+ if err == nil {
+ return nil
+ }
+ logger.Lvl(4).Infof("clientv3/retry: error %q on pinned endpoint %q", err.Error(), pinned)
+
+ if s, ok := status.FromError(err); ok && (s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded || s.Code() == codes.Internal) {
+ // mark this before endpoint switch is triggered
+ c.balancer.hostPortError(pinned, err)
+ c.balancer.next()
+ logger.Lvl(4).Infof("clientv3/retry: switching from %q due to error %q", pinned, err.Error())
+ }
+
+ if isStop(err) {
+ return err
+ }
+ }
+ }
+}
+
+func (c *Client) newAuthRetryWrapper(retryf retryRPCFunc) retryRPCFunc {
+ return func(rpcCtx context.Context, f rpcFunc, rp retryPolicy) error {
+ for {
+ pinned := c.balancer.pinned()
+ err := retryf(rpcCtx, f, rp)
+ if err == nil {
+ return nil
+ }
+ logger.Lvl(4).Infof("clientv3/auth-retry: error %q on pinned endpoint %q", err.Error(), pinned)
+ // always stop retry on etcd errors other than invalid auth token
+ if rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
+ gterr := c.getToken(rpcCtx)
+ if gterr != nil {
+ logger.Lvl(4).Infof("clientv3/auth-retry: cannot retry due to error %q(%q) on pinned endpoint %q", err.Error(), gterr.Error(), pinned)
+ return err // return the original error for simplicity
+ }
+ continue
+ }
+ return err
+ }
+ }
+}
+
+type retryKVClient struct {
+ kc pb.KVClient
+ retryf retryRPCFunc
+}
+
+// RetryKVClient implements a KVClient.
+func RetryKVClient(c *Client) pb.KVClient {
+ return &retryKVClient{
+ kc: pb.NewKVClient(c.conn),
+ retryf: c.newAuthRetryWrapper(c.newRetryWrapper()),
+ }
+}
+func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) {
+ err = rkv.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rkv.kc.Range(rctx, in, opts...)
+ return err
+ }, repeatable)
+ return resp, err
+}
+
+func (rkv *retryKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) {
+ err = rkv.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rkv.kc.Put(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+func (rkv *retryKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) {
+ err = rkv.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rkv.kc.DeleteRange(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+func (rkv *retryKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) {
+ // TODO: "repeatable" for read-only txn
+ err = rkv.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rkv.kc.Txn(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+func (rkv *retryKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) {
+ err = rkv.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rkv.kc.Compact(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+type retryLeaseClient struct {
+ lc pb.LeaseClient
+ retryf retryRPCFunc
+}
+
+// RetryLeaseClient implements a LeaseClient.
+func RetryLeaseClient(c *Client) pb.LeaseClient {
+ return &retryLeaseClient{
+ lc: pb.NewLeaseClient(c.conn),
+ retryf: c.newAuthRetryWrapper(c.newRetryWrapper()),
+ }
+}
+
+func (rlc *retryLeaseClient) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (resp *pb.LeaseTimeToLiveResponse, err error) {
+ err = rlc.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rlc.lc.LeaseTimeToLive(rctx, in, opts...)
+ return err
+ }, repeatable)
+ return resp, err
+}
+
+func (rlc *retryLeaseClient) LeaseLeases(ctx context.Context, in *pb.LeaseLeasesRequest, opts ...grpc.CallOption) (resp *pb.LeaseLeasesResponse, err error) {
+ err = rlc.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rlc.lc.LeaseLeases(rctx, in, opts...)
+ return err
+ }, repeatable)
+ return resp, err
+}
+
+func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) {
+ err = rlc.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rlc.lc.LeaseGrant(rctx, in, opts...)
+ return err
+ }, repeatable)
+ return resp, err
+
+}
+
+func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (resp *pb.LeaseRevokeResponse, err error) {
+ err = rlc.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rlc.lc.LeaseRevoke(rctx, in, opts...)
+ return err
+ }, repeatable)
+ return resp, err
+}
+
+func (rlc *retryLeaseClient) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (stream pb.Lease_LeaseKeepAliveClient, err error) {
+ err = rlc.retryf(ctx, func(rctx context.Context) error {
+ stream, err = rlc.lc.LeaseKeepAlive(rctx, opts...)
+ return err
+ }, repeatable)
+ return stream, err
+}
+
+type retryClusterClient struct {
+ cc pb.ClusterClient
+ retryf retryRPCFunc
+}
+
+// RetryClusterClient implements a ClusterClient.
+func RetryClusterClient(c *Client) pb.ClusterClient {
+ return &retryClusterClient{
+ cc: pb.NewClusterClient(c.conn),
+ retryf: c.newRetryWrapper(),
+ }
+}
+
+func (rcc *retryClusterClient) MemberList(ctx context.Context, in *pb.MemberListRequest, opts ...grpc.CallOption) (resp *pb.MemberListResponse, err error) {
+ err = rcc.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rcc.cc.MemberList(rctx, in, opts...)
+ return err
+ }, repeatable)
+ return resp, err
+}
+
+func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) {
+ err = rcc.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rcc.cc.MemberAdd(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) {
+ err = rcc.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rcc.cc.MemberRemove(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+func (rcc *retryClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) {
+ err = rcc.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rcc.cc.MemberUpdate(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+type retryMaintenanceClient struct {
+ mc pb.MaintenanceClient
+ retryf retryRPCFunc
+}
+
+// RetryMaintenanceClient implements a MaintenanceClient.
+func RetryMaintenanceClient(c *Client, conn *grpc.ClientConn) pb.MaintenanceClient {
+ return &retryMaintenanceClient{
+ mc: pb.NewMaintenanceClient(conn),
+ retryf: c.newRetryWrapper(),
+ }
+}
+
+func (rmc *retryMaintenanceClient) Alarm(ctx context.Context, in *pb.AlarmRequest, opts ...grpc.CallOption) (resp *pb.AlarmResponse, err error) {
+ err = rmc.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rmc.mc.Alarm(rctx, in, opts...)
+ return err
+ }, repeatable)
+ return resp, err
+}
+
+func (rmc *retryMaintenanceClient) Status(ctx context.Context, in *pb.StatusRequest, opts ...grpc.CallOption) (resp *pb.StatusResponse, err error) {
+ err = rmc.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rmc.mc.Status(rctx, in, opts...)
+ return err
+ }, repeatable)
+ return resp, err
+}
+
+func (rmc *retryMaintenanceClient) Hash(ctx context.Context, in *pb.HashRequest, opts ...grpc.CallOption) (resp *pb.HashResponse, err error) {
+ err = rmc.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rmc.mc.Hash(rctx, in, opts...)
+ return err
+ }, repeatable)
+ return resp, err
+}
+
+func (rmc *retryMaintenanceClient) HashKV(ctx context.Context, in *pb.HashKVRequest, opts ...grpc.CallOption) (resp *pb.HashKVResponse, err error) {
+ err = rmc.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rmc.mc.HashKV(rctx, in, opts...)
+ return err
+ }, repeatable)
+ return resp, err
+}
+
+func (rmc *retryMaintenanceClient) Snapshot(ctx context.Context, in *pb.SnapshotRequest, opts ...grpc.CallOption) (stream pb.Maintenance_SnapshotClient, err error) {
+ err = rmc.retryf(ctx, func(rctx context.Context) error {
+ stream, err = rmc.mc.Snapshot(rctx, in, opts...)
+ return err
+ }, repeatable)
+ return stream, err
+}
+
+func (rmc *retryMaintenanceClient) MoveLeader(ctx context.Context, in *pb.MoveLeaderRequest, opts ...grpc.CallOption) (resp *pb.MoveLeaderResponse, err error) {
+ err = rmc.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rmc.mc.MoveLeader(rctx, in, opts...)
+ return err
+ }, repeatable)
+ return resp, err
+}
+
+func (rmc *retryMaintenanceClient) Defragment(ctx context.Context, in *pb.DefragmentRequest, opts ...grpc.CallOption) (resp *pb.DefragmentResponse, err error) {
+ err = rmc.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rmc.mc.Defragment(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+type retryAuthClient struct {
+ ac pb.AuthClient
+ retryf retryRPCFunc
+}
+
+// RetryAuthClient implements an AuthClient.
+func RetryAuthClient(c *Client) pb.AuthClient {
+ return &retryAuthClient{
+ ac: pb.NewAuthClient(c.conn),
+ retryf: c.newRetryWrapper(),
+ }
+}
+
+func (rac *retryAuthClient) UserList(ctx context.Context, in *pb.AuthUserListRequest, opts ...grpc.CallOption) (resp *pb.AuthUserListResponse, err error) {
+ err = rac.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rac.ac.UserList(rctx, in, opts...)
+ return err
+ }, repeatable)
+ return resp, err
+}
+
+func (rac *retryAuthClient) UserGet(ctx context.Context, in *pb.AuthUserGetRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGetResponse, err error) {
+ err = rac.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rac.ac.UserGet(rctx, in, opts...)
+ return err
+ }, repeatable)
+ return resp, err
+}
+
+func (rac *retryAuthClient) RoleGet(ctx context.Context, in *pb.AuthRoleGetRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGetResponse, err error) {
+ err = rac.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rac.ac.RoleGet(rctx, in, opts...)
+ return err
+ }, repeatable)
+ return resp, err
+}
+
+func (rac *retryAuthClient) RoleList(ctx context.Context, in *pb.AuthRoleListRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleListResponse, err error) {
+ err = rac.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rac.ac.RoleList(rctx, in, opts...)
+ return err
+ }, repeatable)
+ return resp, err
+}
+
+func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) {
+ err = rac.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rac.ac.AuthEnable(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+func (rac *retryAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableRequest, opts ...grpc.CallOption) (resp *pb.AuthDisableResponse, err error) {
+ err = rac.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rac.ac.AuthDisable(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+func (rac *retryAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddRequest, opts ...grpc.CallOption) (resp *pb.AuthUserAddResponse, err error) {
+ err = rac.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rac.ac.UserAdd(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+func (rac *retryAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthUserDeleteResponse, err error) {
+ err = rac.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rac.ac.UserDelete(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+func (rac *retryAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthUserChangePasswordRequest, opts ...grpc.CallOption) (resp *pb.AuthUserChangePasswordResponse, err error) {
+ err = rac.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rac.ac.UserChangePassword(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+func (rac *retryAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGrantRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGrantRoleResponse, err error) {
+ err = rac.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rac.ac.UserGrantRole(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+func (rac *retryAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserRevokeRoleResponse, err error) {
+ err = rac.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rac.ac.UserRevokeRole(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+func (rac *retryAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleAddResponse, err error) {
+ err = rac.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rac.ac.RoleAdd(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+func (rac *retryAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleDeleteResponse, err error) {
+ err = rac.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rac.ac.RoleDelete(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+func (rac *retryAuthClient) RoleGrantPermission(ctx context.Context, in *pb.AuthRoleGrantPermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGrantPermissionResponse, err error) {
+ err = rac.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rac.ac.RoleGrantPermission(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+func (rac *retryAuthClient) RoleRevokePermission(ctx context.Context, in *pb.AuthRoleRevokePermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleRevokePermissionResponse, err error) {
+ err = rac.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rac.ac.RoleRevokePermission(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
+
+func (rac *retryAuthClient) Authenticate(ctx context.Context, in *pb.AuthenticateRequest, opts ...grpc.CallOption) (resp *pb.AuthenticateResponse, err error) {
+ err = rac.retryf(ctx, func(rctx context.Context) error {
+ resp, err = rac.ac.Authenticate(rctx, in, opts...)
+ return err
+ }, nonRepeatable)
+ return resp, err
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/sort.go b/vendor/go.etcd.io/etcd/clientv3/sort.go
new file mode 100644
index 0000000..2bb9d9a
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/sort.go
@@ -0,0 +1,37 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+type SortTarget int
+type SortOrder int
+
+const (
+ SortNone SortOrder = iota
+ SortAscend
+ SortDescend
+)
+
+const (
+ SortByKey SortTarget = iota
+ SortByVersion
+ SortByCreateRevision
+ SortByModRevision
+ SortByValue
+)
+
+type SortOption struct {
+ Target SortTarget
+ Order SortOrder
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/txn.go b/vendor/go.etcd.io/etcd/clientv3/txn.go
new file mode 100644
index 0000000..c3c2d24
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/txn.go
@@ -0,0 +1,151 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+ "sync"
+
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+)
+
+// Txn is the interface that wraps mini-transactions.
+//
+// Txn(context.TODO()).If(
+// Compare(Value(k1), ">", v1),
+// Compare(Version(k1), "=", 2)
+// ).Then(
+// OpPut(k2,v2), OpPut(k3,v3)
+// ).Else(
+// OpPut(k4,v4), OpPut(k5,v5)
+// ).Commit()
+//
+type Txn interface {
+ // If takes a list of comparisons. If all comparisons passed in succeed,
+ // the operations passed into Then() will be executed. Otherwise the operations
+ // passed into Else() will be executed.
+ If(cs ...Cmp) Txn
+
+ // Then takes a list of operations. The Ops list will be executed, if the
+ // comparisons passed in If() succeed.
+ Then(ops ...Op) Txn
+
+ // Else takes a list of operations. The Ops list will be executed, if the
+ // comparisons passed in If() fail.
+ Else(ops ...Op) Txn
+
+ // Commit tries to commit the transaction.
+ Commit() (*TxnResponse, error)
+}
+
+type txn struct {
+ kv *kv
+ ctx context.Context
+
+ mu sync.Mutex
+ cif bool
+ cthen bool
+ celse bool
+
+ isWrite bool
+
+ cmps []*pb.Compare
+
+ sus []*pb.RequestOp
+ fas []*pb.RequestOp
+
+ callOpts []grpc.CallOption
+}
+
+func (txn *txn) If(cs ...Cmp) Txn {
+ txn.mu.Lock()
+ defer txn.mu.Unlock()
+
+ if txn.cif {
+ panic("cannot call If twice!")
+ }
+
+ if txn.cthen {
+ panic("cannot call If after Then!")
+ }
+
+ if txn.celse {
+ panic("cannot call If after Else!")
+ }
+
+ txn.cif = true
+
+ for i := range cs {
+ txn.cmps = append(txn.cmps, (*pb.Compare)(&cs[i]))
+ }
+
+ return txn
+}
+
+func (txn *txn) Then(ops ...Op) Txn {
+ txn.mu.Lock()
+ defer txn.mu.Unlock()
+
+ if txn.cthen {
+ panic("cannot call Then twice!")
+ }
+ if txn.celse {
+ panic("cannot call Then after Else!")
+ }
+
+ txn.cthen = true
+
+ for _, op := range ops {
+ txn.isWrite = txn.isWrite || op.isWrite()
+ txn.sus = append(txn.sus, op.toRequestOp())
+ }
+
+ return txn
+}
+
+func (txn *txn) Else(ops ...Op) Txn {
+ txn.mu.Lock()
+ defer txn.mu.Unlock()
+
+ if txn.celse {
+ panic("cannot call Else twice!")
+ }
+
+ txn.celse = true
+
+ for _, op := range ops {
+ txn.isWrite = txn.isWrite || op.isWrite()
+ txn.fas = append(txn.fas, op.toRequestOp())
+ }
+
+ return txn
+}
+
+func (txn *txn) Commit() (*TxnResponse, error) {
+ txn.mu.Lock()
+ defer txn.mu.Unlock()
+
+ r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
+
+ var resp *pb.TxnResponse
+ var err error
+ resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...)
+ if err != nil {
+ return nil, toErr(txn.ctx, err)
+ }
+ return (*TxnResponse)(resp), nil
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/watch.go b/vendor/go.etcd.io/etcd/clientv3/watch.go
new file mode 100644
index 0000000..d763385
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/watch.go
@@ -0,0 +1,828 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ mvccpb "github.com/coreos/etcd/mvcc/mvccpb"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
+)
+
+const (
+ EventTypeDelete = mvccpb.DELETE
+ EventTypePut = mvccpb.PUT
+
+ closeSendErrTimeout = 250 * time.Millisecond
+)
+
+type Event mvccpb.Event
+
+type WatchChan <-chan WatchResponse
+
+type Watcher interface {
+ // Watch watches on a key or prefix. The watched events will be returned
+ // through the returned channel. If revisions waiting to be sent over the
+ // watch are compacted, then the watch will be canceled by the server, the
+ // client will post a compacted error watch response, and the channel will close.
+ Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
+
+ // Close closes the watcher and cancels all watch requests.
+ Close() error
+}
+
+type WatchResponse struct {
+ Header pb.ResponseHeader
+ Events []*Event
+
+ // CompactRevision is the minimum revision the watcher may receive.
+ CompactRevision int64
+
+ // Canceled is used to indicate watch failure.
+ // If the watch failed and the stream was about to close, before the channel is closed,
+ // the channel sends a final response that has Canceled set to true with a non-nil Err().
+ Canceled bool
+
+ // Created is used to indicate the creation of the watcher.
+ Created bool
+
+ closeErr error
+
+ // cancelReason is the reason the watch was canceled
+ cancelReason string
+}
+
+// IsCreate returns true if the event tells that the key is newly created.
+func (e *Event) IsCreate() bool {
+ return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
+}
+
+// IsModify returns true if the event tells that a new value is put on existing key.
+func (e *Event) IsModify() bool {
+ return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
+}
+
+// Err is the error value if this WatchResponse holds an error.
+func (wr *WatchResponse) Err() error {
+ switch {
+ case wr.closeErr != nil:
+ return v3rpc.Error(wr.closeErr)
+ case wr.CompactRevision != 0:
+ return v3rpc.ErrCompacted
+ case wr.Canceled:
+ if len(wr.cancelReason) != 0 {
+ return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason))
+ }
+ return v3rpc.ErrFutureRev
+ }
+ return nil
+}
+
+// IsProgressNotify returns true if the WatchResponse is progress notification.
+func (wr *WatchResponse) IsProgressNotify() bool {
+ return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0
+}
+
+// watcher implements the Watcher interface
+type watcher struct {
+ remote pb.WatchClient
+ callOpts []grpc.CallOption
+
+ // mu protects the grpc streams map
+ mu sync.RWMutex
+
+ // streams holds all the active grpc streams keyed by ctx value.
+ streams map[string]*watchGrpcStream
+}
+
+// watchGrpcStream tracks all watch resources attached to a single grpc stream.
+type watchGrpcStream struct {
+ owner *watcher
+ remote pb.WatchClient
+ callOpts []grpc.CallOption
+
+ // ctx controls internal remote.Watch requests
+ ctx context.Context
+ // ctxKey is the key used when looking up this stream's context
+ ctxKey string
+ cancel context.CancelFunc
+
+ // substreams holds all active watchers on this grpc stream
+ substreams map[int64]*watcherStream
+ // resuming holds all resuming watchers on this grpc stream
+ resuming []*watcherStream
+
+ // reqc sends a watch request from Watch() to the main goroutine
+ reqc chan *watchRequest
+ // respc receives data from the watch client
+ respc chan *pb.WatchResponse
+ // donec closes to broadcast shutdown
+ donec chan struct{}
+ // errc transmits errors from grpc Recv to the watch stream reconnect logic
+ errc chan error
+ // closingc gets the watcherStream of closing watchers
+ closingc chan *watcherStream
+ // wg is Done when all substream goroutines have exited
+ wg sync.WaitGroup
+
+ // resumec closes to signal that all substreams should begin resuming
+ resumec chan struct{}
+ // closeErr is the error that closed the watch stream
+ closeErr error
+}
+
+// watchRequest is issued by the subscriber to start a new watcher
+type watchRequest struct {
+ ctx context.Context
+ key string
+ end string
+ rev int64
+ // send created notification event if this field is true
+ createdNotify bool
+ // progressNotify is for progress updates
+ progressNotify bool
+ // filters is the list of events to filter out
+ filters []pb.WatchCreateRequest_FilterType
+ // get the previous key-value pair before the event happens
+ prevKV bool
+ // retc receives a chan WatchResponse once the watcher is established
+ retc chan chan WatchResponse
+}
+
+// watcherStream represents a registered watcher
+type watcherStream struct {
+ // initReq is the request that initiated this watcher
+ initReq watchRequest
+
+ // outc publishes watch responses to subscriber
+ outc chan WatchResponse
+ // recvc buffers watch responses before publishing
+ recvc chan *WatchResponse
+ // donec closes when the watcherStream goroutine stops.
+ donec chan struct{}
+ // closing is set to true when stream should be scheduled to shutdown.
+ closing bool
+ // id is the registered watch id on the grpc stream
+ id int64
+
+ // buf holds all events received from etcd but not yet consumed by the client
+ buf []*WatchResponse
+}
+
+func NewWatcher(c *Client) Watcher {
+ return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
+}
+
+func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
+ w := &watcher{
+ remote: wc,
+ streams: make(map[string]*watchGrpcStream),
+ }
+ if c != nil {
+ w.callOpts = c.callOpts
+ }
+ return w
+}
+
+// never closes
+var valCtxCh = make(chan struct{})
+var zeroTime = time.Unix(0, 0)
+
+// valCtx wraps a Context keeping only its values: it reports no deadline, a nil Err, and is never Done.
+type valCtx struct{ context.Context }
+
+func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
+func (vc *valCtx) Done() <-chan struct{} { return valCtxCh }
+func (vc *valCtx) Err() error { return nil }
+
+func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
+ ctx, cancel := context.WithCancel(&valCtx{inctx})
+ wgs := &watchGrpcStream{
+ owner: w,
+ remote: w.remote,
+ callOpts: w.callOpts,
+ ctx: ctx,
+ ctxKey: streamKeyFromCtx(inctx),
+ cancel: cancel,
+ substreams: make(map[int64]*watcherStream),
+ respc: make(chan *pb.WatchResponse),
+ reqc: make(chan *watchRequest),
+ donec: make(chan struct{}),
+ errc: make(chan error, 1),
+ closingc: make(chan *watcherStream),
+ resumec: make(chan struct{}),
+ }
+ go wgs.run()
+ return wgs
+}
+
+// Watch posts a watch request to run() and waits for a new watcher channel
+func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
+ ow := opWatch(key, opts...)
+
+ var filters []pb.WatchCreateRequest_FilterType
+ if ow.filterPut {
+ filters = append(filters, pb.WatchCreateRequest_NOPUT)
+ }
+ if ow.filterDelete {
+ filters = append(filters, pb.WatchCreateRequest_NODELETE)
+ }
+
+ wr := &watchRequest{
+ ctx: ctx,
+ createdNotify: ow.createdNotify,
+ key: string(ow.key),
+ end: string(ow.end),
+ rev: ow.rev,
+ progressNotify: ow.progressNotify,
+ filters: filters,
+ prevKV: ow.prevKV,
+ retc: make(chan chan WatchResponse, 1),
+ }
+
+ ok := false
+ ctxKey := streamKeyFromCtx(ctx)
+
+ // find or allocate appropriate grpc watch stream
+ w.mu.Lock()
+ if w.streams == nil {
+ // closed
+ w.mu.Unlock()
+ ch := make(chan WatchResponse)
+ close(ch)
+ return ch
+ }
+ wgs := w.streams[ctxKey]
+ if wgs == nil {
+ wgs = w.newWatcherGrpcStream(ctx)
+ w.streams[ctxKey] = wgs
+ }
+ donec := wgs.donec
+ reqc := wgs.reqc
+ w.mu.Unlock()
+
+ // couldn't create channel; return closed channel
+ closeCh := make(chan WatchResponse, 1)
+
+ // submit request
+ select {
+ case reqc <- wr:
+ ok = true
+ case <-wr.ctx.Done():
+ case <-donec:
+ if wgs.closeErr != nil {
+ closeCh <- WatchResponse{closeErr: wgs.closeErr}
+ break
+ }
+ // retry; may have dropped stream from no ctxs
+ return w.Watch(ctx, key, opts...)
+ }
+
+ // receive channel
+ if ok {
+ select {
+ case ret := <-wr.retc:
+ return ret
+ case <-ctx.Done():
+ case <-donec:
+ if wgs.closeErr != nil {
+ closeCh <- WatchResponse{closeErr: wgs.closeErr}
+ break
+ }
+ // retry; may have dropped stream from no ctxs
+ return w.Watch(ctx, key, opts...)
+ }
+ }
+
+ close(closeCh)
+ return closeCh
+}
+// Close marks the watcher closed by detaching its stream map, then closes every detached gRPC stream and returns the last non-nil error encountered while doing so.
+func (w *watcher) Close() (err error) {
+ w.mu.Lock()
+ streams := w.streams
+ w.streams = nil // Watch treats a nil map as "closed"
+ w.mu.Unlock()
+ for _, wgs := range streams {
+ if werr := wgs.close(); werr != nil {
+ err = werr
+ }
+ }
+ return err
+}
+// close calls the stream's cancel func, waits for donec to be closed, and returns whatever error is pending on errc (if any) after conversion by toErr.
+func (w *watchGrpcStream) close() (err error) {
+ w.cancel()
+ <-w.donec
+ select {
+ case err = <-w.errc:
+ default: // nothing pending; err stays nil
+ }
+ return toErr(w.ctx, err)
+}
+// closeStream closes wgs.donec, cancels wgs and, unless the watcher has already been closed, removes wgs from the stream map; all of it happens under w.mu.
+func (w *watcher) closeStream(wgs *watchGrpcStream) {
+ w.mu.Lock()
+ close(wgs.donec)
+ wgs.cancel()
+ if w.streams != nil { // nil once Close has run
+ delete(w.streams, wgs.ctxKey)
+ }
+ w.mu.Unlock()
+}
+// addSubstream registers ws in w.substreams under the watch ID carried by resp; a WatchId of -1 is treated as a failed creation and ws.recvc is closed instead.
+func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
+ if resp.WatchId == -1 {
+ // failed; no channel
+ close(ws.recvc)
+ return
+ }
+ ws.id = resp.WatchId
+ w.substreams[ws.id] = ws
+}
+// sendCloseSubstream tries to deliver resp on ws.outc, giving up when the watch's context is done or closeSendErrTimeout elapses, and then closes ws.outc in every case.
+func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
+ select {
+ case ws.outc <- *resp:
+ case <-ws.initReq.ctx.Done(): // subscriber gave up; skip the send
+ case <-time.After(closeSendErrTimeout): // nobody read the response in time
+ }
+ close(ws.outc)
+}
+// closeSubstream retires ws: it offers ws.outc to a Watch caller still waiting on retc, closes ws.outc (after an error response when the stream has a closeErr), and removes ws from substreams or the resuming queue.
+func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
+ // send channel response in case stream was never established
+ select {
+ case ws.initReq.retc <- ws.outc:
+ default: // retc is nil (serveSubstream already replied) or its buffer is full
+ }
+ // close subscriber's channel
+ if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
+ go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
+ } else if ws.outc != nil {
+ close(ws.outc)
+ }
+ if ws.id != -1 { // ws was registered by addSubstream, so it lives in substreams
+ delete(w.substreams, ws.id)
+ return
+ }
+ for i := range w.resuming {
+ if w.resuming[i] == ws {
+ w.resuming[i] = nil // leave a nil placeholder; nextResume and newWatchClient skip nils
+ return
+ }
+ }
+}
+
+// run is the root of the goroutines for managing a watcher client: it opens the gRPC watch client, then loops handling new Watch requests, server responses, client errors, context cancellation and substream shutdown until the stream must be torn down.
+func (w *watchGrpcStream) run() {
+ var wc pb.Watch_WatchClient
+ var closeErr error
+
+ // substreams marked to close but goroutine still running; needed for
+ // avoiding double-closing recvc on grpc stream teardown
+ closing := make(map[*watcherStream]struct{})
+
+ defer func() {
+ w.closeErr = closeErr // publish the halt error (nil on a clean shutdown); read by Watch and closeSubstream
+ // shutdown substreams and resuming substreams
+ for _, ws := range w.substreams {
+ if _, ok := closing[ws]; !ok {
+ close(ws.recvc)
+ closing[ws] = struct{}{}
+ }
+ }
+ for _, ws := range w.resuming {
+ if _, ok := closing[ws]; ws != nil && !ok {
+ close(ws.recvc)
+ closing[ws] = struct{}{}
+ }
+ }
+ w.joinSubstreams()
+ for range closing { // one closingc message is expected per entry in closing
+ w.closeSubstream(<-w.closingc)
+ }
+ w.wg.Wait()
+ w.owner.closeStream(w) // closes donec and unregisters this stream from the watcher
+ }()
+
+ // start a stream with the etcd grpc server
+ if wc, closeErr = w.newWatchClient(); closeErr != nil {
+ return
+ }
+
+ cancelSet := make(map[int64]struct{}) // watch ids for which a cancel request has already been sent
+
+ for {
+ select {
+ // Watch() requested
+ case wreq := <-w.reqc:
+ outc := make(chan WatchResponse, 1)
+ ws := &watcherStream{
+ initReq: *wreq,
+ id: -1, // no server-assigned watch id yet
+ outc: outc,
+ // unbuffered so resumes won't cause repeat events
+ recvc: make(chan *WatchResponse),
+ }
+
+ ws.donec = make(chan struct{})
+ w.wg.Add(1)
+ go w.serveSubstream(ws, w.resumec)
+
+ // queue up for watcher creation/resume
+ w.resuming = append(w.resuming, ws)
+ if len(w.resuming) == 1 {
+ // head of resume queue, can register a new watcher
+ wc.Send(ws.initReq.toPB()) // NOTE(review): any value returned by Send is ignored here
+ }
+ // New events from the watch client
+ case pbresp := <-w.respc:
+ switch {
+ case pbresp.Created:
+ // response to head of queue creation
+ if ws := w.resuming[0]; ws != nil { // NOTE(review): no length check before indexing; confirm Created only arrives while the queue is non-empty
+ w.addSubstream(pbresp, ws)
+ w.dispatchEvent(pbresp)
+ w.resuming[0] = nil
+ }
+ if ws := w.nextResume(); ws != nil {
+ wc.Send(ws.initReq.toPB())
+ }
+ case pbresp.Canceled && pbresp.CompactRevision == 0:
+ delete(cancelSet, pbresp.WatchId)
+ if ws, ok := w.substreams[pbresp.WatchId]; ok {
+ // signal to stream goroutine to update closingc
+ close(ws.recvc)
+ closing[ws] = struct{}{}
+ }
+ default:
+ // dispatch to appropriate watch stream
+ if ok := w.dispatchEvent(pbresp); ok {
+ break
+ }
+ // watch response on unexpected watch id; cancel id
+ if _, ok := cancelSet[pbresp.WatchId]; ok {
+ break
+ }
+ cancelSet[pbresp.WatchId] = struct{}{}
+ cr := &pb.WatchRequest_CancelRequest{
+ CancelRequest: &pb.WatchCancelRequest{
+ WatchId: pbresp.WatchId,
+ },
+ }
+ req := &pb.WatchRequest{RequestUnion: cr}
+ wc.Send(req)
+ }
+ // watch client failed on Recv; spawn another if possible
+ case err := <-w.errc:
+ if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
+ closeErr = err
+ return
+ }
+ if wc, closeErr = w.newWatchClient(); closeErr != nil {
+ return
+ }
+ if ws := w.nextResume(); ws != nil {
+ wc.Send(ws.initReq.toPB())
+ }
+ cancelSet = make(map[int64]struct{}) // start with an empty set for the new watch client
+ case <-w.ctx.Done():
+ return
+ case ws := <-w.closingc:
+ w.closeSubstream(ws)
+ delete(closing, ws)
+ if len(w.substreams)+len(w.resuming) == 0 {
+ // no more watchers on this stream, shutdown
+ return
+ }
+ }
+ }
+}
+
+// nextResume returns the first non-nil substream in the resuming queue, dropping any leading nil entries on the way,
+// or nil when the queue is empty. Abandoned substreams are left as nil placeholders (see closeSubstream) so the head keeps its slot while its registration is in flight.
+func (w *watchGrpcStream) nextResume() *watcherStream {
+	for len(w.resuming) != 0 {
+		if head := w.resuming[0]; head != nil {
+			return head
+		}
+		w.resuming = w.resuming[1:] // discard the abandoned (nil) head
+	}
+	return nil
+}
+
+// dispatchEvent converts pbresp into a WatchResponse and hands it to the substream registered under pbresp.WatchId; it returns false if no such substream exists or its goroutine has already exited.
+func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
+ events := make([]*Event, len(pbresp.Events))
+ for i, ev := range pbresp.Events {
+ events[i] = (*Event)(ev)
+ }
+ wr := &WatchResponse{
+ Header: *pbresp.Header, // NOTE(review): dereferenced unconditionally; confirm the server always sets Header
+ Events: events,
+ CompactRevision: pbresp.CompactRevision,
+ Created: pbresp.Created,
+ Canceled: pbresp.Canceled,
+ cancelReason: pbresp.CancelReason,
+ }
+ ws, ok := w.substreams[pbresp.WatchId]
+ if !ok {
+ return false
+ }
+ select {
+ case ws.recvc <- wr:
+ case <-ws.donec: // substream goroutine already exited; report as undelivered
+ return false
+ }
+ return true
+}
+
+// serveWatchClient forwards messages from the grpc stream to run(): each response goes to respc, and the first Recv error goes to errc before it returns; it also stops as soon as donec is closed.
+func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
+ for {
+ resp, err := wc.Recv()
+ if err != nil {
+ select {
+ case w.errc <- err:
+ case <-w.donec: // stream already shut down; drop the error
+ }
+ return
+ }
+ select {
+ case w.respc <- resp:
+ case <-w.donec: // stream already shut down; drop the response
+ return
+ }
+ }
+}
+
+// serveSubstream forwards watch responses from run() to the subscriber: it buffers what arrives on ws.recvc and drains the buffer into ws.outc, exiting when recvc is closed, a context is done, an error response has been delivered, or resumec signals a resume.
+func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
+ if ws.closing {
+ panic("created substream goroutine but substream is closing")
+ }
+
+ // nextRev is the minimum expected next revision
+ nextRev := ws.initReq.rev
+ resuming := false // true only when exiting because of a resume signal on resumec
+ defer func() {
+ if !resuming {
+ ws.closing = true
+ }
+ close(ws.donec)
+ if !resuming {
+ w.closingc <- ws // hand ws to run(), which calls closeSubstream on it
+ }
+ w.wg.Done()
+ }()
+
+ emptyWr := &WatchResponse{}
+ for {
+ curWr := emptyWr
+ outc := ws.outc
+
+ if len(ws.buf) > 0 {
+ curWr = ws.buf[0]
+ } else {
+ outc = nil // a nil channel disables the send case while the buffer is empty
+ }
+ select {
+ case outc <- *curWr:
+ if ws.buf[0].Err() != nil { // an error response was just delivered; stop serving
+ return
+ }
+ ws.buf[0] = nil
+ ws.buf = ws.buf[1:]
+ case wr, ok := <-ws.recvc:
+ if !ok {
+ // recvc was closed (run() or addSubstream); shut down
+ return
+ }
+
+ if wr.Created {
+ if ws.initReq.retc != nil {
+ ws.initReq.retc <- ws.outc
+ // to prevent next write from taking the slot in buffered channel
+ // and posting duplicate create events
+ ws.initReq.retc = nil
+
+ // send first creation event only if requested
+ if ws.initReq.createdNotify {
+ ws.outc <- *wr
+ }
+ // once the watch channel is returned, a current revision
+ // watch must resume at the store revision. This is necessary
+ // for the following case to work as expected:
+ // wch := m1.Watch("a")
+ // m2.Put("a", "b")
+ // <-wch
+ // If the revision is only bound on the first observed event,
+ // if wch is disconnected before the Put is issued, then reconnects
+ // after it is committed, it'll miss the Put.
+ if ws.initReq.rev == 0 {
+ nextRev = wr.Header.Revision
+ }
+ }
+ } else {
+ // current progress of watch; <= store revision
+ nextRev = wr.Header.Revision
+ }
+
+ if len(wr.Events) > 0 {
+ nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
+ }
+ ws.initReq.rev = nextRev // a later re-registration sends this as StartRevision (see toPB)
+
+ // created event is already sent above,
+ // watcher should not post duplicate events
+ if wr.Created {
+ continue
+ }
+
+ // TODO pause channel if buffer gets too large
+ ws.buf = append(ws.buf, wr)
+ case <-w.ctx.Done():
+ return
+ case <-ws.initReq.ctx.Done():
+ return
+ case <-resumec:
+ resuming = true
+ return
+ }
+ }
+ // not reached: every exit from the loop above is a return
+}
+// newWatchClient moves every substream into the resuming queue, opens a new gRPC watch client, restarts the substream goroutines and, on success, starts serveWatchClient; if the client cannot be opened it returns the error converted by v3rpc.Error.
+func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
+ // mark all substreams as resuming
+ close(w.resumec) // makes running serveSubstream goroutines exit in resuming mode
+ w.resumec = make(chan struct{})
+ w.joinSubstreams()
+ for _, ws := range w.substreams {
+ ws.id = -1
+ w.resuming = append(w.resuming, ws)
+ }
+ // strip out nils, if any
+ var resuming []*watcherStream
+ for _, ws := range w.resuming {
+ if ws != nil {
+ resuming = append(resuming, ws)
+ }
+ }
+ w.resuming = resuming
+ w.substreams = make(map[int64]*watcherStream)
+
+ // connect to grpc stream while accepting watcher cancelation
+ stopc := make(chan struct{})
+ donec := w.waitCancelSubstreams(stopc)
+ wc, err := w.openWatchClient()
+ close(stopc)
+ <-donec // wait until the cancelation watchers have all finished
+
+ // serve all non-closing streams, even if there's a client error
+ // so that the teardown path can shutdown the streams as expected.
+ for _, ws := range w.resuming {
+ if ws.closing {
+ continue
+ }
+ ws.donec = make(chan struct{})
+ w.wg.Add(1)
+ go w.serveSubstream(ws, w.resumec)
+ }
+
+ if err != nil {
+ return nil, v3rpc.Error(err)
+ }
+
+ // receive data from new grpc stream
+ go w.serveWatchClient(wc)
+ return wc, nil
+}
+// waitCancelSubstreams starts one goroutine per resuming substream that closes the substream's output channel if its context is done before stopc closes; the returned channel is closed once all of those goroutines have finished.
+func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
+ var wg sync.WaitGroup
+ wg.Add(len(w.resuming))
+ donec := make(chan struct{})
+ for i := range w.resuming {
+ go func(ws *watcherStream) {
+ defer wg.Done()
+ if ws.closing { // NOTE(review): ws must be non-nil here; newWatchClient strips nils before calling, confirm no other caller
+ if ws.initReq.ctx.Err() != nil && ws.outc != nil {
+ close(ws.outc)
+ ws.outc = nil
+ }
+ return
+ }
+ select {
+ case <-ws.initReq.ctx.Done():
+ // closed ws will be removed from resuming
+ ws.closing = true
+ close(ws.outc)
+ ws.outc = nil // closeSubstream checks outc != nil, so the channel is not closed twice
+ w.wg.Add(1)
+ go func() {
+ defer w.wg.Done()
+ w.closingc <- ws
+ }()
+ case <-stopc:
+ }
+ }(w.resuming[i])
+ }
+ go func() {
+ defer close(donec)
+ wg.Wait()
+ }()
+ return donec
+}
+
+// joinSubstreams blocks until the goroutine of every registered substream and of every non-nil resuming substream has closed its donec.
+func (w *watchGrpcStream) joinSubstreams() {
+	for id := range w.substreams {
+		<-w.substreams[id].donec
+	}
+	for i := range w.resuming {
+		if pending := w.resuming[i]; pending != nil {
+			<-pending.donec
+		}
+	}
+}
+// maxBackoff caps the retry delay that openWatchClient sleeps between attempts.
+var maxBackoff = 100 * time.Millisecond
+
+// openWatchClient retries w.remote.Watch until it yields a non-nil client with a nil error, the stream context is done, or isHaltErr reports a halting error.
+// A "ws==nil && err==nil" result is retried manually; only errors matching isUnavailableErr wait between attempts (delay grows 25% per retry, capped at maxBackoff), all others retry at once.
+// TODO: remove FailFast=false
+func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
+ backoff := time.Millisecond // initial retry delay
+ for {
+ select {
+ case <-w.ctx.Done():
+ if err == nil {
+ return nil, w.ctx.Err()
+ }
+ return nil, err // report the error from the last failed attempt
+ default:
+ }
+ if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
+ break
+ }
+ if isHaltErr(w.ctx, err) {
+ return nil, v3rpc.Error(err)
+ }
+ if isUnavailableErr(w.ctx, err) {
+ // retry, but backoff
+ if backoff < maxBackoff {
+ // 25% backoff factor
+ backoff = backoff + backoff/4
+ if backoff > maxBackoff {
+ backoff = maxBackoff
+ }
+ }
+ time.Sleep(backoff) // NOTE(review): not interrupted by ctx cancellation; it is only observed at the top of the next iteration
+ }
+ }
+ return ws, nil
+}
+
+// toPB builds the protobuf request that registers wr with the server: the revision, key range, progress-notify flag,
+// filters and prevKV setting are copied into a WatchCreateRequest, which is wrapped as the WatchRequest's request union.
+func (wr *watchRequest) toPB() *pb.WatchRequest {
+	create := &pb.WatchRequest_CreateRequest{CreateRequest: &pb.WatchCreateRequest{
+		StartRevision:  wr.rev,
+		Key:            []byte(wr.key),
+		RangeEnd:       []byte(wr.end),
+		ProgressNotify: wr.progressNotify,
+		Filters:        wr.filters,
+		PrevKv:         wr.prevKV,
+	}}
+	return &pb.WatchRequest{RequestUnion: create}
+}
+// streamKeyFromCtx derives the stream-map key for ctx: the "%+v" rendering of its outgoing gRPC metadata, or "" when ctx carries none.
+func streamKeyFromCtx(ctx context.Context) string {
+ if md, ok := metadata.FromOutgoingContext(ctx); ok {
+ return fmt.Sprintf("%+v", md)
+ }
+ return "" // all contexts without outgoing metadata share this key
+}
diff --git a/vendor/go.etcd.io/etcd/cmd/etcd b/vendor/go.etcd.io/etcd/cmd/etcd
new file mode 120000
index 0000000..b870225
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/cmd/etcd
@@ -0,0 +1 @@
+../
\ No newline at end of file
diff --git a/vendor/go.etcd.io/etcd/cmd/etcdctl b/vendor/go.etcd.io/etcd/cmd/etcdctl
new file mode 120000
index 0000000..05bb269
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/cmd/etcdctl
@@ -0,0 +1 @@
+../etcdctl
\ No newline at end of file
diff --git a/vendor/go.etcd.io/etcd/cmd/functional b/vendor/go.etcd.io/etcd/cmd/functional
new file mode 120000
index 0000000..44faa31
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/cmd/functional
@@ -0,0 +1 @@
+../functional
\ No newline at end of file
diff --git a/vendor/go.etcd.io/etcd/cmd/tools b/vendor/go.etcd.io/etcd/cmd/tools
new file mode 120000
index 0000000..4887d6e
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/cmd/tools
@@ -0,0 +1 @@
+../tools
\ No newline at end of file
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/doc.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/doc.go
new file mode 100644
index 0000000..f72c6a6
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package rpctypes has types and values shared by the etcd server and client for v3 RPC interaction.
+package rpctypes
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/error.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/error.go
new file mode 100644
index 0000000..55eab38
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/error.go
@@ -0,0 +1,215 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rpctypes
+
+import (
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// Server-side errors: gRPC status errors, each pairing a gRPC code with an "etcdserver: ..." message, plus a lookup table from message text back to the error.
+var (
+ ErrGRPCEmptyKey = status.New(codes.InvalidArgument, "etcdserver: key is not provided").Err()
+ ErrGRPCKeyNotFound = status.New(codes.InvalidArgument, "etcdserver: key not found").Err()
+ ErrGRPCValueProvided = status.New(codes.InvalidArgument, "etcdserver: value is provided").Err()
+ ErrGRPCLeaseProvided = status.New(codes.InvalidArgument, "etcdserver: lease is provided").Err()
+ ErrGRPCTooManyOps = status.New(codes.InvalidArgument, "etcdserver: too many operations in txn request").Err()
+ ErrGRPCDuplicateKey = status.New(codes.InvalidArgument, "etcdserver: duplicate key given in txn request").Err()
+ ErrGRPCCompacted = status.New(codes.OutOfRange, "etcdserver: mvcc: required revision has been compacted").Err()
+ ErrGRPCFutureRev = status.New(codes.OutOfRange, "etcdserver: mvcc: required revision is a future revision").Err()
+ ErrGRPCNoSpace = status.New(codes.ResourceExhausted, "etcdserver: mvcc: database space exceeded").Err()
+
+ ErrGRPCLeaseNotFound = status.New(codes.NotFound, "etcdserver: requested lease not found").Err()
+ ErrGRPCLeaseExist = status.New(codes.FailedPrecondition, "etcdserver: lease already exists").Err()
+ ErrGRPCLeaseTTLTooLarge = status.New(codes.OutOfRange, "etcdserver: too large lease TTL").Err()
+
+ ErrGRPCMemberExist = status.New(codes.FailedPrecondition, "etcdserver: member ID already exist").Err()
+ ErrGRPCPeerURLExist = status.New(codes.FailedPrecondition, "etcdserver: Peer URLs already exists").Err()
+ ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err()
+ ErrGRPCMemberBadURLs = status.New(codes.InvalidArgument, "etcdserver: given member URLs are invalid").Err()
+ ErrGRPCMemberNotFound = status.New(codes.NotFound, "etcdserver: member not found").Err()
+
+ ErrGRPCRequestTooLarge = status.New(codes.InvalidArgument, "etcdserver: request is too large").Err()
+ ErrGRPCRequestTooManyRequests = status.New(codes.ResourceExhausted, "etcdserver: too many requests").Err()
+
+ ErrGRPCRootUserNotExist = status.New(codes.FailedPrecondition, "etcdserver: root user does not exist").Err()
+ ErrGRPCRootRoleNotExist = status.New(codes.FailedPrecondition, "etcdserver: root user does not have root role").Err()
+ ErrGRPCUserAlreadyExist = status.New(codes.FailedPrecondition, "etcdserver: user name already exists").Err()
+ ErrGRPCUserEmpty = status.New(codes.InvalidArgument, "etcdserver: user name is empty").Err()
+ ErrGRPCUserNotFound = status.New(codes.FailedPrecondition, "etcdserver: user name not found").Err()
+ ErrGRPCRoleAlreadyExist = status.New(codes.FailedPrecondition, "etcdserver: role name already exists").Err()
+ ErrGRPCRoleNotFound = status.New(codes.FailedPrecondition, "etcdserver: role name not found").Err()
+ ErrGRPCAuthFailed = status.New(codes.InvalidArgument, "etcdserver: authentication failed, invalid user ID or password").Err()
+ ErrGRPCPermissionDenied = status.New(codes.PermissionDenied, "etcdserver: permission denied").Err()
+ ErrGRPCRoleNotGranted = status.New(codes.FailedPrecondition, "etcdserver: role is not granted to the user").Err()
+ ErrGRPCPermissionNotGranted = status.New(codes.FailedPrecondition, "etcdserver: permission is not granted to the role").Err()
+ ErrGRPCAuthNotEnabled = status.New(codes.FailedPrecondition, "etcdserver: authentication is not enabled").Err()
+ ErrGRPCInvalidAuthToken = status.New(codes.Unauthenticated, "etcdserver: invalid auth token").Err()
+ ErrGRPCInvalidAuthMgmt = status.New(codes.InvalidArgument, "etcdserver: invalid auth management").Err()
+
+ ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err()
+ ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err()
+ ErrGRPCNotCapable = status.New(codes.Unavailable, "etcdserver: not capable").Err()
+ ErrGRPCStopped = status.New(codes.Unavailable, "etcdserver: server stopped").Err()
+ ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err()
+ ErrGRPCTimeoutDueToLeaderFail = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure").Err()
+ ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err()
+ ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err()
+ ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err()
+
+ errStringToError = map[string]error{ // reverse lookup used by Error: message text (per ErrorDesc) -> canonical status error
+ ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey,
+ ErrorDesc(ErrGRPCKeyNotFound): ErrGRPCKeyNotFound,
+ ErrorDesc(ErrGRPCValueProvided): ErrGRPCValueProvided,
+ ErrorDesc(ErrGRPCLeaseProvided): ErrGRPCLeaseProvided,
+
+ ErrorDesc(ErrGRPCTooManyOps): ErrGRPCTooManyOps,
+ ErrorDesc(ErrGRPCDuplicateKey): ErrGRPCDuplicateKey,
+ ErrorDesc(ErrGRPCCompacted): ErrGRPCCompacted,
+ ErrorDesc(ErrGRPCFutureRev): ErrGRPCFutureRev,
+ ErrorDesc(ErrGRPCNoSpace): ErrGRPCNoSpace,
+
+ ErrorDesc(ErrGRPCLeaseNotFound): ErrGRPCLeaseNotFound,
+ ErrorDesc(ErrGRPCLeaseExist): ErrGRPCLeaseExist,
+ ErrorDesc(ErrGRPCLeaseTTLTooLarge): ErrGRPCLeaseTTLTooLarge,
+
+ ErrorDesc(ErrGRPCMemberExist): ErrGRPCMemberExist,
+ ErrorDesc(ErrGRPCPeerURLExist): ErrGRPCPeerURLExist,
+ ErrorDesc(ErrGRPCMemberNotEnoughStarted): ErrGRPCMemberNotEnoughStarted,
+ ErrorDesc(ErrGRPCMemberBadURLs): ErrGRPCMemberBadURLs,
+ ErrorDesc(ErrGRPCMemberNotFound): ErrGRPCMemberNotFound,
+
+ ErrorDesc(ErrGRPCRequestTooLarge): ErrGRPCRequestTooLarge,
+ ErrorDesc(ErrGRPCRequestTooManyRequests): ErrGRPCRequestTooManyRequests,
+
+ ErrorDesc(ErrGRPCRootUserNotExist): ErrGRPCRootUserNotExist,
+ ErrorDesc(ErrGRPCRootRoleNotExist): ErrGRPCRootRoleNotExist,
+ ErrorDesc(ErrGRPCUserAlreadyExist): ErrGRPCUserAlreadyExist,
+ ErrorDesc(ErrGRPCUserEmpty): ErrGRPCUserEmpty,
+ ErrorDesc(ErrGRPCUserNotFound): ErrGRPCUserNotFound,
+ ErrorDesc(ErrGRPCRoleAlreadyExist): ErrGRPCRoleAlreadyExist,
+ ErrorDesc(ErrGRPCRoleNotFound): ErrGRPCRoleNotFound,
+ ErrorDesc(ErrGRPCAuthFailed): ErrGRPCAuthFailed,
+ ErrorDesc(ErrGRPCPermissionDenied): ErrGRPCPermissionDenied,
+ ErrorDesc(ErrGRPCRoleNotGranted): ErrGRPCRoleNotGranted,
+ ErrorDesc(ErrGRPCPermissionNotGranted): ErrGRPCPermissionNotGranted,
+ ErrorDesc(ErrGRPCAuthNotEnabled): ErrGRPCAuthNotEnabled,
+ ErrorDesc(ErrGRPCInvalidAuthToken): ErrGRPCInvalidAuthToken,
+ ErrorDesc(ErrGRPCInvalidAuthMgmt): ErrGRPCInvalidAuthMgmt,
+
+ ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
+ ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader,
+ ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable,
+ ErrorDesc(ErrGRPCStopped): ErrGRPCStopped,
+ ErrorDesc(ErrGRPCTimeout): ErrGRPCTimeout,
+ ErrorDesc(ErrGRPCTimeoutDueToLeaderFail): ErrGRPCTimeoutDueToLeaderFail,
+ ErrorDesc(ErrGRPCTimeoutDueToConnectionLost): ErrGRPCTimeoutDueToConnectionLost,
+ ErrorDesc(ErrGRPCUnhealthy): ErrGRPCUnhealthy,
+ ErrorDesc(ErrGRPCCorrupt): ErrGRPCCorrupt,
+ }
+)
+
+// Client-side errors: each is the result of passing the corresponding ErrGRPC* server-side error through Error.
+var (
+ ErrEmptyKey = Error(ErrGRPCEmptyKey)
+ ErrKeyNotFound = Error(ErrGRPCKeyNotFound)
+ ErrValueProvided = Error(ErrGRPCValueProvided)
+ ErrLeaseProvided = Error(ErrGRPCLeaseProvided)
+ ErrTooManyOps = Error(ErrGRPCTooManyOps)
+ ErrDuplicateKey = Error(ErrGRPCDuplicateKey)
+ ErrCompacted = Error(ErrGRPCCompacted)
+ ErrFutureRev = Error(ErrGRPCFutureRev)
+ ErrNoSpace = Error(ErrGRPCNoSpace)
+
+ ErrLeaseNotFound = Error(ErrGRPCLeaseNotFound)
+ ErrLeaseExist = Error(ErrGRPCLeaseExist)
+ ErrLeaseTTLTooLarge = Error(ErrGRPCLeaseTTLTooLarge)
+
+ ErrMemberExist = Error(ErrGRPCMemberExist)
+ ErrPeerURLExist = Error(ErrGRPCPeerURLExist)
+ ErrMemberNotEnoughStarted = Error(ErrGRPCMemberNotEnoughStarted)
+ ErrMemberBadURLs = Error(ErrGRPCMemberBadURLs)
+ ErrMemberNotFound = Error(ErrGRPCMemberNotFound)
+
+ ErrRequestTooLarge = Error(ErrGRPCRequestTooLarge)
+ ErrTooManyRequests = Error(ErrGRPCRequestTooManyRequests)
+
+ ErrRootUserNotExist = Error(ErrGRPCRootUserNotExist)
+ ErrRootRoleNotExist = Error(ErrGRPCRootRoleNotExist)
+ ErrUserAlreadyExist = Error(ErrGRPCUserAlreadyExist)
+ ErrUserEmpty = Error(ErrGRPCUserEmpty)
+ ErrUserNotFound = Error(ErrGRPCUserNotFound)
+ ErrRoleAlreadyExist = Error(ErrGRPCRoleAlreadyExist)
+ ErrRoleNotFound = Error(ErrGRPCRoleNotFound)
+ ErrAuthFailed = Error(ErrGRPCAuthFailed)
+ ErrPermissionDenied = Error(ErrGRPCPermissionDenied)
+ ErrRoleNotGranted = Error(ErrGRPCRoleNotGranted)
+ ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted)
+ ErrAuthNotEnabled = Error(ErrGRPCAuthNotEnabled)
+ ErrInvalidAuthToken = Error(ErrGRPCInvalidAuthToken)
+ ErrInvalidAuthMgmt = Error(ErrGRPCInvalidAuthMgmt)
+
+ ErrNoLeader = Error(ErrGRPCNoLeader)
+ ErrNotLeader = Error(ErrGRPCNotLeader)
+ ErrNotCapable = Error(ErrGRPCNotCapable)
+ ErrStopped = Error(ErrGRPCStopped)
+ ErrTimeout = Error(ErrGRPCTimeout)
+ ErrTimeoutDueToLeaderFail = Error(ErrGRPCTimeoutDueToLeaderFail)
+ ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost)
+ ErrUnhealthy = Error(ErrGRPCUnhealthy)
+ ErrCorrupt = Error(ErrGRPCCorrupt)
+)
+
+// EtcdError defines gRPC server errors on the client side as a plain value pairing a gRPC status code with its description.
+// (https://github.com/grpc/grpc-go/blob/master/rpc_util.go#L319-L323)
+type EtcdError struct {
+ code codes.Code // returned by Code
+ desc string // returned by Error
+}
+
+// Code returns the grpc/codes.Code stored in the error.
+// TODO: define clientv3/codes.Code.
+func (e EtcdError) Code() codes.Code {
+ return e.code
+}
+// Error returns the error's description string, which makes EtcdError satisfy the error interface.
+func (e EtcdError) Error() string {
+ return e.desc
+}
+// Error converts a server-side gRPC error into the matching client-side EtcdError.
+// A nil err yields nil. An err whose description (per ErrorDesc) is not one of the
+// messages registered in errStringToError is returned unchanged. Otherwise the result
+// carries the registered error's gRPC code and message.
+func Error(err error) error {
+	if err == nil {
+		return nil
+	}
+	known, found := errStringToError[ErrorDesc(err)]
+	if !found { // not gRPC error
+		return err
+	}
+	st, isStatus := status.FromError(known)
+	if !isStatus {
+		return EtcdError{code: st.Code(), desc: known.Error()}
+	}
+	return EtcdError{code: st.Code(), desc: st.Message()}
+}
+// ErrorDesc returns the gRPC status message of err when status.FromError recognizes it, and err.Error() otherwise.
+func ErrorDesc(err error) string {
+ if s, ok := status.FromError(err); ok {
+ return s.Message()
+ }
+ return err.Error() // NOTE(review): would panic on a nil err if FromError ever reports !ok for nil; confirm against the vendored grpc version
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/md.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/md.go
new file mode 100644
index 0000000..5c590e1
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/md.go
@@ -0,0 +1,20 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rpctypes
+
+var (
+ MetadataRequireLeaderKey = "hasleader" // metadata key; presumably set on requests that require a leader, TODO confirm against callers
+ MetadataHasLeader = "true" // presumably the value paired with MetadataRequireLeaderKey, TODO confirm against callers
+)