Stephane Barbarie | 260a563 | 2019-02-26 16:12:49 -0500 | [diff] [blame] | 1 | // Copyright 2018 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | // Package endpoint resolves etcd endpoints using grpc targets of the form 'endpoint://<id>/<endpoint>'. |
| 16 | package endpoint |
| 17 | |
| 18 | import ( |
| 19 | "fmt" |
| 20 | "net/url" |
| 21 | "strings" |
| 22 | "sync" |
| 23 | |
| 24 | "google.golang.org/grpc/resolver" |
| 25 | ) |
| 26 | |
// scheme is the gRPC target scheme this package registers its resolver
// builder under; it is also the prefix of every target built by Target.
const scheme = "endpoint"
| 28 | |
| 29 | var ( |
| 30 | targetPrefix = fmt.Sprintf("%s://", scheme) |
| 31 | |
| 32 | bldr *builder |
| 33 | ) |
| 34 | |
| 35 | func init() { |
| 36 | bldr = &builder{ |
| 37 | resolverGroups: make(map[string]*ResolverGroup), |
| 38 | } |
| 39 | resolver.Register(bldr) |
| 40 | } |
| 41 | |
| 42 | type builder struct { |
| 43 | mu sync.RWMutex |
| 44 | resolverGroups map[string]*ResolverGroup |
| 45 | } |
| 46 | |
| 47 | // NewResolverGroup creates a new ResolverGroup with the given id. |
| 48 | func NewResolverGroup(id string) (*ResolverGroup, error) { |
| 49 | return bldr.newResolverGroup(id) |
| 50 | } |
| 51 | |
| 52 | // ResolverGroup keeps all endpoints of resolvers using a common endpoint://<id>/ target |
| 53 | // up-to-date. |
| 54 | type ResolverGroup struct { |
| 55 | mu sync.RWMutex |
| 56 | id string |
| 57 | endpoints []string |
| 58 | resolvers []*Resolver |
| 59 | } |
| 60 | |
| 61 | func (e *ResolverGroup) addResolver(r *Resolver) { |
| 62 | e.mu.Lock() |
| 63 | addrs := epsToAddrs(e.endpoints...) |
| 64 | e.resolvers = append(e.resolvers, r) |
| 65 | e.mu.Unlock() |
| 66 | r.cc.NewAddress(addrs) |
| 67 | } |
| 68 | |
| 69 | func (e *ResolverGroup) removeResolver(r *Resolver) { |
| 70 | e.mu.Lock() |
| 71 | for i, er := range e.resolvers { |
| 72 | if er == r { |
| 73 | e.resolvers = append(e.resolvers[:i], e.resolvers[i+1:]...) |
| 74 | break |
| 75 | } |
| 76 | } |
| 77 | e.mu.Unlock() |
| 78 | } |
| 79 | |
| 80 | // SetEndpoints updates the endpoints for ResolverGroup. All registered resolver are updated |
| 81 | // immediately with the new endpoints. |
| 82 | func (e *ResolverGroup) SetEndpoints(endpoints []string) { |
| 83 | addrs := epsToAddrs(endpoints...) |
| 84 | e.mu.Lock() |
| 85 | e.endpoints = endpoints |
| 86 | for _, r := range e.resolvers { |
| 87 | r.cc.NewAddress(addrs) |
| 88 | } |
| 89 | e.mu.Unlock() |
| 90 | } |
| 91 | |
| 92 | // Target constructs a endpoint target using the endpoint id of the ResolverGroup. |
| 93 | func (e *ResolverGroup) Target(endpoint string) string { |
| 94 | return Target(e.id, endpoint) |
| 95 | } |
| 96 | |
| 97 | // Target constructs a endpoint resolver target. |
| 98 | func Target(id, endpoint string) string { |
| 99 | return fmt.Sprintf("%s://%s/%s", scheme, id, endpoint) |
| 100 | } |
| 101 | |
// IsTarget reports whether the given target string is an endpoint resolver
// target, i.e. whether it starts with "endpoint://".
func IsTarget(target string) bool {
	const prefix = "endpoint://"
	return strings.HasPrefix(target, prefix)
}
| 106 | |
| 107 | func (e *ResolverGroup) Close() { |
| 108 | bldr.close(e.id) |
| 109 | } |
| 110 | |
| 111 | // Build creates or reuses an etcd resolver for the etcd cluster name identified by the authority part of the target. |
| 112 | func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) { |
| 113 | if len(target.Authority) < 1 { |
| 114 | return nil, fmt.Errorf("'etcd' target scheme requires non-empty authority identifying etcd cluster being routed to") |
| 115 | } |
| 116 | id := target.Authority |
| 117 | es, err := b.getResolverGroup(id) |
| 118 | if err != nil { |
| 119 | return nil, fmt.Errorf("failed to build resolver: %v", err) |
| 120 | } |
| 121 | r := &Resolver{ |
| 122 | endpointID: id, |
| 123 | cc: cc, |
| 124 | } |
| 125 | es.addResolver(r) |
| 126 | return r, nil |
| 127 | } |
| 128 | |
| 129 | func (b *builder) newResolverGroup(id string) (*ResolverGroup, error) { |
| 130 | b.mu.RLock() |
| 131 | _, ok := b.resolverGroups[id] |
| 132 | b.mu.RUnlock() |
| 133 | if ok { |
| 134 | return nil, fmt.Errorf("Endpoint already exists for id: %s", id) |
| 135 | } |
| 136 | |
| 137 | es := &ResolverGroup{id: id} |
| 138 | b.mu.Lock() |
| 139 | b.resolverGroups[id] = es |
| 140 | b.mu.Unlock() |
| 141 | return es, nil |
| 142 | } |
| 143 | |
| 144 | func (b *builder) getResolverGroup(id string) (*ResolverGroup, error) { |
| 145 | b.mu.RLock() |
| 146 | es, ok := b.resolverGroups[id] |
| 147 | b.mu.RUnlock() |
| 148 | if !ok { |
| 149 | return nil, fmt.Errorf("ResolverGroup not found for id: %s", id) |
| 150 | } |
| 151 | return es, nil |
| 152 | } |
| 153 | |
| 154 | func (b *builder) close(id string) { |
| 155 | b.mu.Lock() |
| 156 | delete(b.resolverGroups, id) |
| 157 | b.mu.Unlock() |
| 158 | } |
| 159 | |
| 160 | func (b *builder) Scheme() string { |
| 161 | return scheme |
| 162 | } |
| 163 | |
| 164 | // Resolver provides a resolver for a single etcd cluster, identified by name. |
| 165 | type Resolver struct { |
| 166 | endpointID string |
| 167 | cc resolver.ClientConn |
| 168 | sync.RWMutex |
| 169 | } |
| 170 | |
| 171 | // TODO: use balancer.epsToAddrs |
| 172 | func epsToAddrs(eps ...string) (addrs []resolver.Address) { |
| 173 | addrs = make([]resolver.Address, 0, len(eps)) |
| 174 | for _, ep := range eps { |
| 175 | addrs = append(addrs, resolver.Address{Addr: ep}) |
| 176 | } |
| 177 | return addrs |
| 178 | } |
| 179 | |
| 180 | func (*Resolver) ResolveNow(o resolver.ResolveNowOption) {} |
| 181 | |
| 182 | func (r *Resolver) Close() { |
| 183 | es, err := bldr.getResolverGroup(r.endpointID) |
| 184 | if err != nil { |
| 185 | return |
| 186 | } |
| 187 | es.removeResolver(r) |
| 188 | } |
| 189 | |
// ParseEndpoint parses an endpoint of the form
// (http|https)://<host>*|(unix|unixs)://<path>)
// and returns a protocol ('tcp' or 'unix'),
// host (or filepath if a unix socket),
// scheme (http, https, unix, unixs).
//
// An endpoint that fails to parse as a URL, or that contains no "://", is
// returned unchanged as the host with protocol 'tcp' and an empty scheme.
// For any scheme other than the four listed above, protocol and host are
// both empty and only the scheme is reported.
func ParseEndpoint(endpoint string) (proto string, host string, scheme string) {
	parsed, err := url.Parse(endpoint)
	if err != nil || !strings.Contains(endpoint, "://") {
		return "tcp", endpoint, ""
	}

	switch parsed.Scheme {
	case "http", "https":
		// strip scheme:// prefix since grpc dials by host
		return "tcp", parsed.Host, parsed.Scheme
	case "unix", "unixs":
		// the socket path is split across the URL's host and path parts
		return "unix", parsed.Host + parsed.Path, parsed.Scheme
	}
	return "", "", parsed.Scheme
}
| 216 | |
| 217 | // ParseTarget parses a endpoint://<id>/<endpoint> string and returns the parsed id and endpoint. |
| 218 | // If the target is malformed, an error is returned. |
| 219 | func ParseTarget(target string) (string, string, error) { |
| 220 | noPrefix := strings.TrimPrefix(target, targetPrefix) |
| 221 | if noPrefix == target { |
| 222 | return "", "", fmt.Errorf("malformed target, %s prefix is required: %s", targetPrefix, target) |
| 223 | } |
| 224 | parts := strings.SplitN(noPrefix, "/", 2) |
| 225 | if len(parts) != 2 { |
| 226 | return "", "", fmt.Errorf("malformed target, expected %s://<id>/<endpoint>, but got %s", scheme, target) |
| 227 | } |
| 228 | return parts[0], parts[1], nil |
| 229 | } |
| 230 | |
// ParseHostPort splits a "<host>:<port>" string into the host and port parts.
// The split happens at the first ":"; the port part is optional and is
// returned empty when the input contains no ":".
func ParseHostPort(hostPort string) (host string, port string) {
	colon := strings.Index(hostPort, ":")
	if colon < 0 {
		return hostPort, ""
	}
	return hostPort[:colon], hostPort[colon+1:]
}