Dependencies for the affinity router and the
affinity routing daemon.
Change-Id: Icda72c3594ef7f8f0bc0c33dc03087a4c25529ca
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/auth_client_adapter.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/auth_client_adapter.go
new file mode 100644
index 0000000..33dc91f
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/auth_client_adapter.go
@@ -0,0 +1,93 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package adapter
+
+import (
+ "context"
+
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ grpc "google.golang.org/grpc"
+)
+// as2ac adapts a pb.AuthServer to the pb.AuthClient interface by calling the server directly.
+type as2ac struct{ as pb.AuthServer }
+// AuthServerToAuthClient returns a pb.AuthClient whose methods call as directly.
+func AuthServerToAuthClient(as pb.AuthServer) pb.AuthClient {
+	return &as2ac{as: as}
+}
+// AuthEnable calls AuthEnable on the wrapped server; opts are ignored.
+func (a *as2ac) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (*pb.AuthEnableResponse, error) {
+	return a.as.AuthEnable(ctx, in)
+}
+// AuthDisable calls AuthDisable on the wrapped server; opts are ignored.
+func (a *as2ac) AuthDisable(ctx context.Context, in *pb.AuthDisableRequest, opts ...grpc.CallOption) (*pb.AuthDisableResponse, error) {
+	return a.as.AuthDisable(ctx, in)
+}
+// Authenticate calls Authenticate on the wrapped server; opts are ignored.
+func (a *as2ac) Authenticate(ctx context.Context, in *pb.AuthenticateRequest, opts ...grpc.CallOption) (*pb.AuthenticateResponse, error) {
+	return a.as.Authenticate(ctx, in)
+}
+// RoleAdd calls RoleAdd on the wrapped server; opts are ignored.
+func (a *as2ac) RoleAdd(ctx context.Context, in *pb.AuthRoleAddRequest, opts ...grpc.CallOption) (*pb.AuthRoleAddResponse, error) {
+	return a.as.RoleAdd(ctx, in)
+}
+// RoleDelete calls RoleDelete on the wrapped server; opts are ignored.
+func (a *as2ac) RoleDelete(ctx context.Context, in *pb.AuthRoleDeleteRequest, opts ...grpc.CallOption) (*pb.AuthRoleDeleteResponse, error) {
+	return a.as.RoleDelete(ctx, in)
+}
+// RoleGet calls RoleGet on the wrapped server; opts are ignored.
+func (a *as2ac) RoleGet(ctx context.Context, in *pb.AuthRoleGetRequest, opts ...grpc.CallOption) (*pb.AuthRoleGetResponse, error) {
+	return a.as.RoleGet(ctx, in)
+}
+// RoleList calls RoleList on the wrapped server; opts are ignored.
+func (a *as2ac) RoleList(ctx context.Context, in *pb.AuthRoleListRequest, opts ...grpc.CallOption) (*pb.AuthRoleListResponse, error) {
+	return a.as.RoleList(ctx, in)
+}
+// RoleRevokePermission calls RoleRevokePermission on the wrapped server; opts are ignored.
+func (a *as2ac) RoleRevokePermission(ctx context.Context, in *pb.AuthRoleRevokePermissionRequest, opts ...grpc.CallOption) (*pb.AuthRoleRevokePermissionResponse, error) {
+	return a.as.RoleRevokePermission(ctx, in)
+}
+// RoleGrantPermission calls RoleGrantPermission on the wrapped server; opts are ignored.
+func (a *as2ac) RoleGrantPermission(ctx context.Context, in *pb.AuthRoleGrantPermissionRequest, opts ...grpc.CallOption) (*pb.AuthRoleGrantPermissionResponse, error) {
+	return a.as.RoleGrantPermission(ctx, in)
+}
+// UserDelete calls UserDelete on the wrapped server; opts are ignored.
+func (a *as2ac) UserDelete(ctx context.Context, in *pb.AuthUserDeleteRequest, opts ...grpc.CallOption) (*pb.AuthUserDeleteResponse, error) {
+	return a.as.UserDelete(ctx, in)
+}
+// UserAdd calls UserAdd on the wrapped server; opts are ignored.
+func (a *as2ac) UserAdd(ctx context.Context, in *pb.AuthUserAddRequest, opts ...grpc.CallOption) (*pb.AuthUserAddResponse, error) {
+	return a.as.UserAdd(ctx, in)
+}
+// UserGet calls UserGet on the wrapped server; opts are ignored.
+func (a *as2ac) UserGet(ctx context.Context, in *pb.AuthUserGetRequest, opts ...grpc.CallOption) (*pb.AuthUserGetResponse, error) {
+	return a.as.UserGet(ctx, in)
+}
+// UserList calls UserList on the wrapped server; opts are ignored.
+func (a *as2ac) UserList(ctx context.Context, in *pb.AuthUserListRequest, opts ...grpc.CallOption) (*pb.AuthUserListResponse, error) {
+	return a.as.UserList(ctx, in)
+}
+// UserGrantRole calls UserGrantRole on the wrapped server; opts are ignored.
+func (a *as2ac) UserGrantRole(ctx context.Context, in *pb.AuthUserGrantRoleRequest, opts ...grpc.CallOption) (*pb.AuthUserGrantRoleResponse, error) {
+	return a.as.UserGrantRole(ctx, in)
+}
+// UserRevokeRole calls UserRevokeRole on the wrapped server; opts are ignored.
+func (a *as2ac) UserRevokeRole(ctx context.Context, in *pb.AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (*pb.AuthUserRevokeRoleResponse, error) {
+	return a.as.UserRevokeRole(ctx, in)
+}
+// UserChangePassword calls UserChangePassword on the wrapped server; opts are ignored.
+func (a *as2ac) UserChangePassword(ctx context.Context, in *pb.AuthUserChangePasswordRequest, opts ...grpc.CallOption) (*pb.AuthUserChangePasswordResponse, error) {
+	return a.as.UserChangePassword(ctx, in)
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/chan_stream.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/chan_stream.go
new file mode 100644
index 0000000..82e3411
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/chan_stream.go
@@ -0,0 +1,165 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package adapter
+
+import (
+ "context"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+)
+// chanServerStream implements grpc.ServerStream with a chanStream
+type chanServerStream struct {
+	headerc  chan<- metadata.MD
+	trailerc chan<- metadata.MD
+	grpc.Stream
+	headers []metadata.MD // metadata queued by SetHeader until SendHeader sends it
+}
+// SendHeader sends md merged with the metadata queued by SetHeader; it can succeed only once.
+func (ss *chanServerStream) SendHeader(md metadata.MD) error {
+	if ss.headerc == nil {
+		return errAlreadySentHeader
+	}
+	outmd := make(metadata.MD)
+	for _, h := range append(ss.headers, md) {
+		for k, v := range h {
+			outmd[k] = append(outmd[k], v...) // merge values of repeated keys instead of overwriting
+		}
+	}
+	select {
+	case ss.headerc <- outmd:
+		ss.headerc, ss.headers = nil, nil
+		return nil
+	case <-ss.Context().Done():
+		return ss.Context().Err()
+	}
+}
+// SetHeader queues md for the next SendHeader; it fails once the header has been sent.
+func (ss *chanServerStream) SetHeader(md metadata.MD) error {
+	if ss.headerc == nil {
+		return errAlreadySentHeader
+	}
+	ss.headers = append(ss.headers, md)
+	return nil
+}
+// SetTrailer offers md to the client; unlike a bare send it stops waiting once the stream's context ends.
+func (ss *chanServerStream) SetTrailer(md metadata.MD) {
+	select {
+	case ss.trailerc <- md:
+	case <-ss.Context().Done():
+	}
+}
+
+// chanClientStream implements grpc.ClientStream with a chanStream; header and
+// trailer metadata are read from headerc and trailerc.
+type chanClientStream struct {
+	headerc  <-chan metadata.MD
+	trailerc <-chan metadata.MD
+	*chanStream
+}
+// Header waits for the header metadata, or returns the context error if the context ends first.
+func (cs *chanClientStream) Header() (metadata.MD, error) {
+	select {
+	case hdr := <-cs.headerc:
+		return hdr, nil
+	case <-cs.Context().Done():
+		return nil, cs.Context().Err()
+	}
+}
+// Trailer waits for the trailer metadata; it returns nil if the context ends first.
+func (cs *chanClientStream) Trailer() metadata.MD {
+	select {
+	case tr := <-cs.trailerc:
+		return tr
+	case <-cs.Context().Done():
+	}
+	return nil
+}
+// CloseSend closes the send channel; a second call panics on the already-closed channel.
+func (cs *chanClientStream) CloseSend() error {
+	close(cs.sendc)
+	return nil
+}
+
+// chanStream implements grpc.Stream using a receive channel, a send channel and a context.
+type chanStream struct {
+	recvc  <-chan interface{}
+	sendc  chan<- interface{}
+	ctx    context.Context
+	cancel context.CancelFunc
+}
+// Context returns the context stored in the stream.
+func (st *chanStream) Context() context.Context { return st.ctx }
+// SendMsg sends m, or returns the context error if the context ends first; an error m is sent and returned.
+func (st *chanStream) SendMsg(m interface{}) error {
+	select {
+	case <-st.ctx.Done():
+		return st.ctx.Err()
+	case st.sendc <- m:
+	}
+	if sent, isErr := m.(error); isErr {
+		return sent
+	}
+	return nil
+}
+// RecvMsg stores the next received message in *m; m must be a *interface{} or the call panics.
+// A received error value is returned as the error; a closed channel yields grpc.ErrClientConnClosing.
+func (st *chanStream) RecvMsg(m interface{}) error {
+	dst := m.(*interface{})
+	for {
+		select {
+		case msg, open := <-st.recvc:
+			if !open {
+				return grpc.ErrClientConnClosing
+			}
+			if recvErr, isErr := msg.(error); isErr {
+				return recvErr
+			}
+			*dst = msg
+			return nil
+		case <-st.ctx.Done():
+		}
+		// prioritize any pending recv messages over canceled context
+		if len(st.recvc) == 0 {
+			return st.ctx.Err()
+		}
+	}
+}
+
+// newPipeStream starts ssHandler in its own goroutine with the server end of
+// a channel-backed stream and returns the matching client end. A non-nil error
+// from ssHandler is offered to the client; both stream contexts are canceled
+// once ssHandler has returned.
+func newPipeStream(ctx context.Context, ssHandler func(chanServerStream) error) chanClientStream {
+	// toClient is buffered so server can send error on close
+	toClient, toServer := make(chan interface{}, 1), make(chan interface{})
+	headerc, trailerc := make(chan metadata.MD, 1), make(chan metadata.MD, 1)
+	cctx, ccancel := context.WithCancel(ctx)
+	sctx, scancel := context.WithCancel(ctx)
+	cli := &chanStream{recvc: toClient, sendc: toServer, ctx: cctx, cancel: ccancel}
+	srv := &chanStream{recvc: toServer, sendc: toClient, ctx: sctx, cancel: scancel}
+	go func() {
+		err := ssHandler(chanServerStream{headerc: headerc, trailerc: trailerc, Stream: srv})
+		if err != nil {
+			select {
+			case srv.sendc <- err:
+			case <-sctx.Done():
+			case <-cctx.Done():
+			}
+		}
+		scancel()
+		ccancel()
+	}()
+	return chanClientStream{headerc: headerc, trailerc: trailerc, chanStream: cli}
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/cluster_client_adapter.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/cluster_client_adapter.go
new file mode 100644
index 0000000..6c03409
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/cluster_client_adapter.go
@@ -0,0 +1,45 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package adapter
+
+import (
+ "context"
+
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+)
+// cls2clc adapts a pb.ClusterServer to the pb.ClusterClient interface by calling the server directly.
+type cls2clc struct{ cls pb.ClusterServer }
+// ClusterServerToClusterClient returns a pb.ClusterClient whose methods call cls directly.
+func ClusterServerToClusterClient(cls pb.ClusterServer) pb.ClusterClient {
+	return &cls2clc{cls: cls}
+}
+// MemberList calls MemberList on the wrapped server; opts are ignored.
+func (c *cls2clc) MemberList(ctx context.Context, r *pb.MemberListRequest, opts ...grpc.CallOption) (*pb.MemberListResponse, error) {
+	return c.cls.MemberList(ctx, r)
+}
+// MemberAdd calls MemberAdd on the wrapped server; opts are ignored.
+func (c *cls2clc) MemberAdd(ctx context.Context, r *pb.MemberAddRequest, opts ...grpc.CallOption) (*pb.MemberAddResponse, error) {
+	return c.cls.MemberAdd(ctx, r)
+}
+// MemberUpdate calls MemberUpdate on the wrapped server; opts are ignored.
+func (c *cls2clc) MemberUpdate(ctx context.Context, r *pb.MemberUpdateRequest, opts ...grpc.CallOption) (*pb.MemberUpdateResponse, error) {
+	return c.cls.MemberUpdate(ctx, r)
+}
+// MemberRemove calls MemberRemove on the wrapped server; opts are ignored.
+func (c *cls2clc) MemberRemove(ctx context.Context, r *pb.MemberRemoveRequest, opts ...grpc.CallOption) (*pb.MemberRemoveResponse, error) {
+	return c.cls.MemberRemove(ctx, r)
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/doc.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/doc.go
new file mode 100644
index 0000000..7170be2
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/doc.go
@@ -0,0 +1,17 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package adapter provides gRPC adapters between client and server
+// gRPC interfaces without needing to go through a gRPC connection.
+package adapter
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/election_client_adapter.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/election_client_adapter.go
new file mode 100644
index 0000000..a2ebf13
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/election_client_adapter.go
@@ -0,0 +1,80 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package adapter
+
+import (
+ "context"
+
+ "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
+
+ "google.golang.org/grpc"
+)
+// es2ec adapts a v3electionpb.ElectionServer to the v3electionpb.ElectionClient interface.
+type es2ec struct{ es v3electionpb.ElectionServer }
+// ElectionServerToElectionClient returns an ElectionClient whose methods call es directly.
+func ElectionServerToElectionClient(es v3electionpb.ElectionServer) v3electionpb.ElectionClient {
+	return &es2ec{es: es}
+}
+// Campaign calls Campaign on the wrapped server; opts are ignored.
+func (e *es2ec) Campaign(ctx context.Context, r *v3electionpb.CampaignRequest, opts ...grpc.CallOption) (*v3electionpb.CampaignResponse, error) {
+	return e.es.Campaign(ctx, r)
+}
+// Proclaim calls Proclaim on the wrapped server; opts are ignored.
+func (e *es2ec) Proclaim(ctx context.Context, r *v3electionpb.ProclaimRequest, opts ...grpc.CallOption) (*v3electionpb.ProclaimResponse, error) {
+	return e.es.Proclaim(ctx, r)
+}
+// Leader calls Leader on the wrapped server; opts are ignored.
+func (e *es2ec) Leader(ctx context.Context, r *v3electionpb.LeaderRequest, opts ...grpc.CallOption) (*v3electionpb.LeaderResponse, error) {
+	return e.es.Leader(ctx, r)
+}
+// Resign calls Resign on the wrapped server; opts are ignored.
+func (e *es2ec) Resign(ctx context.Context, r *v3electionpb.ResignRequest, opts ...grpc.CallOption) (*v3electionpb.ResignResponse, error) {
+	return e.es.Resign(ctx, r)
+}
+// Observe runs the wrapped server's Observe on a pipe stream and returns the client end; opts are ignored.
+func (e *es2ec) Observe(ctx context.Context, in *v3electionpb.LeaderRequest, opts ...grpc.CallOption) (v3electionpb.Election_ObserveClient, error) {
+	handler := func(ss chanServerStream) error {
+		return e.es.Observe(in, &es2ecServerStream{ss})
+	}
+	return &es2ecClientStream{newPipeStream(ctx, handler)}, nil
+}
+// es2ecClientStream implements Election_ObserveClient
+type es2ecClientStream struct{ chanClientStream }
+// Send passes rr to SendMsg of the underlying stream.
+func (cs *es2ecClientStream) Send(rr *v3electionpb.LeaderRequest) error {
+	return cs.SendMsg(rr)
+}
+// Recv returns the next message from RecvMsg; it panics if that message is not a *v3electionpb.LeaderResponse.
+func (cs *es2ecClientStream) Recv() (*v3electionpb.LeaderResponse, error) {
+	var msg interface{}
+	if err := cs.RecvMsg(&msg); err != nil {
+		return nil, err
+	}
+	return msg.(*v3electionpb.LeaderResponse), nil
+}
+// es2ecServerStream implements Election_ObserveServer
+type es2ecServerStream struct{ chanServerStream }
+// Send passes rr to SendMsg of the underlying stream.
+func (ss *es2ecServerStream) Send(rr *v3electionpb.LeaderResponse) error {
+	return ss.SendMsg(rr)
+}
+// Recv returns the next message from RecvMsg; it panics if that message is not a *v3electionpb.LeaderRequest.
+func (ss *es2ecServerStream) Recv() (*v3electionpb.LeaderRequest, error) {
+	var msg interface{}
+	if err := ss.RecvMsg(&msg); err != nil {
+		return nil, err
+	}
+	return msg.(*v3electionpb.LeaderRequest), nil
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/kv_client_adapter.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/kv_client_adapter.go
new file mode 100644
index 0000000..acd5673
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/kv_client_adapter.go
@@ -0,0 +1,49 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package adapter
+
+import (
+ "context"
+
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ grpc "google.golang.org/grpc"
+)
+// kvs2kvc adapts a pb.KVServer to the pb.KVClient interface by calling the server directly.
+type kvs2kvc struct{ kvs pb.KVServer }
+// KvServerToKvClient returns a pb.KVClient whose methods call kvs directly.
+func KvServerToKvClient(kvs pb.KVServer) pb.KVClient {
+	return &kvs2kvc{kvs: kvs}
+}
+// Range calls Range on the wrapped server; opts are ignored.
+func (k *kvs2kvc) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (*pb.RangeResponse, error) {
+	return k.kvs.Range(ctx, in)
+}
+// Put calls Put on the wrapped server; opts are ignored.
+func (k *kvs2kvc) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (*pb.PutResponse, error) {
+	return k.kvs.Put(ctx, in)
+}
+// DeleteRange calls DeleteRange on the wrapped server; opts are ignored.
+func (k *kvs2kvc) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (*pb.DeleteRangeResponse, error) {
+	return k.kvs.DeleteRange(ctx, in)
+}
+// Txn calls Txn on the wrapped server; opts are ignored.
+func (k *kvs2kvc) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (*pb.TxnResponse, error) {
+	return k.kvs.Txn(ctx, in)
+}
+// Compact calls Compact on the wrapped server; opts are ignored.
+func (k *kvs2kvc) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (*pb.CompactionResponse, error) {
+	return k.kvs.Compact(ctx, in)
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/lease_client_adapter.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/lease_client_adapter.go
new file mode 100644
index 0000000..84c48b5
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/lease_client_adapter.go
@@ -0,0 +1,82 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package adapter
+
+import (
+ "context"
+
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+)
+// ls2lc adapts a pb.LeaseServer to the pb.LeaseClient interface by calling the server directly.
+type ls2lc struct {
+	leaseServer pb.LeaseServer
+}
+// LeaseServerToLeaseClient returns a pb.LeaseClient whose methods call ls directly.
+func LeaseServerToLeaseClient(ls pb.LeaseServer) pb.LeaseClient {
+	return &ls2lc{leaseServer: ls}
+}
+// LeaseGrant calls LeaseGrant on the wrapped server; opts are ignored.
+func (l *ls2lc) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (*pb.LeaseGrantResponse, error) {
+	return l.leaseServer.LeaseGrant(ctx, in)
+}
+// LeaseRevoke calls LeaseRevoke on the wrapped server; opts are ignored.
+func (l *ls2lc) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (*pb.LeaseRevokeResponse, error) {
+	return l.leaseServer.LeaseRevoke(ctx, in)
+}
+// LeaseKeepAlive runs the wrapped server's LeaseKeepAlive on a pipe stream and returns the client end; opts are ignored.
+func (l *ls2lc) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (pb.Lease_LeaseKeepAliveClient, error) {
+	handler := func(ss chanServerStream) error {
+		return l.leaseServer.LeaseKeepAlive(&ls2lcServerStream{ss})
+	}
+	return &ls2lcClientStream{newPipeStream(ctx, handler)}, nil
+}
+// LeaseTimeToLive calls LeaseTimeToLive on the wrapped server; opts are ignored.
+func (l *ls2lc) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (*pb.LeaseTimeToLiveResponse, error) {
+	return l.leaseServer.LeaseTimeToLive(ctx, in)
+}
+// LeaseLeases calls LeaseLeases on the wrapped server; opts are ignored.
+func (l *ls2lc) LeaseLeases(ctx context.Context, in *pb.LeaseLeasesRequest, opts ...grpc.CallOption) (*pb.LeaseLeasesResponse, error) {
+	return l.leaseServer.LeaseLeases(ctx, in)
+}
+// ls2lcClientStream implements Lease_LeaseKeepAliveClient
+type ls2lcClientStream struct{ chanClientStream }
+// Send passes rr to SendMsg of the underlying stream.
+func (cs *ls2lcClientStream) Send(rr *pb.LeaseKeepAliveRequest) error {
+	return cs.SendMsg(rr)
+}
+// Recv returns the next message from RecvMsg; it panics if that message is not a *pb.LeaseKeepAliveResponse.
+func (cs *ls2lcClientStream) Recv() (*pb.LeaseKeepAliveResponse, error) {
+	var msg interface{}
+	if err := cs.RecvMsg(&msg); err != nil {
+		return nil, err
+	}
+	return msg.(*pb.LeaseKeepAliveResponse), nil
+}
+// ls2lcServerStream implements Lease_LeaseKeepAliveServer
+type ls2lcServerStream struct{ chanServerStream }
+// Send passes rr to SendMsg of the underlying stream.
+func (ss *ls2lcServerStream) Send(rr *pb.LeaseKeepAliveResponse) error {
+	return ss.SendMsg(rr)
+}
+// Recv returns the next message from RecvMsg; it panics if that message is not a *pb.LeaseKeepAliveRequest.
+func (ss *ls2lcServerStream) Recv() (*pb.LeaseKeepAliveRequest, error) {
+	var msg interface{}
+	if err := ss.RecvMsg(&msg); err != nil {
+		return nil, err
+	}
+	return msg.(*pb.LeaseKeepAliveRequest), nil
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/lock_client_adapter.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/lock_client_adapter.go
new file mode 100644
index 0000000..9ce7913
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/lock_client_adapter.go
@@ -0,0 +1,37 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package adapter
+
+import (
+ "context"
+
+ "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
+
+ "google.golang.org/grpc"
+)
+// ls2lsc adapts a v3lockpb.LockServer to the v3lockpb.LockClient interface by calling the server directly.
+type ls2lsc struct{ ls v3lockpb.LockServer }
+// LockServerToLockClient returns a LockClient whose methods call ls directly.
+func LockServerToLockClient(ls v3lockpb.LockServer) v3lockpb.LockClient {
+	return &ls2lsc{ls: ls}
+}
+// Lock calls Lock on the wrapped server; opts are ignored.
+func (l *ls2lsc) Lock(ctx context.Context, r *v3lockpb.LockRequest, opts ...grpc.CallOption) (*v3lockpb.LockResponse, error) {
+	return l.ls.Lock(ctx, r)
+}
+// Unlock calls Unlock on the wrapped server; opts are ignored.
+func (l *ls2lsc) Unlock(ctx context.Context, r *v3lockpb.UnlockRequest, opts ...grpc.CallOption) (*v3lockpb.UnlockResponse, error) {
+	return l.ls.Unlock(ctx, r)
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/maintenance_client_adapter.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/maintenance_client_adapter.go
new file mode 100644
index 0000000..92d9dfd
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/maintenance_client_adapter.go
@@ -0,0 +1,88 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package adapter
+
+import (
+ "context"
+
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+)
+// mts2mtc adapts a pb.MaintenanceServer to the pb.MaintenanceClient interface by calling the server directly.
+type mts2mtc struct{ mts pb.MaintenanceServer }
+// MaintenanceServerToMaintenanceClient returns a pb.MaintenanceClient whose methods call mts directly.
+func MaintenanceServerToMaintenanceClient(mts pb.MaintenanceServer) pb.MaintenanceClient {
+	return &mts2mtc{mts: mts}
+}
+// Alarm calls Alarm on the wrapped server; opts are ignored.
+func (m *mts2mtc) Alarm(ctx context.Context, r *pb.AlarmRequest, opts ...grpc.CallOption) (*pb.AlarmResponse, error) {
+	return m.mts.Alarm(ctx, r)
+}
+// Status calls Status on the wrapped server; opts are ignored.
+func (m *mts2mtc) Status(ctx context.Context, r *pb.StatusRequest, opts ...grpc.CallOption) (*pb.StatusResponse, error) {
+	return m.mts.Status(ctx, r)
+}
+// Defragment calls Defragment on the wrapped server; opts are ignored.
+func (m *mts2mtc) Defragment(ctx context.Context, dr *pb.DefragmentRequest, opts ...grpc.CallOption) (*pb.DefragmentResponse, error) {
+	return m.mts.Defragment(ctx, dr)
+}
+// Hash calls Hash on the wrapped server; opts are ignored.
+func (m *mts2mtc) Hash(ctx context.Context, r *pb.HashRequest, opts ...grpc.CallOption) (*pb.HashResponse, error) {
+	return m.mts.Hash(ctx, r)
+}
+// HashKV calls HashKV on the wrapped server; opts are ignored.
+func (m *mts2mtc) HashKV(ctx context.Context, r *pb.HashKVRequest, opts ...grpc.CallOption) (*pb.HashKVResponse, error) {
+	return m.mts.HashKV(ctx, r)
+}
+// MoveLeader calls MoveLeader on the wrapped server; opts are ignored.
+func (m *mts2mtc) MoveLeader(ctx context.Context, r *pb.MoveLeaderRequest, opts ...grpc.CallOption) (*pb.MoveLeaderResponse, error) {
+	return m.mts.MoveLeader(ctx, r)
+}
+// Snapshot runs the wrapped server's Snapshot on a pipe stream and returns the client end; opts are ignored.
+func (m *mts2mtc) Snapshot(ctx context.Context, in *pb.SnapshotRequest, opts ...grpc.CallOption) (pb.Maintenance_SnapshotClient, error) {
+	handler := func(ss chanServerStream) error {
+		return m.mts.Snapshot(in, &ss2scServerStream{ss})
+	}
+	return &ss2scClientStream{newPipeStream(ctx, handler)}, nil
+}
+// ss2scClientStream implements Maintenance_SnapshotClient
+type ss2scClientStream struct{ chanClientStream }
+// Send passes rr to SendMsg of the underlying stream.
+func (cs *ss2scClientStream) Send(rr *pb.SnapshotRequest) error {
+	return cs.SendMsg(rr)
+}
+// Recv returns the next message from RecvMsg; it panics if that message is not a *pb.SnapshotResponse.
+func (cs *ss2scClientStream) Recv() (*pb.SnapshotResponse, error) {
+	var msg interface{}
+	if err := cs.RecvMsg(&msg); err != nil {
+		return nil, err
+	}
+	return msg.(*pb.SnapshotResponse), nil
+}
+// ss2scServerStream implements Maintenance_SnapshotServer
+type ss2scServerStream struct{ chanServerStream }
+// Send passes rr to SendMsg of the underlying stream.
+func (ss *ss2scServerStream) Send(rr *pb.SnapshotResponse) error {
+	return ss.SendMsg(rr)
+}
+// Recv returns the next message from RecvMsg; it panics if that message is not a *pb.SnapshotRequest.
+func (ss *ss2scServerStream) Recv() (*pb.SnapshotRequest, error) {
+	var msg interface{}
+	if err := ss.RecvMsg(&msg); err != nil {
+		return nil, err
+	}
+	return msg.(*pb.SnapshotRequest), nil
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/watch_client_adapter.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/watch_client_adapter.go
new file mode 100644
index 0000000..afe61e8
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/adapter/watch_client_adapter.go
@@ -0,0 +1,66 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package adapter
+
+import (
+ "context"
+ "errors"
+
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ "google.golang.org/grpc"
+)
+// errAlreadySentHeader is returned by chanServerStream.SendHeader and SetHeader once the header has been sent.
+var errAlreadySentHeader = errors.New("adapter: already sent header")
+// ws2wc adapts a pb.WatchServer to the pb.WatchClient interface by calling the server directly.
+type ws2wc struct{ wserv pb.WatchServer }
+// WatchServerToWatchClient returns a pb.WatchClient whose Watch calls wserv directly.
+func WatchServerToWatchClient(wserv pb.WatchServer) pb.WatchClient {
+	return &ws2wc{wserv: wserv}
+}
+// Watch runs the wrapped server's Watch on a pipe stream and returns the client end; opts are ignored.
+func (w *ws2wc) Watch(ctx context.Context, opts ...grpc.CallOption) (pb.Watch_WatchClient, error) {
+	handler := func(ss chanServerStream) error {
+		return w.wserv.Watch(&ws2wcServerStream{ss})
+	}
+	return &ws2wcClientStream{newPipeStream(ctx, handler)}, nil
+}
+// ws2wcClientStream implements Watch_WatchClient
+type ws2wcClientStream struct{ chanClientStream }
+// Send passes wr to SendMsg of the underlying stream.
+func (cs *ws2wcClientStream) Send(wr *pb.WatchRequest) error {
+	return cs.SendMsg(wr)
+}
+// Recv returns the next message from RecvMsg; it panics if that message is not a *pb.WatchResponse.
+func (cs *ws2wcClientStream) Recv() (*pb.WatchResponse, error) {
+	var msg interface{}
+	if err := cs.RecvMsg(&msg); err != nil {
+		return nil, err
+	}
+	return msg.(*pb.WatchResponse), nil
+}
+// ws2wcServerStream implements Watch_WatchServer
+type ws2wcServerStream struct{ chanServerStream }
+// Send passes wr to SendMsg of the underlying stream.
+func (ss *ws2wcServerStream) Send(wr *pb.WatchResponse) error {
+	return ss.SendMsg(wr)
+}
+// Recv returns the next message from RecvMsg; it panics if that message is not a *pb.WatchRequest.
+func (ss *ws2wcServerStream) Recv() (*pb.WatchRequest, error) {
+	var msg interface{}
+	if err := ss.RecvMsg(&msg); err != nil {
+		return nil, err
+	}
+	return msg.(*pb.WatchRequest), nil
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/auth.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/auth.go
new file mode 100644
index 0000000..0ed8d24
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/auth.go
@@ -0,0 +1,110 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "context"
+
+ "github.com/coreos/etcd/clientv3"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+// AuthProxy implements pb.AuthServer by relaying every request through the
+// active connection of the clientv3.Client it wraps.
+type AuthProxy struct{ client *clientv3.Client }
+
+// NewAuthProxy returns an AuthProxy that relays requests through c.
+func NewAuthProxy(c *clientv3.Client) pb.AuthServer {
+	return &AuthProxy{client: c}
+}
+
+// AuthEnable relays r over the client's active connection.
+func (p *AuthProxy) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) {
+	return pb.NewAuthClient(p.client.ActiveConnection()).AuthEnable(ctx, r)
+}
+
+// AuthDisable relays r over the client's active connection.
+func (p *AuthProxy) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) {
+	return pb.NewAuthClient(p.client.ActiveConnection()).AuthDisable(ctx, r)
+}
+
+// Authenticate relays r over the client's active connection.
+func (p *AuthProxy) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) {
+	return pb.NewAuthClient(p.client.ActiveConnection()).Authenticate(ctx, r)
+}
+
+// RoleAdd relays r over the client's active connection.
+func (p *AuthProxy) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
+	return pb.NewAuthClient(p.client.ActiveConnection()).RoleAdd(ctx, r)
+}
+
+// RoleDelete relays r over the client's active connection.
+func (p *AuthProxy) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
+	return pb.NewAuthClient(p.client.ActiveConnection()).RoleDelete(ctx, r)
+}
+
+// RoleGet relays r over the client's active connection.
+func (p *AuthProxy) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
+	return pb.NewAuthClient(p.client.ActiveConnection()).RoleGet(ctx, r)
+}
+
+// RoleList relays r over the client's active connection.
+func (p *AuthProxy) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
+	return pb.NewAuthClient(p.client.ActiveConnection()).RoleList(ctx, r)
+}
+
+// RoleRevokePermission relays r over the client's active connection.
+func (p *AuthProxy) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
+	return pb.NewAuthClient(p.client.ActiveConnection()).RoleRevokePermission(ctx, r)
+}
+
+// RoleGrantPermission relays r over the client's active connection.
+func (p *AuthProxy) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
+	return pb.NewAuthClient(p.client.ActiveConnection()).RoleGrantPermission(ctx, r)
+}
+
+// UserAdd relays r over the client's active connection.
+func (p *AuthProxy) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
+	return pb.NewAuthClient(p.client.ActiveConnection()).UserAdd(ctx, r)
+}
+
+// UserDelete relays r over the client's active connection.
+func (p *AuthProxy) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
+	return pb.NewAuthClient(p.client.ActiveConnection()).UserDelete(ctx, r)
+}
+
+// UserGet relays r over the client's active connection.
+func (p *AuthProxy) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
+	return pb.NewAuthClient(p.client.ActiveConnection()).UserGet(ctx, r)
+}
+
+// UserList relays r over the client's active connection.
+func (p *AuthProxy) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
+	return pb.NewAuthClient(p.client.ActiveConnection()).UserList(ctx, r)
+}
+
+// UserGrantRole relays r over the client's active connection.
+func (p *AuthProxy) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
+	return pb.NewAuthClient(p.client.ActiveConnection()).UserGrantRole(ctx, r)
+}
+
+// UserRevokeRole relays r over the client's active connection.
+func (p *AuthProxy) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
+	return pb.NewAuthClient(p.client.ActiveConnection()).UserRevokeRole(ctx, r)
+}
+
+// UserChangePassword relays r over the client's active connection.
+func (p *AuthProxy) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
+	return pb.NewAuthClient(p.client.ActiveConnection()).UserChangePassword(ctx, r)
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/cache/store.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/cache/store.go
new file mode 100644
index 0000000..5765228
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/cache/store.go
@@ -0,0 +1,171 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package cache exports functionality for efficiently caching and mapping
+// `RangeRequest`s to corresponding `RangeResponse`s.
+package cache
+
+import (
+ "errors"
+ "sync"
+
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ "github.com/coreos/etcd/pkg/adt"
+ "github.com/golang/groupcache/lru"
+)
+
+var (
+ DefaultMaxEntries = 2048 // a default value for NewCache's maxCacheEntries argument
+ ErrCompacted = rpctypes.ErrGRPCCompacted // returned by Get for a revision below the compacted one
+)
+
+type Cache interface { // Cache maps RangeRequests to cached RangeResponses.
+ Add(req *pb.RangeRequest, resp *pb.RangeResponse) // cache resp as the answer to req
+ Get(req *pb.RangeRequest) (*pb.RangeResponse, error) // look up the cached answer to req
+ Compact(revision int64) // note that revisions before 'revision' were compacted
+ Invalidate(key []byte, endkey []byte) // evict entries intersecting key (or the range key..endkey)
+ Size() int // number of cached entries
+ Close()
+}
+
+// keyFunc derives the cache key for req (its marshaled bytes as a string); it panics if marshaling fails.
+func keyFunc(req *pb.RangeRequest) string {
+	// TODO: use marshalTo to reduce allocation
+	encoded, marshalErr := req.Marshal()
+	if marshalErr == nil {
+		return string(encoded)
+	}
+	panic(marshalErr)
+}
+
+// NewCache returns an empty Cache whose LRU is created with maxCacheEntries
+// and whose compacted revision starts at -1.
+func NewCache(maxCacheEntries int) Cache {
+	store := lru.New(maxCacheEntries)
+	return &cache{lru: store, compactedRev: -1}
+}
+
+func (c *cache) Close() {} // Close is a no-op for this implementation.
+
+// cache implements Cache on top of an LRU plus an interval tree used for invalidation.
+type cache struct {
+ mu sync.RWMutex // held by every method that touches the fields below
+ lru *lru.Cache // maps keyFunc(req) to the cached *pb.RangeResponse
+
+ // a reverse index for cache invalidation: key interval -> set of lru keys cached for it
+ cachedRanges adt.IntervalTree
+
+ compactedRev int64 // highest revision passed to Compact; NewCache starts it at -1
+}
+
+// Add caches resp as the answer to req when req.Revision is larger than the
+// compacted revision of the cache. A request for the latest data
+// (Revision == 0) is also recorded in the reverse index for Invalidate.
+//
+// NOTE(review): after Compact has set compactedRev to 0 or more, a Revision-0
+// request no longer passes the revision check and is not stored; confirm intended.
+func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) {
+	lruKey := keyFunc(req)
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if req.Revision > c.compactedRev {
+		c.lru.Add(lruKey, resp)
+	}
+	// A request pinned to a revision never needs invalidation, so it is
+	// kept out of the reverse index.
+	if req.Revision != 0 {
+		return
+	}
+
+	var span adt.Interval
+	if len(req.RangeEnd) == 0 {
+		span = adt.NewStringAffinePoint(string(req.Key))
+	} else {
+		span = adt.NewStringAffineInterval(string(req.Key), string(req.RangeEnd))
+	}
+
+	found := c.cachedRanges.Find(span)
+	if found == nil {
+		c.cachedRanges.Insert(span, map[string]struct{}{lruKey: {}})
+		return
+	}
+	members := found.Val.(map[string]struct{})
+	members[lruKey] = struct{}{}
+	found.Val = members
+}
+
+// Get returns the response cached for req. A request pinned to a revision
+// below the compacted revision has its entry evicted lazily and yields
+// ErrCompacted; a miss yields a "not exist" error.
+func (c *cache) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) {
+	lruKey := keyFunc(req)
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if compacted := req.Revision > 0 && req.Revision < c.compactedRev; compacted {
+		c.lru.Remove(lruKey)
+		return nil, ErrCompacted
+	}
+	cached, hit := c.lru.Get(lruKey)
+	if !hit {
+		return nil, errors.New("not exist")
+	}
+	return cached.(*pb.RangeResponse), nil
+}
+
+// Invalidate evicts from the LRU every cached response recorded in the
+// reverse index under an interval that intersects the given key (when endkey
+// is empty) or the range from key to endkey.
+//
+// NOTE(review): only the exact queried interval is deleted from the reverse
+// index afterwards; other intersecting intervals appear to stay — confirm.
+func (c *cache) Invalidate(key, endkey []byte) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	var target adt.Interval
+	if len(endkey) != 0 {
+		target = adt.NewStringAffineInterval(string(key), string(endkey))
+	} else {
+		target = adt.NewStringAffinePoint(string(key))
+	}
+
+	for _, hit := range c.cachedRanges.Stab(target) {
+		for lruKey := range hit.Val.(map[string]struct{}) {
+			c.lru.Remove(lruKey)
+		}
+	}
+	// delete only after removing all keys since it is destructive to the stabbed values
+	c.cachedRanges.Delete(target)
+}
+
+// Compact records revision as the compacted revision if it is newer than the
+// current one. Invalidation is lazy: affected entries are removed by Get.
+func (c *cache) Compact(revision int64) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.compactedRev >= revision {
+		return
+	}
+	c.compactedRev = revision
+}
+
+func (c *cache) Size() int { // Size returns the number of entries currently in the LRU.
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ return c.lru.Len()
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/cluster.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/cluster.go
new file mode 100644
index 0000000..6e8d3c8
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/cluster.go
@@ -0,0 +1,177 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "sync"
+
+ "github.com/coreos/etcd/clientv3"
+ "github.com/coreos/etcd/clientv3/naming"
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "golang.org/x/time/rate"
+ gnaming "google.golang.org/grpc/naming"
+)
+
+// resolveRetryRate allows at most 1 resolve retry per second; resolve uses it as both the limiter rate and burst.
+const resolveRetryRate = 1
+
+type clusterProxy struct { // clusterProxy serves the Cluster API for the grpc-proxy
+ clus clientv3.Cluster // backend used for member add/remove/update/list
+ ctx context.Context // the client's context; resolve and monitor stop once it is done
+ gr *naming.GRPCResolver // resolves 'prefix' into a watcher of endpoint updates
+
+ // advertise client URL
+ advaddr string
+ prefix string // prefix handed to the resolver to find registered grpc-proxy members
+
+ umu sync.RWMutex // guards umap
+ umap map[string]gnaming.Update // last Add update seen per address, maintained by monitor
+}
+
+// NewClusterProxy builds a Cluster server backed by c. When both advaddr and
+// prefix are non-empty it starts a goroutine that resolves the grpc-proxy
+// member endpoints registered under prefix; the returned channel is closed
+// when that goroutine returns, which happens once waiting on the client's
+// context fails. Otherwise the returned channel is already closed.
+func NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) {
+	cp := &clusterProxy{
+		clus:    c.Cluster,
+		ctx:     c.Ctx(),
+		gr:      &naming.GRPCResolver{Client: c},
+		advaddr: advaddr,
+		prefix:  prefix,
+		umap:    make(map[string]gnaming.Update),
+	}
+
+	stopped := make(chan struct{})
+	if advaddr == "" || prefix == "" {
+		close(stopped)
+		return cp, stopped
+	}
+	go func() {
+		defer close(stopped)
+		cp.resolve(prefix)
+	}()
+	return cp, stopped
+}
+
+// resolve re-resolves prefix and monitors the resulting watcher, paced by a limiter, until waiting on cp.ctx fails.
+func (cp *clusterProxy) resolve(prefix string) {
+	limiter := rate.NewLimiter(rate.Limit(resolveRetryRate), resolveRetryRate)
+	for limiter.Wait(cp.ctx) == nil {
+		if watcher, err := cp.gr.Resolve(prefix); err == nil {
+			cp.monitor(watcher)
+		} else {
+			plog.Warningf("failed to resolve %q (%v)", prefix, err)
+		}
+	}
+}
+
+// monitor applies watcher updates to cp.umap until cp.ctx is done or the watcher reports that it is closed.
+// NOTE(review): any other watcher error is only logged before Next is called again; confirm this cannot spin.
+func (cp *clusterProxy) monitor(wa gnaming.Watcher) {
+	for cp.ctx.Err() == nil {
+		updates, err := wa.Next()
+		if err != nil {
+			plog.Warningf("clusterProxy watcher error (%v)", err)
+			if rpctypes.ErrorDesc(err) == naming.ErrWatcherClosed.Error() {
+				return
+			}
+		}
+		cp.umu.Lock()
+		for _, up := range updates {
+			if up.Op == gnaming.Add {
+				cp.umap[up.Addr] = *up
+			} else if up.Op == gnaming.Delete {
+				delete(cp.umap, up.Addr)
+			}
+		}
+		cp.umu.Unlock()
+	}
+}
+
+// MemberAdd forwards the request to the cluster API and returns the client response converted to its pb type without copying it.
+func (cp *clusterProxy) MemberAdd(ctx context.Context, r *pb.MemberAddRequest) (*pb.MemberAddResponse, error) {
+	mresp, err := cp.clus.MemberAdd(ctx, r.PeerURLs)
+	if err != nil {
+		return nil, err
+	}
+	return (*pb.MemberAddResponse)(mresp), nil
+}
+
+// MemberRemove forwards the request to the cluster API and returns the client response converted to its pb type without copying it.
+func (cp *clusterProxy) MemberRemove(ctx context.Context, r *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) {
+	mresp, err := cp.clus.MemberRemove(ctx, r.ID)
+	if err != nil {
+		return nil, err
+	}
+	return (*pb.MemberRemoveResponse)(mresp), nil
+}
+
+// MemberUpdate forwards the request to the cluster API and returns the client response converted to its pb type without copying it.
+func (cp *clusterProxy) MemberUpdate(ctx context.Context, r *pb.MemberUpdateRequest) (*pb.MemberUpdateResponse, error) {
+	mresp, err := cp.clus.MemberUpdate(ctx, r.ID, r.PeerURLs)
+	if err != nil {
+		return nil, err
+	}
+	return (*pb.MemberUpdateResponse)(mresp), nil
+}
+
+func (cp *clusterProxy) membersFromUpdates() ([]*pb.Member, error) { // builds one pb.Member per address in umap
+ cp.umu.RLock()
+ defer cp.umu.RUnlock()
+ mbs := make([]*pb.Member, 0, len(cp.umap))
+ for addr, upt := range cp.umap {
+ m, err := decodeMeta(fmt.Sprint(upt.Metadata)) // the member name is taken from the update's metadata
+ if err != nil {
+ return nil, err // a single undecodable entry fails the whole listing
+ }
+ mbs = append(mbs, &pb.Member{Name: m.Name, ClientURLs: []string{addr}})
+ }
+ return mbs, nil
+}
+
+// MemberList wraps the member list API with the following rules:
+// - If 'advaddr' is empty, forward to the member list API.
+// - If 'advaddr' and 'prefix' are both non-empty and grpc-proxy members have
+//   been fetched via the resolver, return those registered members.
+// - Otherwise return a single member named after the local hostname whose
+//   only client URL is 'advaddr'.
+func (cp *clusterProxy) MemberList(ctx context.Context, r *pb.MemberListRequest) (*pb.MemberListResponse, error) {
+	if cp.advaddr == "" {
+		mresp, err := cp.clus.MemberList(ctx)
+		if err != nil {
+			return nil, err
+		}
+		return (*pb.MemberListResponse)(mresp), nil
+	}
+	if cp.prefix != "" {
+		mbs, err := cp.membersFromUpdates()
+		if err != nil {
+			return nil, err
+		}
+		if len(mbs) > 0 {
+			return &pb.MemberListResponse{Members: mbs}, nil
+		}
+	}
+	// prefix is empty or no grpc-proxy members are registered yet; the hostname lookup is best-effort
+	hostname, _ := os.Hostname()
+	return &pb.MemberListResponse{Members: []*pb.Member{{Name: hostname, ClientURLs: []string{cp.advaddr}}}}, nil
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/doc.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/doc.go
new file mode 100644
index 0000000..fc022e3
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package grpcproxy is an OSI level 7 proxy for etcd v3 API requests.
+package grpcproxy
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/election.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/election.go
new file mode 100644
index 0000000..4b4a4cc
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/election.go
@@ -0,0 +1,65 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "context"
+
+ "github.com/coreos/etcd/clientv3"
+ "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
+)
+
+// electionProxy relays Election RPCs through the wrapped client's active connection.
+type electionProxy struct{ client *clientv3.Client }
+
+// NewElectionProxy returns an ElectionServer that proxies through client.
+func NewElectionProxy(client *clientv3.Client) v3electionpb.ElectionServer {
+	return &electionProxy{client: client}
+}
+
+func (ep *electionProxy) Campaign(ctx context.Context, req *v3electionpb.CampaignRequest) (*v3electionpb.CampaignResponse, error) {
+ return v3electionpb.NewElectionClient(ep.client.ActiveConnection()).Campaign(ctx, req) // relayed unchanged over the client's active connection
+}
+
+func (ep *electionProxy) Proclaim(ctx context.Context, req *v3electionpb.ProclaimRequest) (*v3electionpb.ProclaimResponse, error) {
+ return v3electionpb.NewElectionClient(ep.client.ActiveConnection()).Proclaim(ctx, req) // relayed unchanged over the client's active connection
+}
+
+func (ep *electionProxy) Leader(ctx context.Context, req *v3electionpb.LeaderRequest) (*v3electionpb.LeaderResponse, error) {
+ return v3electionpb.NewElectionClient(ep.client.ActiveConnection()).Leader(ctx, req) // relayed unchanged over the client's active connection
+}
+
+// Observe opens an Observe stream on the backend and copies each received response to s until either side fails.
+func (ep *electionProxy) Observe(req *v3electionpb.LeaderRequest, s v3electionpb.Election_ObserveServer) error {
+	ctx, cancel := context.WithCancel(s.Context())
+	defer cancel()
+	upstream, err := v3electionpb.NewElectionClient(ep.client.ActiveConnection()).Observe(ctx, req)
+	if err != nil {
+		return err
+	}
+	for {
+		resp, err := upstream.Recv()
+		if err == nil {
+			err = s.Send(resp)
+		}
+		if err != nil {
+			return err
+		}
+	}
+}
+
+func (ep *electionProxy) Resign(ctx context.Context, req *v3electionpb.ResignRequest) (*v3electionpb.ResignResponse, error) {
+ return v3electionpb.NewElectionClient(ep.client.ActiveConnection()).Resign(ctx, req) // relayed unchanged over the client's active connection
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/health.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/health.go
new file mode 100644
index 0000000..e5e91f2
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/health.go
@@ -0,0 +1,41 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "context"
+ "net/http"
+ "time"
+
+ "github.com/coreos/etcd/clientv3"
+ "github.com/coreos/etcd/etcdserver/api/etcdhttp"
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+)
+
+// HandleHealth registers on mux, at etcdhttp.PathHealth ('/health'), a health handler that reports checkHealth(c).
+func HandleHealth(mux *http.ServeMux, c *clientv3.Client) {
+ mux.Handle(etcdhttp.PathHealth, etcdhttp.NewHealthHandler(func() etcdhttp.Health { return checkHealth(c) }))
+}
+
+// checkHealth reports "true" when a one-second Get of key "a" succeeds or is
+// rejected with ErrPermissionDenied, and "false" otherwise.
+func checkHealth(c *clientv3.Client) etcdhttp.Health {
+	ctx, cancel := context.WithTimeout(c.Ctx(), time.Second)
+	defer cancel()
+	if _, err := c.Get(ctx, "a"); err != nil && err != rpctypes.ErrPermissionDenied {
+		return etcdhttp.Health{Health: "false"}
+	}
+	return etcdhttp.Health{Health: "true"}
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/kv.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/kv.go
new file mode 100644
index 0000000..1c9860f
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/kv.go
@@ -0,0 +1,232 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "context"
+
+ "github.com/coreos/etcd/clientv3"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ "github.com/coreos/etcd/proxy/grpcproxy/cache"
+)
+
+type kvProxy struct { // kvProxy serves the KV API through a clientv3.KV with a response cache in front
+ kv clientv3.KV // backend that executes the requests
+ cache cache.Cache // cached range responses
+}
+
+// NewKvProxy returns a KV server that proxies through c.KV behind a response
+// cache sized by cache.DefaultMaxEntries. The returned channel is already
+// closed.
+func NewKvProxy(c *clientv3.Client) (pb.KVServer, <-chan struct{}) {
+	proxy := &kvProxy{kv: c.KV, cache: cache.NewCache(cache.DefaultMaxEntries)}
+	closedc := make(chan struct{})
+	close(closedc)
+	return proxy, closedc
+}
+
+// Range answers serializable requests from the cache when possible (a
+// cached ErrCompacted also counts as a hit); otherwise it forwards r to the
+// backend and caches the response under a serializable copy of the request.
+func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
+	if r.Serializable {
+		cached, cacheErr := p.cache.Get(r)
+		if cacheErr == nil {
+			cacheHits.Inc()
+			return cached, nil
+		}
+		if cacheErr == cache.ErrCompacted {
+			cacheHits.Inc()
+			return nil, cacheErr
+		}
+		cachedMisses.Inc()
+	}
+
+	opResp, err := p.kv.Do(ctx, RangeRequestToOp(r))
+	if err != nil {
+		return nil, err
+	}
+	// cache linearizable as serializable
+	cacheReq := *r
+	cacheReq.Serializable = true
+	rangeResp := (*pb.RangeResponse)(opResp.Get())
+	p.cache.Add(&cacheReq, rangeResp)
+	cacheKeys.Set(float64(p.cache.Size()))
+	return rangeResp, nil
+}
+
+// Put invalidates the cache entries covering r.Key, then forwards the put to the backend.
+func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
+	p.cache.Invalidate(r.Key, nil)
+	cacheKeys.Set(float64(p.cache.Size()))
+	opResp, err := p.kv.Do(ctx, PutRequestToOp(r))
+	return (*pb.PutResponse)(opResp.Put()), err
+}
+
+// DeleteRange invalidates the cache entries covering the deleted range, then forwards the delete to the backend.
+func (p *kvProxy) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
+	p.cache.Invalidate(r.Key, r.RangeEnd)
+	cacheKeys.Set(float64(p.cache.Size()))
+	opResp, err := p.kv.Do(ctx, DelRequestToOp(r))
+	return (*pb.DeleteRangeResponse)(opResp.Del()), err
+}
+
+func (p *kvProxy) txnToCache(reqs []*pb.RequestOp, resps []*pb.ResponseOp) { // applies an executed txn branch to the cache
+ for i := range resps {
+ switch tv := resps[i].Response.(type) { // reqs[i] is taken to be the request that produced resps[i]
+ case *pb.ResponseOp_ResponsePut:
+ p.cache.Invalidate(reqs[i].GetRequestPut().Key, nil)
+ case *pb.ResponseOp_ResponseDeleteRange:
+ rdr := reqs[i].GetRequestDeleteRange()
+ p.cache.Invalidate(rdr.Key, rdr.RangeEnd)
+ case *pb.ResponseOp_ResponseRange:
+ req := *(reqs[i].GetRequestRange()) // copy so the caller's request is not modified
+ req.Serializable = true // cache under the serializable form, as Range does
+ p.cache.Add(&req, tv.ResponseRange)
+ } // NOTE(review): nested txn responses are not handled by this switch — confirm that is acceptable
+ }
+}
+
+// Txn forwards the transaction to the backend and, when that call succeeds,
+// brings the cache in line with it: compared keys are invalidated and the
+// executed branch is replayed through txnToCache.
+func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
+	opResp, err := p.kv.Do(ctx, TxnRequestToOp(r))
+	if err != nil {
+		return nil, err
+	}
+	txnResp := opResp.Txn()
+
+	// txn may claim an outdated key is updated; be safe and invalidate
+	for _, cmp := range r.Compare {
+		p.cache.Invalidate(cmp.Key, cmp.RangeEnd)
+	}
+	// update any fetched keys
+	branch := r.Failure
+	if txnResp.Succeeded {
+		branch = r.Success
+	}
+	p.txnToCache(branch, txnResp.Responses)
+	cacheKeys.Set(float64(p.cache.Size()))
+	return (*pb.TxnResponse)(txnResp), nil
+}
+
+// Compact forwards the compaction to the backend and, only when that
+// succeeds, records the compacted revision in the cache.
+func (p *kvProxy) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
+	var compactOpts []clientv3.CompactOption
+	if r.Physical {
+		compactOpts = append(compactOpts, clientv3.WithCompactPhysical())
+	}
+
+	compactResp, err := p.kv.Compact(ctx, r.Revision, compactOpts...)
+	if err == nil {
+		p.cache.Compact(r.Revision)
+	}
+	cacheKeys.Set(float64(p.cache.Size()))
+	return (*pb.CompactionResponse)(compactResp), err
+}
+
+func requestOpToOp(union *pb.RequestOp) clientv3.Op { // converts one txn RequestOp into the matching clientv3.Op
+ switch tv := union.Request.(type) {
+ case *pb.RequestOp_RequestRange:
+ if tv.RequestRange != nil {
+ return RangeRequestToOp(tv.RequestRange)
+ }
+ case *pb.RequestOp_RequestPut:
+ if tv.RequestPut != nil {
+ return PutRequestToOp(tv.RequestPut)
+ }
+ case *pb.RequestOp_RequestDeleteRange:
+ if tv.RequestDeleteRange != nil {
+ return DelRequestToOp(tv.RequestDeleteRange)
+ }
+ case *pb.RequestOp_RequestTxn:
+ if tv.RequestTxn != nil {
+ return TxnRequestToOp(tv.RequestTxn) // nested transactions are converted recursively
+ }
+ }
+ panic("unknown request") // reached for an unrecognized type or a nil request inside a known type
+}
+
+// RangeRequestToOp translates a pb.RangeRequest into the equivalent
+// clientv3 get Op, carrying over the options set on the request.
+func RangeRequestToOp(r *pb.RangeRequest) clientv3.Op {
+	var opts []clientv3.OpOption
+	if len(r.RangeEnd) != 0 {
+		opts = append(opts, clientv3.WithRange(string(r.RangeEnd)))
+	}
+	opts = append(opts,
+		clientv3.WithRev(r.Revision),
+		clientv3.WithLimit(r.Limit),
+		clientv3.WithSort(clientv3.SortTarget(r.SortTarget), clientv3.SortOrder(r.SortOrder)),
+		clientv3.WithMaxCreateRev(r.MaxCreateRevision),
+		clientv3.WithMinCreateRev(r.MinCreateRevision),
+		clientv3.WithMaxModRev(r.MaxModRevision),
+		clientv3.WithMinModRev(r.MinModRevision),
+	)
+	if r.CountOnly {
+		opts = append(opts, clientv3.WithCountOnly())
+	}
+	if r.KeysOnly {
+		opts = append(opts, clientv3.WithKeysOnly())
+	}
+	if r.Serializable {
+		opts = append(opts, clientv3.WithSerializable())
+	}
+	return clientv3.OpGet(string(r.Key), opts...)
+}
+
+// PutRequestToOp translates a pb.PutRequest into the equivalent clientv3 put Op.
+func PutRequestToOp(r *pb.PutRequest) clientv3.Op {
+	opts := []clientv3.OpOption{clientv3.WithLease(clientv3.LeaseID(r.Lease))}
+	if r.IgnoreValue {
+		opts = append(opts, clientv3.WithIgnoreValue())
+	}
+	if r.IgnoreLease {
+		opts = append(opts, clientv3.WithIgnoreLease())
+	}
+	if r.PrevKv {
+		opts = append(opts, clientv3.WithPrevKV())
+	}
+	return clientv3.OpPut(string(r.Key), string(r.Value), opts...)
+}
+
+func DelRequestToOp(r *pb.DeleteRangeRequest) clientv3.Op { // translates a pb.DeleteRangeRequest into a clientv3 delete Op
+ opts := []clientv3.OpOption{}
+ if len(r.RangeEnd) != 0 { // an empty RangeEnd means a single-key delete, so no range option is added
+ opts = append(opts, clientv3.WithRange(string(r.RangeEnd)))
+ }
+ if r.PrevKv {
+ opts = append(opts, clientv3.WithPrevKV())
+ }
+ return clientv3.OpDelete(string(r.Key), opts...)
+}
+
+func TxnRequestToOp(r *pb.TxnRequest) clientv3.Op { // translates a pb.TxnRequest into a clientv3 txn Op
+ cmps := make([]clientv3.Cmp, len(r.Compare))
+ thenops := make([]clientv3.Op, len(r.Success)) // ops for the branch taken when the comparisons succeed
+ elseops := make([]clientv3.Op, len(r.Failure)) // ops for the branch taken otherwise
+ for i := range r.Compare {
+ cmps[i] = (clientv3.Cmp)(*r.Compare[i])
+ }
+ for i := range r.Success {
+ thenops[i] = requestOpToOp(r.Success[i]) // panics on an unknown or empty request op
+ }
+ for i := range r.Failure {
+ elseops[i] = requestOpToOp(r.Failure[i])
+ }
+ return clientv3.OpTxn(cmps, thenops, elseops)
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/leader.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/leader.go
new file mode 100644
index 0000000..042c949
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/leader.go
@@ -0,0 +1,115 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "context"
+ "math"
+ "sync"
+
+ "github.com/coreos/etcd/clientv3"
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+
+ "golang.org/x/time/rate"
+ "google.golang.org/grpc"
+)
+
+const (
+ lostLeaderKey = "__lostleader" // watched to detect leader loss
+ retryPerSecond = 10 // rate and burst of the limiter that paces watch retries in recvLoop
+)
+
+type leader struct { // leader tracks whether a leader is currently believed to exist
+ ctx context.Context // require-leader context used for the probe watch
+ w clientv3.Watcher // watcher used to open the probe watch
+ // mu protects leaderc updates.
+ mu sync.RWMutex
+ leaderc chan struct{} // closed while the leader is considered lost; replaced by gotLeader
+ disconnc chan struct{} // closed when the watch fails with ErrClientConnClosing
+ donec chan struct{} // closed when recvLoop returns
+}
+
+func newLeader(ctx context.Context, w clientv3.Watcher) *leader { // builds a leader tracker and starts its recvLoop goroutine
+ l := &leader{
+ ctx: clientv3.WithRequireLeader(ctx),
+ w: w,
+ leaderc: make(chan struct{}),
+ disconnc: make(chan struct{}),
+ donec: make(chan struct{}),
+ }
+ // begin assuming leader is lost: a closed leaderc is what lostNotify callers see as "lost"
+ close(l.leaderc)
+ go l.recvLoop()
+ return l
+}
+
+func (l *leader) recvLoop() { // recvLoop tracks leadership by keeping a require-leader watch open on lostLeaderKey
+ defer close(l.donec)
+
+ limiter := rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond)
+ rev := int64(math.MaxInt64 - 2) // presumably far enough ahead that no real event is ever delivered — confirm
+ for limiter.Wait(l.ctx) == nil {
+ wch := l.w.Watch(l.ctx, lostLeaderKey, clientv3.WithRev(rev), clientv3.WithCreatedNotify())
+ cresp, ok := <-wch // first message: the created notification or an error
+ if !ok {
+ l.loseLeader()
+ continue
+ }
+ if cresp.Err() != nil {
+ l.loseLeader()
+ if rpctypes.ErrorDesc(cresp.Err()) == grpc.ErrClientConnClosing.Error() {
+ close(l.disconnc) // the client connection is closing: signal disconnect and stop for good
+ return
+ }
+ continue
+ }
+ l.gotLeader()
+ <-wch // block until the watch channel yields again or is closed
+ l.loseLeader()
+ }
+}
+
+func (l *leader) loseLeader() { // loseLeader closes leaderc unless it is already closed
+ l.mu.RLock() // read lock: the leaderc field itself is not reassigned here
+ defer l.mu.RUnlock()
+ select {
+ case <-l.leaderc: // already closed, nothing to do
+ default:
+ close(l.leaderc)
+ }
+}
+
+// gotLeader forces the leadership status to "has a leader" by replacing a closed leaderc with a fresh open channel.
+func (l *leader) gotLeader() {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ select {
+ case <-l.leaderc: // currently marked lost: install a new, open channel
+ l.leaderc = make(chan struct{})
+ default: // already open, nothing to do
+ }
+}
+
+func (l *leader) disconnectNotify() <-chan struct{} { return l.disconnc } // closed by recvLoop on ErrClientConnClosing
+
+func (l *leader) stopNotify() <-chan struct{} { return l.donec } // closed when recvLoop returns
+
+// lostNotify returns a channel that is closed if there has been
+// a leader loss not yet followed by a leader reacquire.
+func (l *leader) lostNotify() <-chan struct{} {
+ l.mu.RLock() // gotLeader may replace leaderc concurrently
+ defer l.mu.RUnlock()
+ return l.leaderc
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/lease.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/lease.go
new file mode 100644
index 0000000..65f68b0
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/lease.go
@@ -0,0 +1,382 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "context"
+ "io"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/coreos/etcd/clientv3"
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+)
+
+type leaseProxy struct {
+ // leaseClient handles req from LeaseGrant() that requires a lease ID.
+ leaseClient pb.LeaseClient
+
+ lessor clientv3.Lease // handles revoke, time-to-live, list and keepalive requests
+
+ ctx context.Context // once done, LeaseKeepAlive rejects new streams and running ones are torn down
+
+ leader *leader // tracks leader availability for streams that require a leader
+
+ // mu protects adding outstanding leaseProxyStream through wg.
+ mu sync.RWMutex
+
+ // wg waits until all outstanding leaseProxyStream quit.
+ wg sync.WaitGroup
+}
+
+func NewLeaseProxy(c *clientv3.Client) (pb.LeaseServer, <-chan struct{}) { // the channel closes after the leader tracker stopped and all streams quit
+ cctx, cancel := context.WithCancel(c.Ctx())
+ lp := &leaseProxy{
+ leaseClient: pb.NewLeaseClient(c.ActiveConnection()),
+ lessor: c.Lease,
+ ctx: cctx,
+ leader: newLeader(c.Ctx(), c.Watcher),
+ }
+ ch := make(chan struct{})
+ go func() {
+ defer close(ch)
+ <-lp.leader.stopNotify()
+ lp.mu.Lock() // keeps LeaseKeepAlive from registering new streams during shutdown
+ select {
+ case <-lp.ctx.Done():
+ case <-lp.leader.disconnectNotify():
+ cancel()
+ }
+ <-lp.ctx.Done()
+ lp.mu.Unlock()
+ lp.wg.Wait() // wait for every outstanding keepalive stream to finish
+ }()
+ return lp, ch
+}
+
+func (lp *leaseProxy) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
+ rp, err := lp.leaseClient.LeaseGrant(ctx, cr, grpc.FailFast(false)) // goes through the raw lease client, not lessor
+ if err != nil {
+ return nil, err
+ }
+ lp.leader.gotLeader() // a successful grant is treated as evidence that a leader exists
+ return rp, nil
+}
+
+func (lp *leaseProxy) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
+ r, err := lp.lessor.Revoke(ctx, clientv3.LeaseID(rr.ID))
+ if err != nil {
+ return nil, err
+ }
+ lp.leader.gotLeader() // a successful revoke is treated as evidence that a leader exists
+ return (*pb.LeaseRevokeResponse)(r), nil
+}
+
+// LeaseTimeToLive forwards the time-to-live query for rr.ID to the lessor,
+// asking for the attached keys as well when rr.Keys is set, and converts the
+// reply to its pb form.
+func (lp *leaseProxy) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
+	var ttl *clientv3.LeaseTimeToLiveResponse
+	var err error
+	if id := clientv3.LeaseID(rr.ID); rr.Keys {
+		ttl, err = lp.lessor.TimeToLive(ctx, id, clientv3.WithAttachedKeys())
+	} else {
+		ttl, err = lp.lessor.TimeToLive(ctx, id)
+	}
+	if err != nil {
+		return nil, err
+	}
+	return &pb.LeaseTimeToLiveResponse{
+		Header:     ttl.ResponseHeader,
+		ID:         int64(ttl.ID),
+		TTL:        ttl.TTL,
+		GrantedTTL: ttl.GrantedTTL,
+		Keys:       ttl.Keys,
+	}, nil
+}
+
+// LeaseLeases lists the leases through the lessor and converts the reply to its pb form, keeping each lease's ID.
+func (lp *leaseProxy) LeaseLeases(ctx context.Context, rr *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
+	listed, err := lp.lessor.Leases(ctx)
+	if err != nil {
+		return nil, err
+	}
+	statuses := make([]*pb.LeaseStatus, len(listed.Leases))
+	for i, lease := range listed.Leases {
+		statuses[i] = &pb.LeaseStatus{ID: int64(lease.ID)}
+	}
+	return &pb.LeaseLeasesResponse{
+		Header: listed.ResponseHeader,
+		Leases: statuses,
+	}, nil
+}
+
+func (lp *leaseProxy) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error { // proxies one keepalive stream until it ends, the leader is lost, or the proxy shuts down
+ lp.mu.Lock()
+ select {
+ case <-lp.ctx.Done():
+ lp.mu.Unlock()
+ return lp.ctx.Err()
+ default:
+ lp.wg.Add(1) // register this stream so the NewLeaseProxy goroutine waits for it
+ }
+ lp.mu.Unlock()
+
+ ctx, cancel := context.WithCancel(stream.Context())
+ lps := leaseProxyStream{
+ stream: stream,
+ lessor: lp.lessor,
+ keepAliveLeases: make(map[int64]*atomicCounter),
+ respc: make(chan *pb.LeaseKeepAliveResponse),
+ ctx: ctx,
+ cancel: cancel,
+ }
+
+ errc := make(chan error, 2) // buffered so recvLoop and sendLoop can each report one error without blocking
+ // NOTE(review): this reads *outgoing* metadata from a server stream's context; confirm FromIncomingContext is not what is needed.
+ var lostLeaderC <-chan struct{}
+ if md, ok := metadata.FromOutgoingContext(stream.Context()); ok {
+ v := md[rpctypes.MetadataRequireLeaderKey]
+ if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
+ lostLeaderC = lp.leader.lostNotify()
+ // if leader is known to be lost at creation time, avoid
+ // letting events through at all
+ select {
+ case <-lostLeaderC:
+ lp.wg.Done() // NOTE(review): this path returns without calling cancel(); ctx then ends only with the stream context
+ return rpctypes.ErrNoLeader
+ default:
+ }
+ }
+ }
+ stopc := make(chan struct{}, 3) // one slot for each of the three helper goroutines below
+ go func() {
+ defer func() { stopc <- struct{}{} }()
+ if err := lps.recvLoop(); err != nil {
+ errc <- err
+ }
+ }()
+
+ go func() {
+ defer func() { stopc <- struct{}{} }()
+ if err := lps.sendLoop(); err != nil {
+ errc <- err
+ }
+ }()
+
+ // tears down LeaseKeepAlive stream if leader goes down or entire leaseProxy is terminated.
+ go func() {
+ defer func() { stopc <- struct{}{} }()
+ select {
+ case <-lostLeaderC: // a nil channel when the client did not require a leader, so this case never fires then
+ case <-ctx.Done():
+ case <-lp.ctx.Done():
+ }
+ }()
+
+ var err error
+ select {
+ case <-stopc:
+ stopc <- struct{}{} // put the token back so the cleanup goroutine below still receives three
+ case err = <-errc:
+ }
+ cancel()
+
+ // recv/send may only shutdown after function exits;
+ // this goroutine notifies lease proxy that the stream is through
+ go func() {
+ <-stopc
+ <-stopc
+ <-stopc
+ lps.close()
+ close(errc)
+ lp.wg.Done()
+ }()
+
+ select { // pick the error to report: leader loss first, then disconnect, then the loop error or ctx error
+ case <-lostLeaderC:
+ return rpctypes.ErrNoLeader
+ case <-lp.leader.disconnectNotify():
+ return grpc.ErrClientConnClosing
+ default:
+ if err != nil {
+ return err
+ }
+ return ctx.Err()
+ }
+}
+
+type leaseProxyStream struct {
+ stream pb.Lease_LeaseKeepAliveServer // the client-facing keepalive stream being proxied
+
+ lessor clientv3.Lease // used to keep leases alive and to query their TTL
+ // wg tracks keepAliveLoop goroutines
+ wg sync.WaitGroup
+ // mu protects keepAliveLeases
+ mu sync.RWMutex
+ // keepAliveLeases tracks how many outstanding keepalive requests which need responses are on a lease.
+ keepAliveLeases map[int64]*atomicCounter
+ // respc receives lease keepalive responses from etcd backend
+ respc chan *pb.LeaseKeepAliveResponse
+
+ ctx context.Context // stream-scoped context; all loops stop once it is done
+ cancel context.CancelFunc // cancels ctx
+}
+
+func (lps *leaseProxyStream) recvLoop() error { // reads the client's keepalive requests and counts them per lease
+ for {
+ rr, err := lps.stream.Recv()
+ if err == io.EOF {
+ return nil // end of the client's request stream is not an error
+ }
+ if err != nil {
+ return err
+ }
+ lps.mu.Lock()
+ neededResps, ok := lps.keepAliveLeases[rr.ID]
+ if !ok { // no loop is tracked for this lease yet: start its keepAliveLoop
+ neededResps = &atomicCounter{}
+ lps.keepAliveLeases[rr.ID] = neededResps
+ lps.wg.Add(1)
+ go func() {
+ defer lps.wg.Done()
+ if err := lps.keepAliveLoop(rr.ID, neededResps); err != nil {
+ lps.cancel() // a failing keepAliveLoop tears the whole stream down
+ }
+ }()
+ }
+ neededResps.add(1) // one more response is owed to the client for this lease
+ lps.mu.Unlock()
+ }
+}
+
+func (lps *leaseProxyStream) keepAliveLoop(leaseID int64, neededResps *atomicCounter) error { // relays backend keepalives for one lease to the client
+ cctx, ccancel := context.WithCancel(lps.ctx)
+ defer ccancel()
+ respc, err := lps.lessor.KeepAlive(cctx, clientv3.LeaseID(leaseID))
+ if err != nil {
+ return err
+ }
+ // ticker expires when loop hasn't received keepalive within TTL
+ var ticker <-chan time.Time // nil until a response has been relayed, so its case cannot fire before then
+ for {
+ select {
+ case <-ticker:
+ lps.mu.Lock()
+ // if there are outstanding keepAlive reqs at the moment of ticker firing,
+ // don't close keepAliveLoop(); let it continue processing the KeepAlive reqs.
+ if neededResps.get() > 0 {
+ lps.mu.Unlock()
+ ticker = nil
+ continue
+ }
+ delete(lps.keepAliveLeases, leaseID)
+ lps.mu.Unlock()
+ return nil
+ case rp, ok := <-respc:
+ if !ok { // the keepalive channel from the lessor was closed
+ lps.mu.Lock()
+ delete(lps.keepAliveLeases, leaseID)
+ lps.mu.Unlock()
+ if neededResps.get() == 0 {
+ return nil
+ }
+ ttlResp, err := lps.lessor.TimeToLive(cctx, clientv3.LeaseID(leaseID)) // answer the owed responses from a TTL query instead
+ if err != nil {
+ return err
+ }
+ r := &pb.LeaseKeepAliveResponse{
+ Header: ttlResp.ResponseHeader,
+ ID: int64(ttlResp.ID),
+ TTL: ttlResp.TTL,
+ }
+ for neededResps.get() > 0 { // send the same response once per outstanding request
+ select {
+ case lps.respc <- r:
+ neededResps.add(-1)
+ case <-lps.ctx.Done():
+ return nil
+ }
+ }
+ return nil
+ }
+ if neededResps.get() == 0 {
+ continue // nobody is waiting for this response; drop it
+ }
+ ticker = time.After(time.Duration(rp.TTL) * time.Second) // re-armed on every relayed response
+ r := &pb.LeaseKeepAliveResponse{
+ Header: rp.ResponseHeader,
+ ID: int64(rp.ID),
+ TTL: rp.TTL,
+ }
+ lps.replyToClient(r, neededResps)
+ }
+ }
+}
+
+func (lps *leaseProxyStream) replyToClient(r *pb.LeaseKeepAliveResponse, neededResps *atomicCounter) { // sends r once per outstanding request
+ timer := time.After(500 * time.Millisecond) // overall budget for this call; leftover requests stay counted
+ for neededResps.get() > 0 {
+ select {
+ case lps.respc <- r:
+ neededResps.add(-1)
+ case <-timer:
+ return
+ case <-lps.ctx.Done():
+ return
+ }
+ }
+}
+
+func (lps *leaseProxyStream) sendLoop() error { // forwards responses from respc to the client stream
+ for {
+ select {
+ case lrp, ok := <-lps.respc:
+ if !ok {
+ return nil // respc was closed by close()
+ }
+ if err := lps.stream.Send(lrp); err != nil {
+ return err
+ }
+ case <-lps.ctx.Done():
+ return lps.ctx.Err()
+ }
+ }
+}
+
+func (lps *leaseProxyStream) close() { // cancels the stream context and releases respc once no loop can send on it
+ lps.cancel()
+ lps.wg.Wait()
+ // only close respc channel if all the keepAliveLoop() goroutines have finished
+ // this ensures those goroutines don't send resp to a closed resp channel
+ close(lps.respc)
+}
+
+// atomicCounter is an int64 counter whose methods go through sync/atomic, so
+// it can be shared between goroutines without additional locking.
+type atomicCounter struct{ counter int64 }
+
+// add atomically adds delta (which may be negative) to the counter.
+func (ac *atomicCounter) add(delta int64) {
+	atomic.AddInt64(&ac.counter, delta)
+}
+
+// get atomically loads the current counter value.
+func (ac *atomicCounter) get() int64 { return atomic.LoadInt64(&ac.counter) }
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/lock.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/lock.go
new file mode 100644
index 0000000..ceef26f
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/lock.go
@@ -0,0 +1,38 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "context"
+
+ "github.com/coreos/etcd/clientv3"
+ "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
+)
+
+type lockProxy struct { // lockProxy serves v3lockpb.LockServer calls by forwarding them through an etcd client.
+ client *clientv3.Client
+}
+
+func NewLockProxy(client *clientv3.Client) v3lockpb.LockServer { // NewLockProxy returns a LockServer backed by client.
+ return &lockProxy{client: client}
+}
+
+func (lp *lockProxy) Lock(ctx context.Context, req *v3lockpb.LockRequest) (*v3lockpb.LockResponse, error) { // Lock relays req over the client's active connection.
+ return v3lockpb.NewLockClient(lp.client.ActiveConnection()).Lock(ctx, req)
+}
+
+func (lp *lockProxy) Unlock(ctx context.Context, req *v3lockpb.UnlockRequest) (*v3lockpb.UnlockResponse, error) { // Unlock relays req over the client's active connection.
+ return v3lockpb.NewLockClient(lp.client.ActiveConnection()).Unlock(ctx, req)
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/logger.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/logger.go
new file mode 100644
index 0000000..c2d8180
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/logger.go
@@ -0,0 +1,19 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import "github.com/coreos/pkg/capnslog"
+
+var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "grpcproxy") // plog is the package-level logger for grpcproxy.
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/maintenance.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/maintenance.go
new file mode 100644
index 0000000..291e8e3
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/maintenance.go
@@ -0,0 +1,90 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "context"
+ "io"
+
+ "github.com/coreos/etcd/clientv3"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+type maintenanceProxy struct { // maintenanceProxy implements pb.MaintenanceServer by relaying calls through an etcd client.
+ client *clientv3.Client
+}
+
+func NewMaintenanceProxy(c *clientv3.Client) pb.MaintenanceServer { // NewMaintenanceProxy returns a MaintenanceServer backed by c.
+ return &maintenanceProxy{
+ client: c,
+ }
+}
+
+func (mp *maintenanceProxy) Defragment(ctx context.Context, dr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) { // Defragment relays dr over the client's active connection.
+ conn := mp.client.ActiveConnection()
+ return pb.NewMaintenanceClient(conn).Defragment(ctx, dr)
+}
+
+func (mp *maintenanceProxy) Snapshot(sr *pb.SnapshotRequest, stream pb.Maintenance_SnapshotServer) error { // Snapshot opens a snapshot stream on the active connection and copies every received message to the client stream.
+ conn := mp.client.ActiveConnection()
+ ctx, cancel := context.WithCancel(stream.Context())
+ defer cancel() // cancels the upstream call's context whenever Snapshot returns
+
+ ctx = withClientAuthToken(ctx, stream.Context()) // carry the caller's auth token, if any, on the upstream context
+
+ sc, err := pb.NewMaintenanceClient(conn).Snapshot(ctx, sr)
+ if err != nil {
+ return err
+ }
+
+ for {
+ rr, err := sc.Recv()
+ if err != nil {
+ if err == io.EOF {
+ return nil // upstream stream ended normally
+ }
+ return err
+ }
+ err = stream.Send(rr)
+ if err != nil {
+ return err
+ }
+ }
+}
+
+func (mp *maintenanceProxy) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) { // Hash relays r over the client's active connection.
+ conn := mp.client.ActiveConnection()
+ return pb.NewMaintenanceClient(conn).Hash(ctx, r)
+}
+
+func (mp *maintenanceProxy) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) { // HashKV relays r over the client's active connection.
+ conn := mp.client.ActiveConnection()
+ return pb.NewMaintenanceClient(conn).HashKV(ctx, r)
+}
+
+func (mp *maintenanceProxy) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) { // Alarm relays r over the client's active connection.
+ conn := mp.client.ActiveConnection()
+ return pb.NewMaintenanceClient(conn).Alarm(ctx, r)
+}
+
+func (mp *maintenanceProxy) Status(ctx context.Context, r *pb.StatusRequest) (*pb.StatusResponse, error) { // Status relays r over the client's active connection.
+ conn := mp.client.ActiveConnection()
+ return pb.NewMaintenanceClient(conn).Status(ctx, r)
+}
+
+func (mp *maintenanceProxy) MoveLeader(ctx context.Context, r *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) { // MoveLeader relays r over the client's active connection.
+ conn := mp.client.ActiveConnection()
+ return pb.NewMaintenanceClient(conn).MoveLeader(ctx, r)
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/metrics.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/metrics.go
new file mode 100644
index 0000000..864fa16
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/metrics.go
@@ -0,0 +1,58 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import "github.com/prometheus/client_golang/prometheus"
+
+var ( // Prometheus collectors for the gRPC proxy; all share namespace "etcd" and subsystem "grpc_proxy".
+ watchersCoalescing = prometheus.NewGauge(prometheus.GaugeOpts{ // adjusted with Inc/Dec/Sub by watchBroadcast
+ Namespace: "etcd",
+ Subsystem: "grpc_proxy",
+ Name: "watchers_coalescing_total",
+ Help: "Total number of current watchers coalescing",
+ })
+ eventsCoalescing = prometheus.NewCounter(prometheus.CounterOpts{ // increased by watchBroadcast.bcast
+ Namespace: "etcd",
+ Subsystem: "grpc_proxy",
+ Name: "events_coalescing_total",
+ Help: "Total number of events coalescing",
+ })
+ cacheKeys = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "etcd",
+ Subsystem: "grpc_proxy",
+ Name: "cache_keys_total",
+ Help: "Total number of keys/ranges cached",
+ })
+ cacheHits = prometheus.NewGauge(prometheus.GaugeOpts{ // NOTE(review): a Gauge whose name ends in _total; confirm a Counter was not intended
+ Namespace: "etcd",
+ Subsystem: "grpc_proxy",
+ Name: "cache_hits_total",
+ Help: "Total number of cache hits",
+ })
+ cachedMisses = prometheus.NewGauge(prometheus.GaugeOpts{ // NOTE(review): a Gauge whose name ends in _total; confirm a Counter was not intended
+ Namespace: "etcd",
+ Subsystem: "grpc_proxy",
+ Name: "cache_misses_total",
+ Help: "Total number of cache misses",
+ })
+)
+
+func init() { // Registers every proxy collector with the default Prometheus registry; MustRegister panics on failure.
+ prometheus.MustRegister(watchersCoalescing)
+ prometheus.MustRegister(eventsCoalescing)
+ prometheus.MustRegister(cacheKeys)
+ prometheus.MustRegister(cacheHits)
+ prometheus.MustRegister(cachedMisses)
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/register.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/register.go
new file mode 100644
index 0000000..598c71f
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/register.go
@@ -0,0 +1,94 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "encoding/json"
+ "os"
+
+ "github.com/coreos/etcd/clientv3"
+ "github.com/coreos/etcd/clientv3/concurrency"
+ "github.com/coreos/etcd/clientv3/naming"
+
+ "golang.org/x/time/rate"
+ gnaming "google.golang.org/grpc/naming"
+)
+
+// registerRetryRate allows at most 1 retry per second; Register uses it as both the limiter's rate and its burst size.
+const registerRetryRate = 1
+
+// Register announces addr as a grpc-proxy server under prefix, using a session
+// with the specified TTL (in seconds), and registers again whenever that session
+// ends. The returned channel is closed when the registration goroutine exits.
+func Register(c *clientv3.Client, prefix string, addr string, ttl int) <-chan struct{} {
+ rm := rate.NewLimiter(rate.Limit(registerRetryRate), registerRetryRate) // throttles registration attempts
+
+ donec := make(chan struct{})
+ go func() {
+ defer close(donec)
+
+ for rm.Wait(c.Ctx()) == nil { // the loop ends once Wait on the client's context returns an error
+ ss, err := registerSession(c, prefix, addr, ttl)
+ if err != nil {
+ plog.Warningf("failed to create a session %v", err)
+ continue
+ }
+ select {
+ case <-c.Ctx().Done():
+ ss.Close()
+ return
+
+ case <-ss.Done():
+ plog.Warning("session expired; possible network partition or server restart")
+ plog.Warning("creating a new session to rejoin")
+ continue
+ }
+ }
+ }()
+
+ return donec
+}
+
+// registerSession opens a session with the given TTL (seconds) and publishes addr under prefix, bound to the session's lease.
+func registerSession(c *clientv3.Client, prefix string, addr string, ttl int) (*concurrency.Session, error) {
+	ss, err := concurrency.NewSession(c, concurrency.WithTTL(ttl))
+	if err != nil {
+		return nil, err
+	}
+	gr := &naming.GRPCResolver{Client: c}
+	if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr, Metadata: getMeta()}, clientv3.WithLease(ss.Lease())); err != nil {
+		ss.Close() // best effort: the caller retries, so do not leak a session per failed attempt
+		return nil, err
+	}
+	plog.Infof("registered %q with %d-second lease", addr, ttl)
+	return ss, nil
+}
+
+// meta is the metadata a proxy publishes when it registers; it is encoded as JSON.
+type meta struct{ Name string `json:"name"` }
+
+// getMeta returns the JSON encoding of a meta whose Name is the local hostname.
+func getMeta() string {
+	host, _ := os.Hostname() // error deliberately ignored
+	encoded, _ := json.Marshal(meta{Name: host})
+	return string(encoded)
+}
+
+// decodeMeta unmarshals s as JSON into a meta and reports any decoding error.
+func decodeMeta(s string) (meta, error) {
+	var decoded meta
+	unmarshalErr := json.Unmarshal([]byte(s), &decoded)
+	return decoded, unmarshalErr
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/util.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/util.go
new file mode 100644
index 0000000..2b226fa
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/util.go
@@ -0,0 +1,73 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "context"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+)
+
+// getAuthTokenFromClient returns the first "token" value in ctx's incoming gRPC
+// metadata, or "" when the metadata or a token value is absent.
+func getAuthTokenFromClient(ctx context.Context) string {
+	md, _ := metadata.FromIncomingContext(ctx)
+	// Indexing a nil MD is safe; the length check avoids a panic on an empty value list.
+	if ts := md["token"]; len(ts) > 0 {
+		return ts[0]
+	}
+	return ""
+}
+
+func withClientAuthToken(ctx context.Context, ctxWithToken context.Context) context.Context { // withClientAuthToken copies the auth token found in ctxWithToken, if any, onto ctx.
+ token := getAuthTokenFromClient(ctxWithToken)
+ if token != "" {
+ ctx = context.WithValue(ctx, "token", token) // stored as a context value under the plain string key "token"
+ }
+ return ctx
+}
+
+type proxyTokenCredential struct { // proxyTokenCredential supplies a fixed token as per-RPC request metadata.
+ token string
+}
+
+func (cred *proxyTokenCredential) RequireTransportSecurity() bool { // RequireTransportSecurity always reports false.
+ return false
+}
+
+func (cred *proxyTokenCredential) GetRequestMetadata(ctx context.Context, s ...string) (map[string]string, error) { // GetRequestMetadata returns the token under the "token" key; ctx and s are unused.
+ return map[string]string{
+ "token": cred.token,
+ }, nil
+}
+
+func AuthUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { // AuthUnaryClientInterceptor adds token credentials to a unary call when ctx's incoming metadata carries a token.
+ token := getAuthTokenFromClient(ctx)
+ if token != "" {
+ tokenCred := &proxyTokenCredential{token}
+ opts = append(opts, grpc.PerRPCCredentials(tokenCred)) // attach the token to this call only
+ }
+ return invoker(ctx, method, req, reply, cc, opts...)
+}
+
+// AuthStreamClientInterceptor adds token credentials to a streaming call when ctx holds a string under the "token" value key.
+func AuthStreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+	// Checked assertion: a missing or non-string value is skipped instead of panicking.
+	if token, ok := ctx.Value("token").(string); ok {
+		opts = append(opts, grpc.PerRPCCredentials(&proxyTokenCredential{token}))
+	}
+	return streamer(ctx, desc, cc, method, opts...)
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch.go
new file mode 100644
index 0000000..603095f
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch.go
@@ -0,0 +1,298 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "context"
+ "sync"
+
+ "github.com/coreos/etcd/clientv3"
+ "github.com/coreos/etcd/etcdserver/api/v3rpc"
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+)
+
+type watchProxy struct { // watchProxy is the pb.WatchServer implementation returned by NewWatchProxy.
+ cw clientv3.Watcher // watcher of the etcd client; used to open the backing watches
+ ctx context.Context // derived from the client's context; Done once the proxy is stopping
+
+ leader *leader // source of the stop/lost/disconnect notification channels
+
+ ranges *watchRanges // all open watches, grouped by watch range
+
+ // mu protects adding outstanding watch servers through wg.
+ mu sync.Mutex
+
+ // wg waits until all outstanding watch servers quit.
+ wg sync.WaitGroup
+
+ // kv is used for permission checking
+ kv clientv3.KV
+}
+
+func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) { // NewWatchProxy returns a watch server backed by c and a channel that is closed once the proxy has shut down.
+ cctx, cancel := context.WithCancel(c.Ctx())
+ wp := &watchProxy{
+ cw: c.Watcher,
+ ctx: cctx,
+ leader: newLeader(c.Ctx(), c.Watcher),
+
+ kv: c.KV, // for permission checking
+ }
+ wp.ranges = newWatchRanges(wp)
+ ch := make(chan struct{})
+ go func() { // shutdown goroutine: runs once the leader tracker reports stop
+ defer close(ch)
+ <-wp.leader.stopNotify()
+ wp.mu.Lock() // holding mu keeps Watch from adding to wg while the context is being finished
+ select {
+ case <-wp.ctx.Done():
+ case <-wp.leader.disconnectNotify():
+ cancel()
+ }
+ <-wp.ctx.Done()
+ wp.mu.Unlock()
+ wp.wg.Wait() // wait for every outstanding Watch stream to finish
+ wp.ranges.stop()
+ }()
+ return wp, ch
+}
+
+func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { // Watch serves one client watch stream until a worker goroutine exits, the leader is lost, or the proxy context is done.
+ wp.mu.Lock()
+ select {
+ case <-wp.ctx.Done():
+ wp.mu.Unlock()
+ select {
+ case <-wp.leader.disconnectNotify():
+ return grpc.ErrClientConnClosing
+ default:
+ return wp.ctx.Err()
+ }
+ default:
+ wp.wg.Add(1) // counted under mu so shutdown cannot miss this stream
+ }
+ wp.mu.Unlock()
+
+ ctx, cancel := context.WithCancel(stream.Context())
+ wps := &watchProxyStream{
+ ranges: wp.ranges,
+ watchers: make(map[int64]*watcher),
+ stream: stream,
+ watchCh: make(chan *pb.WatchResponse, 1024),
+ ctx: ctx,
+ cancel: cancel,
+ kv: wp.kv,
+ }
+
+ var lostLeaderC <-chan struct{} // stays nil, and so never ready in a select, unless the request requires a leader
+ if md, ok := metadata.FromOutgoingContext(stream.Context()); ok { // NOTE(review): reads outgoing rather than incoming metadata of the stream context; confirm this is intended
+ v := md[rpctypes.MetadataRequireLeaderKey]
+ if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
+ lostLeaderC = wp.leader.lostNotify()
+ // if leader is known to be lost at creation time, avoid
+ // letting events through at all
+ select {
+ case <-lostLeaderC:
+ wp.wg.Done()
+ return rpctypes.ErrNoLeader
+ default:
+ }
+ }
+ }
+
+ // post to stopc => terminate server stream; can't use a waitgroup
+ // since all goroutines will only terminate after Watch() exits.
+ stopc := make(chan struct{}, 3) // buffered for the three goroutines below so their posts never block
+ go func() {
+ defer func() { stopc <- struct{}{} }()
+ wps.recvLoop()
+ }()
+ go func() {
+ defer func() { stopc <- struct{}{} }()
+ wps.sendLoop()
+ }()
+ // tear down watch if leader goes down or entire watch proxy is terminated
+ go func() {
+ defer func() { stopc <- struct{}{} }()
+ select {
+ case <-lostLeaderC:
+ case <-ctx.Done():
+ case <-wp.ctx.Done():
+ }
+ }()
+
+ <-stopc // first of the three goroutines has finished
+ cancel()
+
+ // recv/send may only shutdown after function exits;
+ // goroutine notifies proxy that stream is through
+ go func() {
+ <-stopc
+ <-stopc
+ wps.close()
+ wp.wg.Done()
+ }()
+
+ select {
+ case <-lostLeaderC:
+ return rpctypes.ErrNoLeader
+ case <-wp.leader.disconnectNotify():
+ return grpc.ErrClientConnClosing
+ default:
+ return wps.ctx.Err()
+ }
+}
+
+// watchProxyStream forwards etcd watch events to a single proxied client stream.
+type watchProxyStream struct {
+ ranges *watchRanges // shared with the parent watchProxy
+
+ // mu protects watchers and nextWatcherID
+ mu sync.Mutex
+ // watchers receive events from watch broadcast.
+ watchers map[int64]*watcher
+ // nextWatcherID is the id to assign the next watcher on this stream.
+ nextWatcherID int64
+
+ stream pb.Watch_WatchServer // the client's stream
+
+ // watchCh receives watch responses from the watchers.
+ watchCh chan *pb.WatchResponse
+
+ ctx context.Context // derived from the stream's context in Watch
+ cancel context.CancelFunc // cancels ctx
+
+ // kv is used for permission checking
+ kv clientv3.KV
+}
+
+func (wps *watchProxyStream) close() { // close cancels the stream, removes all of its watchers from ranges, and then closes watchCh.
+ var wg sync.WaitGroup
+ wps.cancel()
+ wps.mu.Lock()
+ wg.Add(len(wps.watchers))
+ for _, wpsw := range wps.watchers {
+ go func(w *watcher) { // each watcher is removed in its own goroutine
+ wps.ranges.delete(w)
+ wg.Done()
+ }(wpsw)
+ }
+ wps.watchers = nil
+ wps.mu.Unlock()
+
+ wg.Wait() // every removal has finished before the channel is closed
+
+ close(wps.watchCh)
+}
+
+func (wps *watchProxyStream) checkPermissionForWatch(key, rangeEnd []byte) error { // checkPermissionForWatch issues a count-only range request over [key, rangeEnd) through kv and returns its error.
+ if len(key) == 0 {
+ // If the length of the key is 0, we need to obtain full range.
+ // look at clientv3.WithPrefix()
+ key = []byte{0}
+ rangeEnd = []byte{0}
+ }
+ req := &pb.RangeRequest{
+ Serializable: true,
+ Key: key,
+ RangeEnd: rangeEnd,
+ CountOnly: true,
+ Limit: 1,
+ }
+ _, err := wps.kv.Do(wps.ctx, RangeRequestToOp(req)) // only the error matters; the response is discarded
+ return err
+}
+
+// recvLoop reads watch requests from the client stream until Recv fails and
+// returns that error; create requests register a watcher, cancel requests remove one.
+func (wps *watchProxyStream) recvLoop() error {
+	for {
+		req, err := wps.stream.Recv()
+		if err != nil {
+			return err
+		}
+		switch uv := req.RequestUnion.(type) {
+		case *pb.WatchRequest_CreateRequest:
+			cr := uv.CreateRequest
+			// Answer with a canceled watch only when the permission check reports
+			// permission denied; any other error (e.g. timeout or closed connection)
+			// is ignored here so that its error code is preserved.
+			if err = wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err == rpctypes.ErrPermissionDenied {
+				wps.watchCh <- &pb.WatchResponse{Header: &pb.ResponseHeader{}, WatchId: -1, Created: true, Canceled: true}
+				continue
+			}
+			w := &watcher{
+				wr:  watchRange{string(cr.Key), string(cr.RangeEnd)},
+				id:  wps.nextWatcherID,
+				wps: wps,
+
+				nextrev:  cr.StartRevision,
+				progress: cr.ProgressNotify,
+				prevKV:   cr.PrevKv,
+				filters:  v3rpc.FiltersFromRequest(cr),
+			}
+			if !w.wr.valid() {
+				w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
+				continue
+			}
+			wps.nextWatcherID++
+			wps.watchers[w.id] = w
+			wps.ranges.add(w)
+		case *pb.WatchRequest_CancelRequest:
+			wps.delete(uv.CancelRequest.WatchId)
+		default:
+			// A panic here would let any network client crash the proxy remotely.
+			plog.Errorf("ignoring unsupported watch request type %T", uv)
+		}
+	}
+}
+
+func (wps *watchProxyStream) sendLoop() { // sendLoop forwards responses from watchCh to the client stream until watchCh closes, a Send fails, or ctx is done.
+ for {
+ select {
+ case wresp, ok := <-wps.watchCh:
+ if !ok {
+ return // watchCh was closed
+ }
+ if err := wps.stream.Send(wresp); err != nil {
+ return // the send error is not reported to the caller
+ }
+ case <-wps.ctx.Done():
+ return
+ }
+ }
+}
+
+func (wps *watchProxyStream) delete(id int64) { // delete removes the watcher with the given id, if present, and queues a canceled response for it.
+ wps.mu.Lock()
+ defer wps.mu.Unlock()
+
+ w, ok := wps.watchers[id]
+ if !ok {
+ return // unknown id: nothing to cancel and no response is sent
+ }
+ wps.ranges.delete(w)
+ delete(wps.watchers, id)
+ resp := &pb.WatchResponse{
+ Header: &w.lastHeader,
+ WatchId: id,
+ Canceled: true,
+ }
+ wps.watchCh <- resp // blocking send performed while mu is still held
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch_broadcast.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch_broadcast.go
new file mode 100644
index 0000000..46e56c7
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch_broadcast.go
@@ -0,0 +1,152 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "context"
+ "sync"
+
+ "github.com/coreos/etcd/clientv3"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+// watchBroadcast fans one etcd server watch out to many client watchers.
+type watchBroadcast struct {
+ // cancel stops the underlying etcd server watcher.
+ cancel context.CancelFunc
+ donec chan struct{} // closed when the goroutine started by newWatchBroadcast exits
+
+ // mu protects nextrev, receivers, and responses.
+ mu sync.RWMutex
+ // nextrev is the minimum expected next revision of the server watcher.
+ nextrev int64
+ // receivers contains all the client-side watchers to serve.
+ receivers map[*watcher]struct{}
+ // responses counts the number of responses broadcast so far.
+ responses int
+}
+
+func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast)) *watchBroadcast { // newWatchBroadcast opens a server watch for w's range and broadcasts its responses; update is called after each one.
+ cctx, cancel := context.WithCancel(wp.ctx)
+ wb := &watchBroadcast{
+ cancel: cancel,
+ nextrev: w.nextrev,
+ receivers: make(map[*watcher]struct{}),
+ donec: make(chan struct{}),
+ }
+ wb.add(w) // w is the first receiver
+ go func() {
+ defer close(wb.donec)
+
+ opts := []clientv3.OpOption{
+ clientv3.WithRange(w.wr.end),
+ clientv3.WithProgressNotify(),
+ clientv3.WithRev(wb.nextrev),
+ clientv3.WithPrevKV(),
+ clientv3.WithCreatedNotify(),
+ }
+
+ cctx = withClientAuthToken(cctx, w.wps.stream.Context()) // the server watch carries the first watcher's auth token, if any
+
+ wch := wp.cw.Watch(cctx, w.wr.key, opts...)
+
+ for wr := range wch { // ends when the watch channel is closed
+ wb.bcast(wr)
+ update(wb)
+ }
+ }()
+ return wb
+}
+
+func (wb *watchBroadcast) bcast(wr clientv3.WatchResponse) { // bcast advances nextrev, sends wr to every receiver, and updates the events-coalescing counter.
+ wb.mu.Lock()
+ defer wb.mu.Unlock()
+ // watchers start on the given revision, if any; ignore header rev on create
+ if wb.responses > 0 || wb.nextrev == 0 {
+ wb.nextrev = wr.Header.Revision + 1
+ }
+ wb.responses++
+ for r := range wb.receivers {
+ r.send(wr)
+ }
+ if len(wb.receivers) > 0 {
+ eventsCoalescing.Add(float64(len(wb.receivers) - 1)) // every receiver beyond the first counts as coalesced
+ }
+}
+
+// add makes w a receiver of this broadcast if w's revision at least meets
+// the broadcast revision. Returns true if added.
+func (wb *watchBroadcast) add(w *watcher) bool {
+ wb.mu.Lock()
+ defer wb.mu.Unlock()
+ if wb.nextrev > w.nextrev || (wb.nextrev == 0 && w.nextrev != 0) {
+ // wb is too far ahead, w will miss events
+ // or wb is being established with a current watcher
+ return false
+ }
+ if wb.responses == 0 {
+ // Newly created; create event will be sent by etcd.
+ wb.receivers[w] = struct{}{}
+ return true
+ }
+ // already sent by etcd; emulate create event
+ ok := w.post(&pb.WatchResponse{
+ Header: &pb.ResponseHeader{
+ // todo: fill in ClusterId
+ // todo: fill in MemberId:
+ Revision: w.nextrev,
+ // todo: fill in RaftTerm:
+ },
+ WatchId: w.id,
+ Created: true,
+ })
+ if !ok {
+ return false // the emulated create event could not be posted
+ }
+ wb.receivers[w] = struct{}{}
+ watchersCoalescing.Inc()
+
+ return true
+}
+func (wb *watchBroadcast) delete(w *watcher) { // delete removes w from the receivers; it panics if w is not a receiver.
+ wb.mu.Lock()
+ defer wb.mu.Unlock()
+ if _, ok := wb.receivers[w]; !ok {
+ panic("deleting missing watcher from broadcast")
+ }
+ delete(wb.receivers, w)
+ if len(wb.receivers) > 0 {
+ // do not dec the only left watcher for coalescing.
+ watchersCoalescing.Dec()
+ }
+}
+
+func (wb *watchBroadcast) size() int { // size returns the current number of receivers, read under the read lock.
+ wb.mu.RLock()
+ defer wb.mu.RUnlock()
+ return len(wb.receivers)
+}
+
+func (wb *watchBroadcast) empty() bool { return wb.size() == 0 } // empty reports whether there are no receivers.
+
+func (wb *watchBroadcast) stop() { // stop adjusts the coalescing gauge, cancels the server watch, and waits for the broadcast goroutine to exit.
+ if !wb.empty() {
+ // do not dec the only left watcher for coalescing.
+ watchersCoalescing.Sub(float64(wb.size() - 1))
+ }
+
+ wb.cancel()
+ <-wb.donec // closed by the goroutine started in newWatchBroadcast
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch_broadcasts.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch_broadcasts.go
new file mode 100644
index 0000000..8fe9e5f
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch_broadcasts.go
@@ -0,0 +1,135 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "sync"
+)
+
+type watchBroadcasts struct { // watchBroadcasts holds the broadcasts serving a single watch range.
+ wp *watchProxy
+
+ // mu protects bcasts and watchers from the coalesce loop.
+ mu sync.Mutex
+ bcasts map[*watchBroadcast]struct{} // set of live broadcasts
+ watchers map[*watcher]*watchBroadcast // the broadcast each watcher currently receives from
+
+ updatec chan *watchBroadcast // broadcasts queued for a coalesce attempt
+ donec chan struct{} // closed when the coalesce goroutine exits
+}
+
+// maxCoalesceReceivers prevents a popular watchBroadcast from being coalesced.
+const maxCoalesceReceivers = 5
+
+func newWatchBroadcasts(wp *watchProxy) *watchBroadcasts { // newWatchBroadcasts creates an empty set and starts the goroutine that coalesces broadcasts received on updatec.
+ wbs := &watchBroadcasts{
+ wp: wp,
+ bcasts: make(map[*watchBroadcast]struct{}),
+ watchers: make(map[*watcher]*watchBroadcast),
+ updatec: make(chan *watchBroadcast, 1),
+ donec: make(chan struct{}),
+ }
+ go func() {
+ defer close(wbs.donec)
+ for wb := range wbs.updatec { // ends when updatec is closed
+ wbs.coalesce(wb)
+ }
+ }()
+ return wbs
+}
+
+func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) { // coalesce tries to move all of wb's receivers onto another broadcast and, if wb ends up empty, stops wb.
+ if wb.size() >= maxCoalesceReceivers {
+ return // wb already serves enough receivers; leave it alone
+ }
+ wbs.mu.Lock()
+ for wbswb := range wbs.bcasts {
+ if wbswb == wb {
+ continue
+ }
+ wb.mu.Lock()
+ wbswb.mu.Lock()
+ // 1. check if wbswb is behind wb so it won't skip any events in wb
+ // 2. ensure wbswb started; nextrev == 0 may mean wbswb is waiting
+ // for a current watcher and expects a create event from the server.
+ if wb.nextrev >= wbswb.nextrev && wbswb.responses > 0 {
+ for w := range wb.receivers {
+ wbswb.receivers[w] = struct{}{}
+ wbs.watchers[w] = wbswb
+ }
+ wb.receivers = nil // all receivers now belong to wbswb
+ }
+ wbswb.mu.Unlock()
+ wb.mu.Unlock()
+ if wb.empty() {
+ delete(wbs.bcasts, wb)
+ wb.stop()
+ break
+ }
+ }
+ wbs.mu.Unlock()
+}
+
+func (wbs *watchBroadcasts) add(w *watcher) { // add attaches w to the first broadcast that accepts it, or creates a new broadcast for it.
+ wbs.mu.Lock()
+ defer wbs.mu.Unlock()
+ // find fitting bcast
+ for wb := range wbs.bcasts {
+ if wb.add(w) {
+ wbs.watchers[w] = wb
+ return
+ }
+ }
+ // no fit; create a bcast
+ wb := newWatchBroadcast(wbs.wp, w, wbs.update)
+ wbs.watchers[w] = wb
+ wbs.bcasts[wb] = struct{}{}
+}
+
+// delete removes a watcher and returns the number of remaining broadcasts.
+func (wbs *watchBroadcasts) delete(w *watcher) int {
+ wbs.mu.Lock()
+ defer wbs.mu.Unlock()
+
+ wb, ok := wbs.watchers[w]
+ if !ok {
+ panic("deleting missing watcher from broadcasts")
+ }
+ delete(wbs.watchers, w)
+ wb.delete(w)
+ if wb.empty() { // w was the last receiver of its broadcast
+ delete(wbs.bcasts, wb)
+ wb.stop()
+ }
+ return len(wbs.bcasts)
+}
+
+func (wbs *watchBroadcasts) stop() { // stop stops every broadcast, closes updatec, and waits for the coalesce goroutine to exit.
+ wbs.mu.Lock()
+ for wb := range wbs.bcasts {
+ wb.stop()
+ }
+ wbs.bcasts = nil
+ close(wbs.updatec) // ends the range loop in the coalesce goroutine
+ wbs.mu.Unlock()
+ <-wbs.donec
+}
+
+func (wbs *watchBroadcasts) update(wb *watchBroadcast) { // update queues wb for coalescing without blocking.
+ select {
+ case wbs.updatec <- wb:
+ default: // updatec is full; this update is dropped
+ }
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch_ranges.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch_ranges.go
new file mode 100644
index 0000000..31c6b59
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch_ranges.go
@@ -0,0 +1,69 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "sync"
+)
+
+// watchRanges tracks all open watches for the proxy, grouped by watch range.
+type watchRanges struct {
+ wp *watchProxy
+
+ mu sync.Mutex // held by add, delete, and stop while they use bcasts
+ bcasts map[watchRange]*watchBroadcasts
+}
+
+func newWatchRanges(wp *watchProxy) *watchRanges { // newWatchRanges returns an empty watchRanges for wp.
+ return &watchRanges{
+ wp: wp,
+ bcasts: make(map[watchRange]*watchBroadcasts),
+ }
+}
+
+func (wrs *watchRanges) add(w *watcher) { // add registers w with the broadcasts for its range, creating them on first use.
+ wrs.mu.Lock()
+ defer wrs.mu.Unlock()
+
+ if wbs := wrs.bcasts[w.wr]; wbs != nil {
+ wbs.add(w)
+ return
+ }
+ wbs := newWatchBroadcasts(wrs.wp) // first watcher on this range
+ wrs.bcasts[w.wr] = wbs
+ wbs.add(w)
+}
+
+func (wrs *watchRanges) delete(w *watcher) { // delete removes w from its range; it panics if the range is not tracked.
+ wrs.mu.Lock()
+ defer wrs.mu.Unlock()
+ wbs, ok := wrs.bcasts[w.wr]
+ if !ok {
+ panic("deleting missing range")
+ }
+ if wbs.delete(w) == 0 { // no broadcasts remain for this range
+ wbs.stop()
+ delete(wrs.bcasts, w.wr)
+ }
+}
+
+func (wrs *watchRanges) stop() { // stop stops the broadcasts of every range and drops the map.
+ wrs.mu.Lock()
+ defer wrs.mu.Unlock()
+ for _, wb := range wrs.bcasts {
+ wb.stop()
+ }
+ wrs.bcasts = nil
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/watcher.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watcher.go
new file mode 100644
index 0000000..1a49746
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watcher.go
@@ -0,0 +1,129 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "time"
+
+ "github.com/coreos/etcd/clientv3"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ "github.com/coreos/etcd/mvcc"
+ "github.com/coreos/etcd/mvcc/mvccpb"
+)
+
+// watchRange is a key interval; valid accepts an empty end, an end sorting after key, or an end that is the single byte 0.
+type watchRange struct{ key, end string }
+
+func (wr *watchRange) valid() bool {
+	unbounded := wr.end == "" || wr.end == "\x00"
+	return unbounded || wr.end > wr.key
+}
+
+type watcher struct { // watcher is one client watch on a proxy stream.
+ // user configuration
+
+ wr watchRange // key range being watched
+ filters []mvcc.FilterFunc // events for which any filter returns true are dropped
+ progress bool // whether progress notifications are forwarded
+ prevKV bool // whether PrevKv is kept on forwarded events
+
+ // id is the id returned to the client on its watch stream.
+ id int64
+ // nextrev is the minimum expected next event revision.
+ nextrev int64
+ // lastHeader has the last header sent over the stream.
+ lastHeader pb.ResponseHeader
+
+ // wps is the parent.
+ wps *watchProxyStream
+}
+
+// send forwards wr to the client, dropping events older than nextrev and
+// events rejected by the watcher's filters.
+func (w *watcher) send(wr clientv3.WatchResponse) {
+ if wr.IsProgressNotify() && !w.progress {
+ return // progress notifications were not requested
+ }
+ if w.nextrev > wr.Header.Revision && len(wr.Events) > 0 {
+ return // the response carries events but its revision is below nextrev
+ }
+ if w.nextrev == 0 {
+ // current watch; expect updates following this revision
+ w.nextrev = wr.Header.Revision + 1
+ }
+
+ events := make([]*mvccpb.Event, 0, len(wr.Events))
+
+ var lastRev int64
+ for i := range wr.Events {
+ ev := (*mvccpb.Event)(wr.Events[i])
+ if ev.Kv.ModRevision < w.nextrev {
+ continue
+ } else {
+ // We cannot update w.nextrev here.
+ // A txn can have multiple events with the same revision.
+ // If w.nextrev were updated here, it would skip events in the same txn.
+ lastRev = ev.Kv.ModRevision
+ }
+
+ filtered := false
+ for _, filter := range w.filters {
+ if filter(*ev) {
+ filtered = true
+ break
+ }
+ }
+ if filtered {
+ continue
+ }
+
+ if !w.prevKV {
+ evCopy := *ev // copy so the event shared with other receivers keeps its PrevKv
+ evCopy.PrevKv = nil
+ ev = &evCopy
+ }
+ events = append(events, ev)
+ }
+
+ if lastRev >= w.nextrev {
+ w.nextrev = lastRev + 1
+ }
+
+ // all events are filtered out?
+ if !wr.IsProgressNotify() && !wr.Created && len(events) == 0 && wr.CompactRevision == 0 {
+ return
+ }
+
+ w.lastHeader = wr.Header
+ w.post(&pb.WatchResponse{
+ Header: &wr.Header,
+ Created: wr.Created,
+ CompactRevision: wr.CompactRevision,
+ Canceled: wr.Canceled,
+ WatchId: w.id,
+ Events: events,
+ })
+}
+
+// post puts a watch response on the watcher's proxy stream channel; if that takes longer than 50ms it cancels the stream and returns false.
+func (w *watcher) post(wr *pb.WatchResponse) bool {
+ select {
+ case w.wps.watchCh <- wr:
+ case <-time.After(50 * time.Millisecond):
+ w.wps.cancel() // the whole proxy stream is canceled, not only this watcher
+ return false
+ }
+ return true
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/httpproxy/director.go b/vendor/github.com/coreos/etcd/proxy/httpproxy/director.go
new file mode 100644
index 0000000..d414501
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/httpproxy/director.go
@@ -0,0 +1,158 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package httpproxy
+
+import (
+ "math/rand"
+ "net/url"
+ "sync"
+ "time"
+)
+
+// defaultRefreshInterval is the default proxyRefreshIntervalMs value
+// as in etcdmain/config.go.
+const defaultRefreshInterval = 30000 * time.Millisecond
+
+// once guards the "endpoints found" log line written by the refresh goroutine
+// that newDirector starts. It is package-level, so the line is logged at most
+// once per process rather than once per director.
+var once sync.Once
+
+// init seeds the global math/rand source from the current time; refresh draws
+// from that source when it shuffles the endpoint list.
+func init() {
+ rand.Seed(time.Now().UnixNano())
+}
+
+// newDirector creates a director fed by urlsFunc, loads its endpoints once
+// synchronously and then starts a goroutine that keeps refreshing them every
+// refreshInterval. The goroutine has no stop condition. Every endpoint the
+// director creates is given failureWait as its unavailability period.
+func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director {
+	d := &director{
+		uf:          urlsFunc,
+		failureWait: failureWait,
+	}
+	d.refresh()
+
+	go func() {
+		for {
+			available := d.endpoints()
+
+			// In order to prevent missing proxy endpoints in the first try:
+			// when given refresh interval of defaultRefreshInterval or greater
+			// and whenever there is no available proxy endpoints,
+			// give 1-second refreshInterval.
+			wait := refreshInterval
+			if wait >= defaultRefreshInterval && len(available) == 0 {
+				wait = time.Second
+			}
+
+			// Report the first non-empty endpoint set, once.
+			if len(available) != 0 {
+				once.Do(func() {
+					var found []string
+					for i := range available {
+						found = append(found, available[i].URL.String())
+					}
+					plog.Infof("endpoints found %q", found)
+				})
+			}
+
+			time.Sleep(wait)
+			d.refresh()
+		}
+	}()
+	return d
+}
+
+// director keeps the list of upstream endpoints the proxy may forward to.
+// The embedded mutex is held by refresh and endpoints while they touch ep.
+type director struct {
+ sync.Mutex
+ ep []*endpoint // current endpoints; replaced wholesale by refresh
+ uf GetProxyURLs // source of upstream URLs, called by refresh
+ failureWait time.Duration // handed to newEndpoint for every endpoint created
+}
+
+// refresh asks the URL source for the current upstream URLs and replaces the
+// director's endpoint list with freshly created endpoints in random order.
+// URLs that fail to parse are logged and left out. The URL source is called
+// before the director lock is taken.
+func (d *director) refresh() {
+	rawURLs := d.uf()
+
+	d.Lock()
+	defer d.Unlock()
+
+	var fresh []*endpoint
+	for _, raw := range rawURLs {
+		parsed, err := url.Parse(raw)
+		if err == nil {
+			fresh = append(fresh, newEndpoint(*parsed, d.failureWait))
+			continue
+		}
+		plog.Printf("upstream URL invalid: %v", err)
+	}
+
+	// shuffle array to avoid connections being "stuck" to a single endpoint
+	for i := 0; i < len(fresh); i++ {
+		j := rand.Intn(i + 1)
+		fresh[j], fresh[i] = fresh[i], fresh[j]
+	}
+
+	d.ep = fresh
+}
+
+// endpoints returns the endpoints that are currently marked available, in
+// the order stored by the last refresh. The result is never nil.
+func (d *director) endpoints() []*endpoint {
+	d.Lock()
+	defer d.Unlock()
+
+	filtered := make([]*endpoint, 0, len(d.ep))
+	for _, ep := range d.ep {
+		// Failed flips Available under the endpoint's own lock, so read it
+		// under that lock too instead of racing with it.
+		ep.Lock()
+		available := ep.Available
+		ep.Unlock()
+
+		if available {
+			filtered = append(filtered, ep)
+		}
+	}
+
+	return filtered
+}
+
+// newEndpoint returns an endpoint for u that starts out available and whose
+// failure handler is built by timedUnavailabilityFunc from failureWait.
+func newEndpoint(u url.URL, failureWait time.Duration) *endpoint {
+	return &endpoint{
+		failFunc:  timedUnavailabilityFunc(failureWait),
+		Available: true,
+		URL:       u,
+	}
+}
+
+// endpoint is a single upstream URL together with its availability flag.
+// Failed takes the embedded mutex while it changes Available.
+type endpoint struct {
+ sync.Mutex
+
+ URL url.URL
+ // Available is cleared by Failed; director.endpoints only returns
+ // endpoints for which it is true.
+ Available bool
+
+ // failFunc, if non-nil, is called by Failed after the endpoint has been
+ // marked unavailable.
+ failFunc func(ep *endpoint)
+}
+
+// Failed marks the endpoint unavailable and hands it to its failure handler.
+// Calling it on an endpoint that is already unavailable does nothing. Without
+// a failure handler the endpoint stays unavailable.
+func (ep *endpoint) Failed() {
+	ep.Lock()
+	wasAvailable := ep.Available
+	ep.Available = false
+	ep.Unlock()
+
+	if !wasAvailable {
+		return
+	}
+
+	plog.Printf("marked endpoint %s unavailable", ep.URL.String())
+
+	if onFail := ep.failFunc; onFail != nil {
+		onFail(ep)
+		return
+	}
+	plog.Printf("no failFunc defined, endpoint %s will be unavailable forever.", ep.URL.String())
+}
+
+// timedUnavailabilityFunc returns a failure handler that marks the endpoint
+// available again once wait has elapsed, so that connectivity gets retested.
+// The handler itself returns immediately; the flag is flipped from a timer.
+func timedUnavailabilityFunc(wait time.Duration) func(*endpoint) {
+	return func(ep *endpoint) {
+		time.AfterFunc(wait, func() {
+			// The timer fires on its own goroutine; take the endpoint lock
+			// that Failed uses so the two writers cannot race.
+			ep.Lock()
+			ep.Available = true
+			ep.Unlock()
+			plog.Printf("marked endpoint %s available, to retest connectivity", ep.URL.String())
+		})
+	}
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/httpproxy/doc.go b/vendor/github.com/coreos/etcd/proxy/httpproxy/doc.go
new file mode 100644
index 0000000..7a45099
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/httpproxy/doc.go
@@ -0,0 +1,18 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package httpproxy implements etcd httpproxy. The etcd proxy acts as a reverse
+// http proxy forwarding client requests to active etcd cluster members, and does
+// not participate in consensus.
+package httpproxy
diff --git a/vendor/github.com/coreos/etcd/proxy/httpproxy/metrics.go b/vendor/github.com/coreos/etcd/proxy/httpproxy/metrics.go
new file mode 100644
index 0000000..f71258c
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/httpproxy/metrics.go
@@ -0,0 +1,88 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package httpproxy
+
+import (
+ "net/http"
+ "strconv"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+// Prometheus collectors for the HTTP proxy, all under the etcd_proxy_ prefix.
+var (
+ // requestsIncoming counts received requests, labeled by HTTP method
+ // (incremented by reportIncomingRequest).
+ requestsIncoming = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "proxy",
+ Name: "requests_total",
+ Help: "Counter requests incoming by method.",
+ }, []string{"method"})
+
+ // requestsHandled counts requests for which an upstream response was
+ // obtained, labeled by HTTP method and response status code
+ // (incremented by reportRequestHandled).
+ requestsHandled = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "proxy",
+ Name: "handled_total",
+ Help: "Counter of requests fully handled (by authoratitave servers)",
+ }, []string{"method", "code"})
+
+ // requestsDropped counts forwarding failures, labeled by HTTP method and
+ // a forwardingError value (incremented by reportRequestDropped).
+ requestsDropped = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "proxy",
+ Name: "dropped_total",
+ Help: "Counter of requests dropped on the proxy.",
+ }, []string{"method", "proxying_error"})
+
+ // requestsHandlingTime records, per HTTP method, the seconds elapsed
+ // between the start time given to reportRequestHandled and that call.
+ // It uses 13 exponential buckets starting at 0.5ms and doubling.
+ requestsHandlingTime = prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: "etcd",
+ Subsystem: "proxy",
+ Name: "handling_duration_seconds",
+ Help: "Bucketed histogram of handling time of successful events (non-watches), by method " +
+ "(GET/PUT etc.).",
+ Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
+ }, []string{"method"})
+)
+
+// forwardingError names the reason a request was dropped; reportRequestDropped
+// uses it as the value of the "proxying_error" label.
+type forwardingError string
+
+const (
+ // zeroEndpoints: the director had no available endpoint.
+ zeroEndpoints forwardingError = "zero_endpoints"
+ // failedSendingRequest: a round trip to one endpoint returned an error.
+ failedSendingRequest forwardingError = "failed_sending_request"
+ // failedGettingResponse: no endpoint produced a response.
+ failedGettingResponse forwardingError = "failed_getting_response"
+)
+
+// init registers the proxy collectors with the default Prometheus registry.
+// MustRegister panics if a registration fails.
+func init() {
+	prometheus.MustRegister(
+		requestsIncoming,
+		requestsHandled,
+		requestsDropped,
+		requestsHandlingTime,
+	)
+}
+
+// reportIncomingRequest bumps the incoming-request counter for the request's
+// HTTP method.
+func reportIncomingRequest(request *http.Request) {
+	counter := requestsIncoming.WithLabelValues(request.Method)
+	counter.Inc()
+}
+
+// reportRequestHandled counts a handled request under its method and the
+// response's status code, and records the time elapsed since startTime in the
+// handling-time histogram for that method.
+func reportRequestHandled(request *http.Request, response *http.Response, startTime time.Time) {
+	verb, status := request.Method, strconv.Itoa(response.StatusCode)
+	requestsHandled.WithLabelValues(verb, status).Inc()
+
+	elapsed := time.Since(startTime)
+	requestsHandlingTime.WithLabelValues(verb).Observe(elapsed.Seconds())
+}
+
+// reportRequestDropped bumps the dropped-request counter for the request's
+// HTTP method and the given forwarding error.
+func reportRequestDropped(request *http.Request, err forwardingError) {
+	reason := string(err)
+	requestsDropped.WithLabelValues(request.Method, reason).Inc()
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/httpproxy/proxy.go b/vendor/github.com/coreos/etcd/proxy/httpproxy/proxy.go
new file mode 100644
index 0000000..3cd3161
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/httpproxy/proxy.go
@@ -0,0 +1,116 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package httpproxy
+
+import (
+ "encoding/json"
+ "net/http"
+ "strings"
+ "time"
+
+ "golang.org/x/net/http2"
+)
+
+const (
+ // DefaultMaxIdleConnsPerHost is the default maximum number of idle
+ // connections kept between the proxy and each member. It is set to 128 so
+ // that the proxy can sustain 128 concurrent requests smoothly over the
+ // long term. If the number of concurrent requests exceeds this value, the
+ // proxy has to create a new connection for each request in the excess,
+ // which is bad because creating connections consumes resources and may
+ // eat up ephemeral ports.
+ DefaultMaxIdleConnsPerHost = 128
+)
+
+// GetProxyURLs is a function which should return the current set of URLs to
+// which client requests should be proxied. This function will be queried
+// periodically by the proxy Handler to refresh the set of available
+// backends. Each returned string is parsed with url.Parse; entries that do
+// not parse are logged and skipped.
+type GetProxyURLs func() []string
+
+// NewHandler returns an HTTP handler that proxies requests to an etcd cluster
+// over the given transport. The set of upstream URLs comes from urlsFunc and
+// is refreshed every refreshInterval; an endpoint that fails is taken out of
+// rotation for failureWait. When the transport carries a TLS client config,
+// HTTP/2 is enabled on it; a failure to do so is only logged.
+//
+// Besides the catch-all proxy route, the handler serves the proxy's current
+// endpoint list on /v2/config/local/proxy.
+func NewHandler(t *http.Transport, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) http.Handler {
+	if t.TLSClientConfig != nil {
+		// Enable http2, see Issue 5033.
+		if err := http2.ConfigureTransport(t); err != nil {
+			plog.Infof("Error enabling Transport HTTP/2 support: %v", err)
+		}
+	}
+
+	proxy := &reverseProxy{
+		director:  newDirector(urlsFunc, failureWait, refreshInterval),
+		transport: t,
+	}
+
+	routes := http.NewServeMux()
+	routes.Handle("/", proxy)
+	routes.HandleFunc("/v2/config/local/proxy", proxy.configHandler)
+	return routes
+}
+
+// NewReadonlyHandler wraps hdlr so that only GET requests reach it; every
+// other method is answered by the wrapper itself.
+func NewReadonlyHandler(hdlr http.Handler) http.Handler {
+	return http.HandlerFunc(readonlyHandlerFunc(hdlr))
+}
+
+// readonlyHandlerFunc returns a handler function that passes GET requests on
+// to next and answers every other method with 501 Not Implemented without
+// calling next.
+func readonlyHandlerFunc(next http.Handler) func(http.ResponseWriter, *http.Request) {
+	return func(w http.ResponseWriter, req *http.Request) {
+		switch req.Method {
+		case http.MethodGet:
+			next.ServeHTTP(w, req)
+		default:
+			w.WriteHeader(http.StatusNotImplemented)
+		}
+	}
+}
+
+// configHandler answers GET requests with a JSON object whose "endpoints"
+// field lists the URLs of the currently available endpoints. Any other
+// method is rejected by allowMethod.
+func (p *reverseProxy) configHandler(w http.ResponseWriter, r *http.Request) {
+	if !allowMethod(w, r.Method, "GET") {
+		return
+	}
+
+	eps := p.director.endpoints()
+	epstr := make([]string, len(eps))
+	for i, e := range eps {
+		epstr[i] = e.URL.String()
+	}
+
+	proxyConfig := struct {
+		Endpoints []string `json:"endpoints"`
+	}{
+		Endpoints: epstr,
+	}
+
+	// Marshaling a []string cannot fail, so an error here means the write to
+	// the client failed; there is nothing left to do but record it.
+	if err := json.NewEncoder(w).Encode(proxyConfig); err != nil {
+		plog.Debugf("error writing proxy config (%v) to %s", err, r.RemoteAddr)
+	}
+}
+
+// allowMethod reports whether m is one of the allowed methods ms. When it is
+// not, allowMethod sets the Allow header to the comma-joined allowed methods
+// and writes a 405 Method Not Allowed error to w before returning false.
+func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
+	for i := range ms {
+		if ms[i] == m {
+			return true
+		}
+	}
+
+	allowed := strings.Join(ms, ",")
+	w.Header().Set("Allow", allowed)
+	http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+	return false
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/httpproxy/reverse.go b/vendor/github.com/coreos/etcd/proxy/httpproxy/reverse.go
new file mode 100644
index 0000000..2ecff3a
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/httpproxy/reverse.go
@@ -0,0 +1,208 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package httpproxy
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net"
+ "net/http"
+ "net/url"
+ "strings"
+ "sync/atomic"
+ "time"
+
+ "github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
+ "github.com/coreos/pkg/capnslog"
+)
+
+var (
+	// plog is the capnslog logger for the httpproxy package.
+	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "proxy/httpproxy")
+
+	// Hop-by-hop headers. These are removed when sent to the backend.
+	// http://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html
+	// This list of headers borrowed from stdlib httputil.ReverseProxy
+	//
+	// RFC 2616 section 13.5.1 misprints the trailer header as "Trailers";
+	// the header that actually exists is "Trailer". Both spellings are
+	// listed so the real header is stripped and the old entry keeps working.
+	singleHopHeaders = []string{
+		"Connection",
+		"Keep-Alive",
+		"Proxy-Authenticate",
+		"Proxy-Authorization",
+		"Te",       // canonicalized version of "TE"
+		"Trailer",  // the actual header name
+		"Trailers", // spelling from RFC 2616 section 13.5.1
+		"Transfer-Encoding",
+		"Upgrade",
+	}
+)
+
+// removeSingleHopHeaders deletes every header named in singleHopHeaders from
+// the header set hdrs points to.
+func removeSingleHopHeaders(hdrs *http.Header) {
+	header := *hdrs
+	for i := range singleHopHeaders {
+		header.Del(singleHopHeaders[i])
+	}
+}
+
+// reverseProxy is the http.Handler that forwards client requests to one of
+// the director's available endpoints.
+type reverseProxy struct {
+ director *director // supplies the endpoints to try, in order
+ transport http.RoundTripper // performs the round trip to the chosen endpoint
+}
+
+// ServeHTTP forwards clientreq to the first available endpoint that answers
+// and relays that endpoint's status, headers and body to rw.
+//
+// The request body is read into memory up front so that it can be replayed
+// against each endpoint in turn. An endpoint whose round trip fails is marked
+// Failed and the next one is tried. The client receives 500 if its body
+// cannot be read, 503 if no endpoint is available and 502 if no endpoint
+// produced a response. When rw implements http.CloseNotifier and the client
+// goes away, the outgoing request is canceled and nothing is written back.
+func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request) {
+	reportIncomingRequest(clientreq)
+	proxyreq := new(http.Request)
+	*proxyreq = *clientreq
+	startTime := time.Now()
+
+	var (
+		proxybody []byte
+		err       error
+	)
+
+	if clientreq.Body != nil {
+		proxybody, err = ioutil.ReadAll(clientreq.Body)
+		if err != nil {
+			msg := fmt.Sprintf("failed to read request body: %v", err)
+			plog.Println(msg)
+			e := httptypes.NewHTTPError(http.StatusInternalServerError, "httpproxy: "+msg)
+			if we := e.WriteTo(rw); we != nil {
+				plog.Debugf("error writing HTTPError (%v) to %s", we, clientreq.RemoteAddr)
+			}
+			return
+		}
+	}
+
+	// deep-copy the headers, as these will be modified below
+	proxyreq.Header = make(http.Header)
+	copyHeader(proxyreq.Header, clientreq.Header)
+
+	normalizeRequest(proxyreq)
+	removeSingleHopHeaders(&proxyreq.Header)
+	maybeSetForwardedFor(proxyreq)
+
+	endpoints := p.director.endpoints()
+	if len(endpoints) == 0 {
+		msg := "zero endpoints currently available"
+		reportRequestDropped(clientreq, zeroEndpoints)
+
+		// TODO: limit the rate of the error logging.
+		plog.Println(msg)
+		e := httptypes.NewHTTPError(http.StatusServiceUnavailable, "httpproxy: "+msg)
+		if we := e.WriteTo(rw); we != nil {
+			plog.Debugf("error writing HTTPError (%v) to %s", we, clientreq.RemoteAddr)
+		}
+		return
+	}
+
+	// requestClosed is set to 1 by the watcher goroutine below when the
+	// client disconnects; completeCh tells that goroutine to exit once this
+	// handler is done. The channel is buffered so the deferred send never
+	// blocks, even if the goroutine already left through the close branch.
+	var requestClosed int32
+	completeCh := make(chan bool, 1)
+	closeNotifier, ok := rw.(http.CloseNotifier)
+	ctx, cancel := context.WithCancel(context.Background())
+	proxyreq = proxyreq.WithContext(ctx)
+	defer cancel()
+	if ok {
+		closeCh := closeNotifier.CloseNotify()
+		go func() {
+			select {
+			case <-closeCh:
+				atomic.StoreInt32(&requestClosed, 1)
+				plog.Printf("client %v closed request prematurely", clientreq.RemoteAddr)
+				cancel()
+			case <-completeCh:
+			}
+		}()
+
+		defer func() {
+			completeCh <- true
+		}()
+	}
+
+	var res *http.Response
+
+	for _, ep := range endpoints {
+		// give every attempt a fresh reader over the buffered body
+		if proxybody != nil {
+			proxyreq.Body = ioutil.NopCloser(bytes.NewBuffer(proxybody))
+		}
+		redirectRequest(proxyreq, ep.URL)
+
+		res, err = p.transport.RoundTrip(proxyreq)
+		if atomic.LoadInt32(&requestClosed) == 1 {
+			// The client is gone. A response may still have come back just
+			// before the cancellation; its body must be closed so the
+			// underlying connection is released.
+			if res != nil {
+				res.Body.Close()
+			}
+			return
+		}
+		if err != nil {
+			reportRequestDropped(clientreq, failedSendingRequest)
+			plog.Printf("failed to direct request to %s: %v", ep.URL.String(), err)
+			ep.Failed()
+			continue
+		}
+
+		break
+	}
+
+	if res == nil {
+		// TODO: limit the rate of the error logging.
+		msg := fmt.Sprintf("unable to get response from %d endpoint(s)", len(endpoints))
+		reportRequestDropped(clientreq, failedGettingResponse)
+		plog.Println(msg)
+		e := httptypes.NewHTTPError(http.StatusBadGateway, "httpproxy: "+msg)
+		if we := e.WriteTo(rw); we != nil {
+			plog.Debugf("error writing HTTPError (%v) to %s", we, clientreq.RemoteAddr)
+		}
+		return
+	}
+
+	defer res.Body.Close()
+	reportRequestHandled(clientreq, res, startTime)
+	removeSingleHopHeaders(&res.Header)
+	copyHeader(rw.Header(), res.Header)
+
+	rw.WriteHeader(res.StatusCode)
+	io.Copy(rw, res.Body)
+}
+
+// copyHeader appends every value of every header in src to dst. Values
+// already present in dst are kept; src is not modified.
+func copyHeader(dst, src http.Header) {
+	for name := range src {
+		for _, value := range src[name] {
+			dst.Add(name, value)
+		}
+	}
+}
+
+// redirectRequest points req at the scheme and host of loc. Only those two
+// parts of the request URL are overwritten.
+func redirectRequest(req *http.Request, loc url.URL) {
+	target := req.URL
+	target.Scheme, target.Host = loc.Scheme, loc.Host
+}
+
+// normalizeRequest rewrites the protocol fields of req to HTTP/1.1 and
+// clears its Close flag.
+func normalizeRequest(req *http.Request) {
+	req.Proto, req.ProtoMajor, req.ProtoMinor = "HTTP/1.1", 1, 1
+	req.Close = false
+}
+
+// maybeSetForwardedFor records the client's IP, taken from req.RemoteAddr, in
+// the X-Forwarded-For header of req. If RemoteAddr is not a host:port pair
+// the request is left unchanged.
+func maybeSetForwardedFor(req *http.Request) {
+	host, _, err := net.SplitHostPort(req.RemoteAddr)
+	if err != nil {
+		return
+	}
+
+	// If we aren't the first proxy retain prior
+	// X-Forwarded-For information as a comma+space
+	// separated list and fold multiple headers into one.
+	forwarded := host
+	if prior, seen := req.Header["X-Forwarded-For"]; seen {
+		forwarded = strings.Join([]string{strings.Join(prior, ", "), host}, ", ")
+	}
+	req.Header.Set("X-Forwarded-For", forwarded)
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/tcpproxy/doc.go b/vendor/github.com/coreos/etcd/proxy/tcpproxy/doc.go
new file mode 100644
index 0000000..6889cac
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/tcpproxy/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package tcpproxy is an OSI level 4 proxy for routing etcd clients to etcd servers.
+package tcpproxy
diff --git a/vendor/github.com/coreos/etcd/proxy/tcpproxy/userspace.go b/vendor/github.com/coreos/etcd/proxy/tcpproxy/userspace.go
new file mode 100644
index 0000000..6dc1d1d
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/tcpproxy/userspace.go
@@ -0,0 +1,226 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tcpproxy
+
+import (
+ "fmt"
+ "io"
+ "math/rand"
+ "net"
+ "sync"
+ "time"
+
+ "github.com/coreos/pkg/capnslog"
+)
+
+var (
+ // plog is the capnslog logger for the tcpproxy package.
+ plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "proxy/tcpproxy")
+)
+
+// remote is one backend the proxy can relay client connections to.
+type remote struct {
+ mu sync.Mutex // guards inactive
+ srv *net.SRV // record the remote was built from; pick reads its Priority and Weight
+ addr string // dial address, "target:port" as formatted by Run
+ inactive bool // set by inactivate, cleared by a successful tryReactivate
+}
+
+// inactivate marks the remote inactive, so that isActive reports false.
+func (r *remote) inactivate() {
+	r.mu.Lock()
+	r.inactive = true
+	r.mu.Unlock()
+}
+
+// tryReactivate probes the remote by opening, and immediately closing, a TCP
+// connection to its address. On success the remote is marked active again and
+// nil is returned; otherwise the dial error is returned and the remote's
+// state is left as it was.
+func (r *remote) tryReactivate() error {
+	probe, dialErr := net.Dial("tcp", r.addr)
+	if dialErr != nil {
+		return dialErr
+	}
+	probe.Close()
+
+	r.mu.Lock()
+	r.inactive = false
+	r.mu.Unlock()
+	return nil
+}
+
+// isActive reports whether the remote is not currently marked inactive.
+func (r *remote) isActive() bool {
+	r.mu.Lock()
+	active := !r.inactive
+	r.mu.Unlock()
+	return active
+}
+
+// TCPProxy accepts connections on Listener and relays each of them to one of
+// the remotes built from Endpoints.
+type TCPProxy struct {
+ Listener net.Listener
+ Endpoints []*net.SRV
+ // MonitorInterval is how long runMonitor waits between attempts to
+ // reactivate inactive remotes; Run replaces a zero value with 5 minutes.
+ MonitorInterval time.Duration
+
+ // donec is created by Run and closed by Stop to end runMonitor.
+ donec chan struct{}
+
+ mu sync.Mutex // guards the following fields
+ remotes []*remote
+ pickCount int // for round robin
+}
+
+// Run builds one remote per configured endpoint, starts the monitor goroutine
+// and then accepts connections until the listener fails, serving each
+// connection on its own goroutine. It returns the error from Accept. A zero
+// MonitorInterval is replaced with 5 minutes.
+func (tp *TCPProxy) Run() error {
+	tp.donec = make(chan struct{})
+	if tp.MonitorInterval == 0 {
+		tp.MonitorInterval = 5 * time.Minute
+	}
+
+	eps := []string{}
+	for i := range tp.Endpoints {
+		srv := tp.Endpoints[i]
+		addr := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
+		tp.remotes = append(tp.remotes, &remote{srv: srv, addr: addr})
+		eps = append(eps, addr)
+	}
+	plog.Printf("ready to proxy client requests to %+v", eps)
+
+	go tp.runMonitor()
+	for {
+		conn, err := tp.Listener.Accept()
+		if err != nil {
+			return err
+		}
+		go tp.serve(conn)
+	}
+}
+
+// pick chooses the remote for the next connection, or returns nil when no
+// remote is active.
+//
+// Only active remotes of the best (numerically lowest) SRV priority are
+// candidates. Among them, remotes with a positive weight are chosen at random
+// following the weighted selection of RFC 2782; remotes with weight 0 are
+// served round-robin, and when weighted remotes exist they are only chosen on
+// roughly one pick in a hundred. serve calls pick with tp.mu held, which
+// protects pickCount.
+func (tp *TCPProxy) pick() *remote {
+	var weighted []*remote
+	var unweighted []*remote
+
+	bestPr := uint16(65535)
+	w := 0
+	// find best priority class
+	for _, r := range tp.remotes {
+		switch {
+		case !r.isActive():
+		case r.srv.Priority < bestPr:
+			// A better class starts with r: drop everything collected for
+			// the previous class and let the next case file r by weight.
+			bestPr = r.srv.Priority
+			w = 0
+			weighted = nil
+			unweighted = nil
+			fallthrough
+		case r.srv.Priority == bestPr:
+			if r.srv.Weight > 0 {
+				weighted = append(weighted, r)
+				w += int(r.srv.Weight)
+			} else {
+				unweighted = append(unweighted, r)
+			}
+		}
+	}
+
+	if len(weighted) > 0 {
+		if len(unweighted) > 0 && rand.Intn(100) == 1 {
+			// In the presence of records containing weights greater
+			// than 0, records with weight 0 should have a very small
+			// chance of being selected.
+			return tp.roundRobin(unweighted)
+		}
+		// choose a uniform random number between 0 and the sum computed
+		// (inclusive), and select the RR whose running sum value is the
+		// first in the selected order that reaches it. The draw has to
+		// include w itself, otherwise trailing remotes are under-selected
+		// (with weights 1 and 1 the second one would never be chosen).
+		choose := rand.Intn(w + 1)
+		for _, r := range weighted {
+			choose -= int(r.srv.Weight)
+			if choose <= 0 {
+				return r
+			}
+		}
+	}
+
+	if len(unweighted) > 0 {
+		return tp.roundRobin(unweighted)
+	}
+	return nil
+}
+
+// roundRobin returns the next entry of the non-empty candidates slice
+// according to pickCount and advances pickCount.
+func (tp *TCPProxy) roundRobin(candidates []*remote) *remote {
+	r := candidates[tp.pickCount%len(candidates)]
+	tp.pickCount++
+	return r
+}
+
+// serve connects the accepted connection in to a remote and copies data in
+// both directions until either copy ends, then closes both connections.
+// Remotes are picked one after another until a dial succeeds; a remote that
+// cannot be dialed is inactivated. If no remote can be reached, in is closed
+// without relaying anything.
+func (tp *TCPProxy) serve(in net.Conn) {
+	var out net.Conn
+	for out == nil {
+		tp.mu.Lock()
+		target := tp.pick()
+		tp.mu.Unlock()
+		if target == nil {
+			break
+		}
+
+		// TODO: add timeout
+		conn, err := net.Dial("tcp", target.addr)
+		if err != nil {
+			target.inactivate()
+			plog.Warningf("deactivated endpoint [%s] due to %v for %v", target.addr, err, tp.MonitorInterval)
+			continue
+		}
+		out = conn
+	}
+
+	if out == nil {
+		in.Close()
+		return
+	}
+
+	// remote -> client runs on its own goroutine ...
+	go func() {
+		io.Copy(in, out)
+		in.Close()
+		out.Close()
+	}()
+
+	// ... while client -> remote runs here.
+	io.Copy(out, in)
+	out.Close()
+	in.Close()
+}
+
+// runMonitor wakes up every MonitorInterval and starts, for each remote that
+// is currently inactive, a goroutine that tries to reactivate it. It returns
+// when donec is closed.
+func (tp *TCPProxy) runMonitor() {
+	probe := func(r *remote) {
+		err := r.tryReactivate()
+		if err == nil {
+			plog.Printf("activated %s", r.addr)
+			return
+		}
+		plog.Warningf("failed to activate endpoint [%s] due to %v (stay inactive for another %v)", r.addr, err, tp.MonitorInterval)
+	}
+
+	for {
+		select {
+		case <-tp.donec:
+			return
+		case <-time.After(tp.MonitorInterval):
+		}
+
+		tp.mu.Lock()
+		for _, rem := range tp.remotes {
+			if !rem.isActive() {
+				go probe(rem)
+			}
+		}
+		tp.mu.Unlock()
+	}
+}
+
+// Stop closes the listener and closes donec, which ends runMonitor.
+// Connections that are already being served are not touched. donec is only
+// created by Run, so Stop must be called after Run has started and at most
+// once: closing a nil or already closed channel panics. The error returned
+// by Listener.Close is discarded.
+func (tp *TCPProxy) Stop() {
+ // graceful shutdown?
+ // shutdown current connections?
+ tp.Listener.Close()
+ close(tp.donec)
+}