khenaidoo | 2672188 | 2021-08-11 17:42:52 -0400 | [diff] [blame] | 1 | // Copyright 2015 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package v2http |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "encoding/json" |
| 20 | "errors" |
| 21 | "fmt" |
| 22 | "io/ioutil" |
| 23 | "net/http" |
| 24 | "net/url" |
| 25 | "path" |
| 26 | "strconv" |
| 27 | "strings" |
| 28 | "time" |
| 29 | |
| 30 | etcdErr "github.com/coreos/etcd/error" |
| 31 | "github.com/coreos/etcd/etcdserver" |
| 32 | "github.com/coreos/etcd/etcdserver/api" |
| 33 | "github.com/coreos/etcd/etcdserver/api/etcdhttp" |
| 34 | "github.com/coreos/etcd/etcdserver/api/v2http/httptypes" |
| 35 | "github.com/coreos/etcd/etcdserver/auth" |
| 36 | "github.com/coreos/etcd/etcdserver/etcdserverpb" |
| 37 | "github.com/coreos/etcd/etcdserver/membership" |
| 38 | "github.com/coreos/etcd/etcdserver/stats" |
| 39 | "github.com/coreos/etcd/pkg/types" |
| 40 | "github.com/coreos/etcd/store" |
| 41 | |
| 42 | "github.com/jonboulle/clockwork" |
| 43 | ) |
| 44 | |
| 45 | const ( |
| 46 | authPrefix = "/v2/auth" |
| 47 | keysPrefix = "/v2/keys" |
| 48 | machinesPrefix = "/v2/machines" |
| 49 | membersPrefix = "/v2/members" |
| 50 | statsPrefix = "/v2/stats" |
| 51 | ) |
| 52 | |
| 53 | // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests. |
| 54 | func NewClientHandler(server etcdserver.ServerPeer, timeout time.Duration) http.Handler { |
| 55 | mux := http.NewServeMux() |
| 56 | etcdhttp.HandleBasic(mux, server) |
| 57 | handleV2(mux, server, timeout) |
| 58 | return requestLogger(mux) |
| 59 | } |
| 60 | |
| 61 | func handleV2(mux *http.ServeMux, server etcdserver.ServerV2, timeout time.Duration) { |
| 62 | sec := auth.NewStore(server, timeout) |
| 63 | kh := &keysHandler{ |
| 64 | sec: sec, |
| 65 | server: server, |
| 66 | cluster: server.Cluster(), |
| 67 | timeout: timeout, |
| 68 | clientCertAuthEnabled: server.ClientCertAuthEnabled(), |
| 69 | } |
| 70 | |
| 71 | sh := &statsHandler{ |
| 72 | stats: server, |
| 73 | } |
| 74 | |
| 75 | mh := &membersHandler{ |
| 76 | sec: sec, |
| 77 | server: server, |
| 78 | cluster: server.Cluster(), |
| 79 | timeout: timeout, |
| 80 | clock: clockwork.NewRealClock(), |
| 81 | clientCertAuthEnabled: server.ClientCertAuthEnabled(), |
| 82 | } |
| 83 | |
| 84 | mah := &machinesHandler{cluster: server.Cluster()} |
| 85 | |
| 86 | sech := &authHandler{ |
| 87 | sec: sec, |
| 88 | cluster: server.Cluster(), |
| 89 | clientCertAuthEnabled: server.ClientCertAuthEnabled(), |
| 90 | } |
| 91 | mux.HandleFunc("/", http.NotFound) |
| 92 | mux.Handle(keysPrefix, kh) |
| 93 | mux.Handle(keysPrefix+"/", kh) |
| 94 | mux.HandleFunc(statsPrefix+"/store", sh.serveStore) |
| 95 | mux.HandleFunc(statsPrefix+"/self", sh.serveSelf) |
| 96 | mux.HandleFunc(statsPrefix+"/leader", sh.serveLeader) |
| 97 | mux.Handle(membersPrefix, mh) |
| 98 | mux.Handle(membersPrefix+"/", mh) |
| 99 | mux.Handle(machinesPrefix, mah) |
| 100 | handleAuth(mux, sech) |
| 101 | } |
| 102 | |
| 103 | type keysHandler struct { |
| 104 | sec auth.Store |
| 105 | server etcdserver.ServerV2 |
| 106 | cluster api.Cluster |
| 107 | timeout time.Duration |
| 108 | clientCertAuthEnabled bool |
| 109 | } |
| 110 | |
| 111 | func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| 112 | if !allowMethod(w, r.Method, "HEAD", "GET", "PUT", "POST", "DELETE") { |
| 113 | return |
| 114 | } |
| 115 | |
| 116 | w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) |
| 117 | |
| 118 | ctx, cancel := context.WithTimeout(context.Background(), h.timeout) |
| 119 | defer cancel() |
| 120 | clock := clockwork.NewRealClock() |
| 121 | startTime := clock.Now() |
| 122 | rr, noValueOnSuccess, err := parseKeyRequest(r, clock) |
| 123 | if err != nil { |
| 124 | writeKeyError(w, err) |
| 125 | return |
| 126 | } |
| 127 | // The path must be valid at this point (we've parsed the request successfully). |
| 128 | if !hasKeyPrefixAccess(h.sec, r, r.URL.Path[len(keysPrefix):], rr.Recursive, h.clientCertAuthEnabled) { |
| 129 | writeKeyNoAuth(w) |
| 130 | return |
| 131 | } |
| 132 | if !rr.Wait { |
| 133 | reportRequestReceived(rr) |
| 134 | } |
| 135 | resp, err := h.server.Do(ctx, rr) |
| 136 | if err != nil { |
| 137 | err = trimErrorPrefix(err, etcdserver.StoreKeysPrefix) |
| 138 | writeKeyError(w, err) |
| 139 | reportRequestFailed(rr, err) |
| 140 | return |
| 141 | } |
| 142 | switch { |
| 143 | case resp.Event != nil: |
| 144 | if err := writeKeyEvent(w, resp, noValueOnSuccess); err != nil { |
| 145 | // Should never be reached |
| 146 | plog.Errorf("error writing event (%v)", err) |
| 147 | } |
| 148 | reportRequestCompleted(rr, resp, startTime) |
| 149 | case resp.Watcher != nil: |
| 150 | ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout) |
| 151 | defer cancel() |
| 152 | handleKeyWatch(ctx, w, resp, rr.Stream) |
| 153 | default: |
| 154 | writeKeyError(w, errors.New("received response with no Event/Watcher!")) |
| 155 | } |
| 156 | } |
| 157 | |
| 158 | type machinesHandler struct { |
| 159 | cluster api.Cluster |
| 160 | } |
| 161 | |
| 162 | func (h *machinesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| 163 | if !allowMethod(w, r.Method, "GET", "HEAD") { |
| 164 | return |
| 165 | } |
| 166 | endpoints := h.cluster.ClientURLs() |
| 167 | w.Write([]byte(strings.Join(endpoints, ", "))) |
| 168 | } |
| 169 | |
| 170 | type membersHandler struct { |
| 171 | sec auth.Store |
| 172 | server etcdserver.ServerV2 |
| 173 | cluster api.Cluster |
| 174 | timeout time.Duration |
| 175 | clock clockwork.Clock |
| 176 | clientCertAuthEnabled bool |
| 177 | } |
| 178 | |
| 179 | func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| 180 | if !allowMethod(w, r.Method, "GET", "POST", "DELETE", "PUT") { |
| 181 | return |
| 182 | } |
| 183 | if !hasWriteRootAccess(h.sec, r, h.clientCertAuthEnabled) { |
| 184 | writeNoAuth(w, r) |
| 185 | return |
| 186 | } |
| 187 | w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) |
| 188 | |
| 189 | ctx, cancel := context.WithTimeout(context.Background(), h.timeout) |
| 190 | defer cancel() |
| 191 | |
| 192 | switch r.Method { |
| 193 | case "GET": |
| 194 | switch trimPrefix(r.URL.Path, membersPrefix) { |
| 195 | case "": |
| 196 | mc := newMemberCollection(h.cluster.Members()) |
| 197 | w.Header().Set("Content-Type", "application/json") |
| 198 | if err := json.NewEncoder(w).Encode(mc); err != nil { |
| 199 | plog.Warningf("failed to encode members response (%v)", err) |
| 200 | } |
| 201 | case "leader": |
| 202 | id := h.server.Leader() |
| 203 | if id == 0 { |
| 204 | writeError(w, r, httptypes.NewHTTPError(http.StatusServiceUnavailable, "During election")) |
| 205 | return |
| 206 | } |
| 207 | m := newMember(h.cluster.Member(id)) |
| 208 | w.Header().Set("Content-Type", "application/json") |
| 209 | if err := json.NewEncoder(w).Encode(m); err != nil { |
| 210 | plog.Warningf("failed to encode members response (%v)", err) |
| 211 | } |
| 212 | default: |
| 213 | writeError(w, r, httptypes.NewHTTPError(http.StatusNotFound, "Not found")) |
| 214 | } |
| 215 | case "POST": |
| 216 | req := httptypes.MemberCreateRequest{} |
| 217 | if ok := unmarshalRequest(r, &req, w); !ok { |
| 218 | return |
| 219 | } |
| 220 | now := h.clock.Now() |
| 221 | m := membership.NewMember("", req.PeerURLs, "", &now) |
| 222 | _, err := h.server.AddMember(ctx, *m) |
| 223 | switch { |
| 224 | case err == membership.ErrIDExists || err == membership.ErrPeerURLexists: |
| 225 | writeError(w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error())) |
| 226 | return |
| 227 | case err != nil: |
| 228 | plog.Errorf("error adding member %s (%v)", m.ID, err) |
| 229 | writeError(w, r, err) |
| 230 | return |
| 231 | } |
| 232 | res := newMember(m) |
| 233 | w.Header().Set("Content-Type", "application/json") |
| 234 | w.WriteHeader(http.StatusCreated) |
| 235 | if err := json.NewEncoder(w).Encode(res); err != nil { |
| 236 | plog.Warningf("failed to encode members response (%v)", err) |
| 237 | } |
| 238 | case "DELETE": |
| 239 | id, ok := getID(r.URL.Path, w) |
| 240 | if !ok { |
| 241 | return |
| 242 | } |
| 243 | _, err := h.server.RemoveMember(ctx, uint64(id)) |
| 244 | switch { |
| 245 | case err == membership.ErrIDRemoved: |
| 246 | writeError(w, r, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", id))) |
| 247 | case err == membership.ErrIDNotFound: |
| 248 | writeError(w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) |
| 249 | case err != nil: |
| 250 | plog.Errorf("error removing member %s (%v)", id, err) |
| 251 | writeError(w, r, err) |
| 252 | default: |
| 253 | w.WriteHeader(http.StatusNoContent) |
| 254 | } |
| 255 | case "PUT": |
| 256 | id, ok := getID(r.URL.Path, w) |
| 257 | if !ok { |
| 258 | return |
| 259 | } |
| 260 | req := httptypes.MemberUpdateRequest{} |
| 261 | if ok := unmarshalRequest(r, &req, w); !ok { |
| 262 | return |
| 263 | } |
| 264 | m := membership.Member{ |
| 265 | ID: id, |
| 266 | RaftAttributes: membership.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()}, |
| 267 | } |
| 268 | _, err := h.server.UpdateMember(ctx, m) |
| 269 | switch { |
| 270 | case err == membership.ErrPeerURLexists: |
| 271 | writeError(w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error())) |
| 272 | case err == membership.ErrIDNotFound: |
| 273 | writeError(w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) |
| 274 | case err != nil: |
| 275 | plog.Errorf("error updating member %s (%v)", m.ID, err) |
| 276 | writeError(w, r, err) |
| 277 | default: |
| 278 | w.WriteHeader(http.StatusNoContent) |
| 279 | } |
| 280 | } |
| 281 | } |
| 282 | |
| 283 | type statsHandler struct { |
| 284 | stats stats.Stats |
| 285 | } |
| 286 | |
| 287 | func (h *statsHandler) serveStore(w http.ResponseWriter, r *http.Request) { |
| 288 | if !allowMethod(w, r.Method, "GET") { |
| 289 | return |
| 290 | } |
| 291 | w.Header().Set("Content-Type", "application/json") |
| 292 | w.Write(h.stats.StoreStats()) |
| 293 | } |
| 294 | |
| 295 | func (h *statsHandler) serveSelf(w http.ResponseWriter, r *http.Request) { |
| 296 | if !allowMethod(w, r.Method, "GET") { |
| 297 | return |
| 298 | } |
| 299 | w.Header().Set("Content-Type", "application/json") |
| 300 | w.Write(h.stats.SelfStats()) |
| 301 | } |
| 302 | |
| 303 | func (h *statsHandler) serveLeader(w http.ResponseWriter, r *http.Request) { |
| 304 | if !allowMethod(w, r.Method, "GET") { |
| 305 | return |
| 306 | } |
| 307 | stats := h.stats.LeaderStats() |
| 308 | if stats == nil { |
| 309 | etcdhttp.WriteError(w, r, httptypes.NewHTTPError(http.StatusForbidden, "not current leader")) |
| 310 | return |
| 311 | } |
| 312 | w.Header().Set("Content-Type", "application/json") |
| 313 | w.Write(stats) |
| 314 | } |
| 315 | |
| 316 | // parseKeyRequest converts a received http.Request on keysPrefix to |
| 317 | // a server Request, performing validation of supplied fields as appropriate. |
| 318 | // If any validation fails, an empty Request and non-nil error is returned. |
| 319 | func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Request, bool, error) { |
| 320 | var noValueOnSuccess bool |
| 321 | emptyReq := etcdserverpb.Request{} |
| 322 | |
| 323 | err := r.ParseForm() |
| 324 | if err != nil { |
| 325 | return emptyReq, false, etcdErr.NewRequestError( |
| 326 | etcdErr.EcodeInvalidForm, |
| 327 | err.Error(), |
| 328 | ) |
| 329 | } |
| 330 | |
| 331 | if !strings.HasPrefix(r.URL.Path, keysPrefix) { |
| 332 | return emptyReq, false, etcdErr.NewRequestError( |
| 333 | etcdErr.EcodeInvalidForm, |
| 334 | "incorrect key prefix", |
| 335 | ) |
| 336 | } |
| 337 | p := path.Join(etcdserver.StoreKeysPrefix, r.URL.Path[len(keysPrefix):]) |
| 338 | |
| 339 | var pIdx, wIdx uint64 |
| 340 | if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil { |
| 341 | return emptyReq, false, etcdErr.NewRequestError( |
| 342 | etcdErr.EcodeIndexNaN, |
| 343 | `invalid value for "prevIndex"`, |
| 344 | ) |
| 345 | } |
| 346 | if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil { |
| 347 | return emptyReq, false, etcdErr.NewRequestError( |
| 348 | etcdErr.EcodeIndexNaN, |
| 349 | `invalid value for "waitIndex"`, |
| 350 | ) |
| 351 | } |
| 352 | |
| 353 | var rec, sort, wait, dir, quorum, stream bool |
| 354 | if rec, err = getBool(r.Form, "recursive"); err != nil { |
| 355 | return emptyReq, false, etcdErr.NewRequestError( |
| 356 | etcdErr.EcodeInvalidField, |
| 357 | `invalid value for "recursive"`, |
| 358 | ) |
| 359 | } |
| 360 | if sort, err = getBool(r.Form, "sorted"); err != nil { |
| 361 | return emptyReq, false, etcdErr.NewRequestError( |
| 362 | etcdErr.EcodeInvalidField, |
| 363 | `invalid value for "sorted"`, |
| 364 | ) |
| 365 | } |
| 366 | if wait, err = getBool(r.Form, "wait"); err != nil { |
| 367 | return emptyReq, false, etcdErr.NewRequestError( |
| 368 | etcdErr.EcodeInvalidField, |
| 369 | `invalid value for "wait"`, |
| 370 | ) |
| 371 | } |
| 372 | // TODO(jonboulle): define what parameters dir is/isn't compatible with? |
| 373 | if dir, err = getBool(r.Form, "dir"); err != nil { |
| 374 | return emptyReq, false, etcdErr.NewRequestError( |
| 375 | etcdErr.EcodeInvalidField, |
| 376 | `invalid value for "dir"`, |
| 377 | ) |
| 378 | } |
| 379 | if quorum, err = getBool(r.Form, "quorum"); err != nil { |
| 380 | return emptyReq, false, etcdErr.NewRequestError( |
| 381 | etcdErr.EcodeInvalidField, |
| 382 | `invalid value for "quorum"`, |
| 383 | ) |
| 384 | } |
| 385 | if stream, err = getBool(r.Form, "stream"); err != nil { |
| 386 | return emptyReq, false, etcdErr.NewRequestError( |
| 387 | etcdErr.EcodeInvalidField, |
| 388 | `invalid value for "stream"`, |
| 389 | ) |
| 390 | } |
| 391 | |
| 392 | if wait && r.Method != "GET" { |
| 393 | return emptyReq, false, etcdErr.NewRequestError( |
| 394 | etcdErr.EcodeInvalidField, |
| 395 | `"wait" can only be used with GET requests`, |
| 396 | ) |
| 397 | } |
| 398 | |
| 399 | pV := r.FormValue("prevValue") |
| 400 | if _, ok := r.Form["prevValue"]; ok && pV == "" { |
| 401 | return emptyReq, false, etcdErr.NewRequestError( |
| 402 | etcdErr.EcodePrevValueRequired, |
| 403 | `"prevValue" cannot be empty`, |
| 404 | ) |
| 405 | } |
| 406 | |
| 407 | if noValueOnSuccess, err = getBool(r.Form, "noValueOnSuccess"); err != nil { |
| 408 | return emptyReq, false, etcdErr.NewRequestError( |
| 409 | etcdErr.EcodeInvalidField, |
| 410 | `invalid value for "noValueOnSuccess"`, |
| 411 | ) |
| 412 | } |
| 413 | |
| 414 | // TTL is nullable, so leave it null if not specified |
| 415 | // or an empty string |
| 416 | var ttl *uint64 |
| 417 | if len(r.FormValue("ttl")) > 0 { |
| 418 | i, err := getUint64(r.Form, "ttl") |
| 419 | if err != nil { |
| 420 | return emptyReq, false, etcdErr.NewRequestError( |
| 421 | etcdErr.EcodeTTLNaN, |
| 422 | `invalid value for "ttl"`, |
| 423 | ) |
| 424 | } |
| 425 | ttl = &i |
| 426 | } |
| 427 | |
| 428 | // prevExist is nullable, so leave it null if not specified |
| 429 | var pe *bool |
| 430 | if _, ok := r.Form["prevExist"]; ok { |
| 431 | bv, err := getBool(r.Form, "prevExist") |
| 432 | if err != nil { |
| 433 | return emptyReq, false, etcdErr.NewRequestError( |
| 434 | etcdErr.EcodeInvalidField, |
| 435 | "invalid value for prevExist", |
| 436 | ) |
| 437 | } |
| 438 | pe = &bv |
| 439 | } |
| 440 | |
| 441 | // refresh is nullable, so leave it null if not specified |
| 442 | var refresh *bool |
| 443 | if _, ok := r.Form["refresh"]; ok { |
| 444 | bv, err := getBool(r.Form, "refresh") |
| 445 | if err != nil { |
| 446 | return emptyReq, false, etcdErr.NewRequestError( |
| 447 | etcdErr.EcodeInvalidField, |
| 448 | "invalid value for refresh", |
| 449 | ) |
| 450 | } |
| 451 | refresh = &bv |
| 452 | if refresh != nil && *refresh { |
| 453 | val := r.FormValue("value") |
| 454 | if _, ok := r.Form["value"]; ok && val != "" { |
| 455 | return emptyReq, false, etcdErr.NewRequestError( |
| 456 | etcdErr.EcodeRefreshValue, |
| 457 | `A value was provided on a refresh`, |
| 458 | ) |
| 459 | } |
| 460 | if ttl == nil { |
| 461 | return emptyReq, false, etcdErr.NewRequestError( |
| 462 | etcdErr.EcodeRefreshTTLRequired, |
| 463 | `No TTL value set`, |
| 464 | ) |
| 465 | } |
| 466 | } |
| 467 | } |
| 468 | |
| 469 | rr := etcdserverpb.Request{ |
| 470 | Method: r.Method, |
| 471 | Path: p, |
| 472 | Val: r.FormValue("value"), |
| 473 | Dir: dir, |
| 474 | PrevValue: pV, |
| 475 | PrevIndex: pIdx, |
| 476 | PrevExist: pe, |
| 477 | Wait: wait, |
| 478 | Since: wIdx, |
| 479 | Recursive: rec, |
| 480 | Sorted: sort, |
| 481 | Quorum: quorum, |
| 482 | Stream: stream, |
| 483 | } |
| 484 | |
| 485 | if pe != nil { |
| 486 | rr.PrevExist = pe |
| 487 | } |
| 488 | |
| 489 | if refresh != nil { |
| 490 | rr.Refresh = refresh |
| 491 | } |
| 492 | |
| 493 | // Null TTL is equivalent to unset Expiration |
| 494 | if ttl != nil { |
| 495 | expr := time.Duration(*ttl) * time.Second |
| 496 | rr.Expiration = clock.Now().Add(expr).UnixNano() |
| 497 | } |
| 498 | |
| 499 | return rr, noValueOnSuccess, nil |
| 500 | } |
| 501 | |
| 502 | // writeKeyEvent trims the prefix of key path in a single Event under |
| 503 | // StoreKeysPrefix, serializes it and writes the resulting JSON to the given |
| 504 | // ResponseWriter, along with the appropriate headers. |
| 505 | func writeKeyEvent(w http.ResponseWriter, resp etcdserver.Response, noValueOnSuccess bool) error { |
| 506 | ev := resp.Event |
| 507 | if ev == nil { |
| 508 | return errors.New("cannot write empty Event!") |
| 509 | } |
| 510 | w.Header().Set("Content-Type", "application/json") |
| 511 | w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex)) |
| 512 | w.Header().Set("X-Raft-Index", fmt.Sprint(resp.Index)) |
| 513 | w.Header().Set("X-Raft-Term", fmt.Sprint(resp.Term)) |
| 514 | |
| 515 | if ev.IsCreated() { |
| 516 | w.WriteHeader(http.StatusCreated) |
| 517 | } |
| 518 | |
| 519 | ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix) |
| 520 | if noValueOnSuccess && |
| 521 | (ev.Action == store.Set || ev.Action == store.CompareAndSwap || |
| 522 | ev.Action == store.Create || ev.Action == store.Update) { |
| 523 | ev.Node = nil |
| 524 | ev.PrevNode = nil |
| 525 | } |
| 526 | return json.NewEncoder(w).Encode(ev) |
| 527 | } |
| 528 | |
| 529 | func writeKeyNoAuth(w http.ResponseWriter) { |
| 530 | e := etcdErr.NewError(etcdErr.EcodeUnauthorized, "Insufficient credentials", 0) |
| 531 | e.WriteTo(w) |
| 532 | } |
| 533 | |
| 534 | // writeKeyError logs and writes the given Error to the ResponseWriter. |
| 535 | // If Error is not an etcdErr, the error will be converted to an etcd error. |
| 536 | func writeKeyError(w http.ResponseWriter, err error) { |
| 537 | if err == nil { |
| 538 | return |
| 539 | } |
| 540 | switch e := err.(type) { |
| 541 | case *etcdErr.Error: |
| 542 | e.WriteTo(w) |
| 543 | default: |
| 544 | switch err { |
| 545 | case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost: |
| 546 | mlog.MergeError(err) |
| 547 | default: |
| 548 | mlog.MergeErrorf("got unexpected response error (%v)", err) |
| 549 | } |
| 550 | ee := etcdErr.NewError(etcdErr.EcodeRaftInternal, err.Error(), 0) |
| 551 | ee.WriteTo(w) |
| 552 | } |
| 553 | } |
| 554 | |
| 555 | func handleKeyWatch(ctx context.Context, w http.ResponseWriter, resp etcdserver.Response, stream bool) { |
| 556 | wa := resp.Watcher |
| 557 | defer wa.Remove() |
| 558 | ech := wa.EventChan() |
| 559 | var nch <-chan bool |
| 560 | if x, ok := w.(http.CloseNotifier); ok { |
| 561 | nch = x.CloseNotify() |
| 562 | } |
| 563 | |
| 564 | w.Header().Set("Content-Type", "application/json") |
| 565 | w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex())) |
| 566 | w.Header().Set("X-Raft-Index", fmt.Sprint(resp.Index)) |
| 567 | w.Header().Set("X-Raft-Term", fmt.Sprint(resp.Term)) |
| 568 | w.WriteHeader(http.StatusOK) |
| 569 | |
| 570 | // Ensure headers are flushed early, in case of long polling |
| 571 | w.(http.Flusher).Flush() |
| 572 | |
| 573 | for { |
| 574 | select { |
| 575 | case <-nch: |
| 576 | // Client closed connection. Nothing to do. |
| 577 | return |
| 578 | case <-ctx.Done(): |
| 579 | // Timed out. net/http will close the connection for us, so nothing to do. |
| 580 | return |
| 581 | case ev, ok := <-ech: |
| 582 | if !ok { |
| 583 | // If the channel is closed this may be an indication of |
| 584 | // that notifications are much more than we are able to |
| 585 | // send to the client in time. Then we simply end streaming. |
| 586 | return |
| 587 | } |
| 588 | ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix) |
| 589 | if err := json.NewEncoder(w).Encode(ev); err != nil { |
| 590 | // Should never be reached |
| 591 | plog.Warningf("error writing event (%v)", err) |
| 592 | return |
| 593 | } |
| 594 | if !stream { |
| 595 | return |
| 596 | } |
| 597 | w.(http.Flusher).Flush() |
| 598 | } |
| 599 | } |
| 600 | } |
| 601 | |
| 602 | func trimEventPrefix(ev *store.Event, prefix string) *store.Event { |
| 603 | if ev == nil { |
| 604 | return nil |
| 605 | } |
| 606 | // Since the *Event may reference one in the store history |
| 607 | // history, we must copy it before modifying |
| 608 | e := ev.Clone() |
| 609 | trimNodeExternPrefix(e.Node, prefix) |
| 610 | trimNodeExternPrefix(e.PrevNode, prefix) |
| 611 | return e |
| 612 | } |
| 613 | |
| 614 | func trimNodeExternPrefix(n *store.NodeExtern, prefix string) { |
| 615 | if n == nil { |
| 616 | return |
| 617 | } |
| 618 | n.Key = strings.TrimPrefix(n.Key, prefix) |
| 619 | for _, nn := range n.Nodes { |
| 620 | trimNodeExternPrefix(nn, prefix) |
| 621 | } |
| 622 | } |
| 623 | |
| 624 | func trimErrorPrefix(err error, prefix string) error { |
| 625 | if e, ok := err.(*etcdErr.Error); ok { |
| 626 | e.Cause = strings.TrimPrefix(e.Cause, prefix) |
| 627 | } |
| 628 | return err |
| 629 | } |
| 630 | |
| 631 | func unmarshalRequest(r *http.Request, req json.Unmarshaler, w http.ResponseWriter) bool { |
| 632 | ctype := r.Header.Get("Content-Type") |
| 633 | semicolonPosition := strings.Index(ctype, ";") |
| 634 | if semicolonPosition != -1 { |
| 635 | ctype = strings.TrimSpace(strings.ToLower(ctype[0:semicolonPosition])) |
| 636 | } |
| 637 | if ctype != "application/json" { |
| 638 | writeError(w, r, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype))) |
| 639 | return false |
| 640 | } |
| 641 | b, err := ioutil.ReadAll(r.Body) |
| 642 | if err != nil { |
| 643 | writeError(w, r, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) |
| 644 | return false |
| 645 | } |
| 646 | if err := req.UnmarshalJSON(b); err != nil { |
| 647 | writeError(w, r, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) |
| 648 | return false |
| 649 | } |
| 650 | return true |
| 651 | } |
| 652 | |
| 653 | func getID(p string, w http.ResponseWriter) (types.ID, bool) { |
| 654 | idStr := trimPrefix(p, membersPrefix) |
| 655 | if idStr == "" { |
| 656 | http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) |
| 657 | return 0, false |
| 658 | } |
| 659 | id, err := types.IDFromString(idStr) |
| 660 | if err != nil { |
| 661 | writeError(w, nil, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr))) |
| 662 | return 0, false |
| 663 | } |
| 664 | return id, true |
| 665 | } |
| 666 | |
| 667 | // getUint64 extracts a uint64 by the given key from a Form. If the key does |
| 668 | // not exist in the form, 0 is returned. If the key exists but the value is |
| 669 | // badly formed, an error is returned. If multiple values are present only the |
| 670 | // first is considered. |
| 671 | func getUint64(form url.Values, key string) (i uint64, err error) { |
| 672 | if vals, ok := form[key]; ok { |
| 673 | i, err = strconv.ParseUint(vals[0], 10, 64) |
| 674 | } |
| 675 | return |
| 676 | } |
| 677 | |
| 678 | // getBool extracts a bool by the given key from a Form. If the key does not |
| 679 | // exist in the form, false is returned. If the key exists but the value is |
| 680 | // badly formed, an error is returned. If multiple values are present only the |
| 681 | // first is considered. |
| 682 | func getBool(form url.Values, key string) (b bool, err error) { |
| 683 | if vals, ok := form[key]; ok { |
| 684 | b, err = strconv.ParseBool(vals[0]) |
| 685 | } |
| 686 | return |
| 687 | } |
| 688 | |
| 689 | // trimPrefix removes a given prefix and any slash following the prefix |
| 690 | // e.g.: trimPrefix("foo", "foo") == trimPrefix("foo/", "foo") == "" |
| 691 | func trimPrefix(p, prefix string) (s string) { |
| 692 | s = strings.TrimPrefix(p, prefix) |
| 693 | s = strings.TrimPrefix(s, "/") |
| 694 | return |
| 695 | } |
| 696 | |
| 697 | func newMemberCollection(ms []*membership.Member) *httptypes.MemberCollection { |
| 698 | c := httptypes.MemberCollection(make([]httptypes.Member, len(ms))) |
| 699 | |
| 700 | for i, m := range ms { |
| 701 | c[i] = newMember(m) |
| 702 | } |
| 703 | |
| 704 | return &c |
| 705 | } |
| 706 | |
| 707 | func newMember(m *membership.Member) httptypes.Member { |
| 708 | tm := httptypes.Member{ |
| 709 | ID: m.ID.String(), |
| 710 | Name: m.Name, |
| 711 | PeerURLs: make([]string, len(m.PeerURLs)), |
| 712 | ClientURLs: make([]string, len(m.ClientURLs)), |
| 713 | } |
| 714 | |
| 715 | copy(tm.PeerURLs, m.PeerURLs) |
| 716 | copy(tm.ClientURLs, m.ClientURLs) |
| 717 | |
| 718 | return tm |
| 719 | } |