khenaidoo | ffe076b | 2019-01-15 16:08:08 -0500 | [diff] [blame^] | 1 | // Copyright 2016 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package grpcproxy |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "fmt" |
| 20 | "os" |
| 21 | "sync" |
| 22 | |
| 23 | "github.com/coreos/etcd/clientv3" |
| 24 | "github.com/coreos/etcd/clientv3/naming" |
| 25 | "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" |
| 26 | pb "github.com/coreos/etcd/etcdserver/etcdserverpb" |
| 27 | |
| 28 | "golang.org/x/time/rate" |
| 29 | gnaming "google.golang.org/grpc/naming" |
| 30 | ) |
| 31 | |
// resolveRetryRate bounds how often the resolve loop may retry: it is used
// both as the limiter's rate (one event per second) and as its burst size.
const (
	resolveRetryRate = 1
)
| 34 | |
| 35 | type clusterProxy struct { |
| 36 | clus clientv3.Cluster |
| 37 | ctx context.Context |
| 38 | gr *naming.GRPCResolver |
| 39 | |
| 40 | // advertise client URL |
| 41 | advaddr string |
| 42 | prefix string |
| 43 | |
| 44 | umu sync.RWMutex |
| 45 | umap map[string]gnaming.Update |
| 46 | } |
| 47 | |
| 48 | // NewClusterProxy takes optional prefix to fetch grpc-proxy member endpoints. |
| 49 | // The returned channel is closed when there is grpc-proxy endpoint registered |
| 50 | // and the client's context is canceled so the 'register' loop returns. |
| 51 | func NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) { |
| 52 | cp := &clusterProxy{ |
| 53 | clus: c.Cluster, |
| 54 | ctx: c.Ctx(), |
| 55 | gr: &naming.GRPCResolver{Client: c}, |
| 56 | |
| 57 | advaddr: advaddr, |
| 58 | prefix: prefix, |
| 59 | umap: make(map[string]gnaming.Update), |
| 60 | } |
| 61 | |
| 62 | donec := make(chan struct{}) |
| 63 | if advaddr != "" && prefix != "" { |
| 64 | go func() { |
| 65 | defer close(donec) |
| 66 | cp.resolve(prefix) |
| 67 | }() |
| 68 | return cp, donec |
| 69 | } |
| 70 | |
| 71 | close(donec) |
| 72 | return cp, donec |
| 73 | } |
| 74 | |
| 75 | func (cp *clusterProxy) resolve(prefix string) { |
| 76 | rm := rate.NewLimiter(rate.Limit(resolveRetryRate), resolveRetryRate) |
| 77 | for rm.Wait(cp.ctx) == nil { |
| 78 | wa, err := cp.gr.Resolve(prefix) |
| 79 | if err != nil { |
| 80 | plog.Warningf("failed to resolve %q (%v)", prefix, err) |
| 81 | continue |
| 82 | } |
| 83 | cp.monitor(wa) |
| 84 | } |
| 85 | } |
| 86 | |
| 87 | func (cp *clusterProxy) monitor(wa gnaming.Watcher) { |
| 88 | for cp.ctx.Err() == nil { |
| 89 | ups, err := wa.Next() |
| 90 | if err != nil { |
| 91 | plog.Warningf("clusterProxy watcher error (%v)", err) |
| 92 | if rpctypes.ErrorDesc(err) == naming.ErrWatcherClosed.Error() { |
| 93 | return |
| 94 | } |
| 95 | } |
| 96 | |
| 97 | cp.umu.Lock() |
| 98 | for i := range ups { |
| 99 | switch ups[i].Op { |
| 100 | case gnaming.Add: |
| 101 | cp.umap[ups[i].Addr] = *ups[i] |
| 102 | case gnaming.Delete: |
| 103 | delete(cp.umap, ups[i].Addr) |
| 104 | } |
| 105 | } |
| 106 | cp.umu.Unlock() |
| 107 | } |
| 108 | } |
| 109 | |
| 110 | func (cp *clusterProxy) MemberAdd(ctx context.Context, r *pb.MemberAddRequest) (*pb.MemberAddResponse, error) { |
| 111 | mresp, err := cp.clus.MemberAdd(ctx, r.PeerURLs) |
| 112 | if err != nil { |
| 113 | return nil, err |
| 114 | } |
| 115 | resp := (pb.MemberAddResponse)(*mresp) |
| 116 | return &resp, err |
| 117 | } |
| 118 | |
| 119 | func (cp *clusterProxy) MemberRemove(ctx context.Context, r *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) { |
| 120 | mresp, err := cp.clus.MemberRemove(ctx, r.ID) |
| 121 | if err != nil { |
| 122 | return nil, err |
| 123 | } |
| 124 | resp := (pb.MemberRemoveResponse)(*mresp) |
| 125 | return &resp, err |
| 126 | } |
| 127 | |
| 128 | func (cp *clusterProxy) MemberUpdate(ctx context.Context, r *pb.MemberUpdateRequest) (*pb.MemberUpdateResponse, error) { |
| 129 | mresp, err := cp.clus.MemberUpdate(ctx, r.ID, r.PeerURLs) |
| 130 | if err != nil { |
| 131 | return nil, err |
| 132 | } |
| 133 | resp := (pb.MemberUpdateResponse)(*mresp) |
| 134 | return &resp, err |
| 135 | } |
| 136 | |
| 137 | func (cp *clusterProxy) membersFromUpdates() ([]*pb.Member, error) { |
| 138 | cp.umu.RLock() |
| 139 | defer cp.umu.RUnlock() |
| 140 | mbs := make([]*pb.Member, 0, len(cp.umap)) |
| 141 | for addr, upt := range cp.umap { |
| 142 | m, err := decodeMeta(fmt.Sprint(upt.Metadata)) |
| 143 | if err != nil { |
| 144 | return nil, err |
| 145 | } |
| 146 | mbs = append(mbs, &pb.Member{Name: m.Name, ClientURLs: []string{addr}}) |
| 147 | } |
| 148 | return mbs, nil |
| 149 | } |
| 150 | |
| 151 | // MemberList wraps member list API with following rules: |
| 152 | // - If 'advaddr' is not empty and 'prefix' is not empty, return registered member lists via resolver |
| 153 | // - If 'advaddr' is not empty and 'prefix' is not empty and registered grpc-proxy members haven't been fetched, return the 'advaddr' |
| 154 | // - If 'advaddr' is not empty and 'prefix' is empty, return 'advaddr' without forcing it to 'register' |
| 155 | // - If 'advaddr' is empty, forward to member list API |
| 156 | func (cp *clusterProxy) MemberList(ctx context.Context, r *pb.MemberListRequest) (*pb.MemberListResponse, error) { |
| 157 | if cp.advaddr != "" { |
| 158 | if cp.prefix != "" { |
| 159 | mbs, err := cp.membersFromUpdates() |
| 160 | if err != nil { |
| 161 | return nil, err |
| 162 | } |
| 163 | if len(mbs) > 0 { |
| 164 | return &pb.MemberListResponse{Members: mbs}, nil |
| 165 | } |
| 166 | } |
| 167 | // prefix is empty or no grpc-proxy members haven't been registered |
| 168 | hostname, _ := os.Hostname() |
| 169 | return &pb.MemberListResponse{Members: []*pb.Member{{Name: hostname, ClientURLs: []string{cp.advaddr}}}}, nil |
| 170 | } |
| 171 | mresp, err := cp.clus.MemberList(ctx) |
| 172 | if err != nil { |
| 173 | return nil, err |
| 174 | } |
| 175 | resp := (pb.MemberListResponse)(*mresp) |
| 176 | return &resp, err |
| 177 | } |