khenaidoo | ab1f7bd | 2019-11-14 14:00:27 -0500 | [diff] [blame] | 1 | // Copyright 2015 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package v2http |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "encoding/json" |
| 20 | "errors" |
| 21 | "fmt" |
| 22 | "io/ioutil" |
| 23 | "net/http" |
| 24 | "net/url" |
| 25 | "path" |
| 26 | "strconv" |
| 27 | "strings" |
| 28 | "time" |
| 29 | |
| 30 | "go.etcd.io/etcd/etcdserver" |
| 31 | "go.etcd.io/etcd/etcdserver/api" |
| 32 | "go.etcd.io/etcd/etcdserver/api/etcdhttp" |
| 33 | "go.etcd.io/etcd/etcdserver/api/membership" |
| 34 | "go.etcd.io/etcd/etcdserver/api/v2auth" |
| 35 | "go.etcd.io/etcd/etcdserver/api/v2error" |
| 36 | "go.etcd.io/etcd/etcdserver/api/v2http/httptypes" |
| 37 | stats "go.etcd.io/etcd/etcdserver/api/v2stats" |
| 38 | "go.etcd.io/etcd/etcdserver/api/v2store" |
| 39 | "go.etcd.io/etcd/etcdserver/etcdserverpb" |
| 40 | "go.etcd.io/etcd/pkg/types" |
| 41 | |
| 42 | "github.com/jonboulle/clockwork" |
| 43 | "go.uber.org/zap" |
| 44 | ) |
| 45 | |
| 46 | const ( |
| 47 | authPrefix = "/v2/auth" |
| 48 | keysPrefix = "/v2/keys" |
| 49 | machinesPrefix = "/v2/machines" |
| 50 | membersPrefix = "/v2/members" |
| 51 | statsPrefix = "/v2/stats" |
| 52 | ) |
| 53 | |
| 54 | // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests. |
| 55 | func NewClientHandler(lg *zap.Logger, server etcdserver.ServerPeer, timeout time.Duration) http.Handler { |
| 56 | mux := http.NewServeMux() |
| 57 | etcdhttp.HandleBasic(mux, server) |
| 58 | handleV2(lg, mux, server, timeout) |
| 59 | return requestLogger(lg, mux) |
| 60 | } |
| 61 | |
| 62 | func handleV2(lg *zap.Logger, mux *http.ServeMux, server etcdserver.ServerV2, timeout time.Duration) { |
| 63 | sec := v2auth.NewStore(lg, server, timeout) |
| 64 | kh := &keysHandler{ |
| 65 | lg: lg, |
| 66 | sec: sec, |
| 67 | server: server, |
| 68 | cluster: server.Cluster(), |
| 69 | timeout: timeout, |
| 70 | clientCertAuthEnabled: server.ClientCertAuthEnabled(), |
| 71 | } |
| 72 | |
| 73 | sh := &statsHandler{ |
| 74 | lg: lg, |
| 75 | stats: server, |
| 76 | } |
| 77 | |
| 78 | mh := &membersHandler{ |
| 79 | lg: lg, |
| 80 | sec: sec, |
| 81 | server: server, |
| 82 | cluster: server.Cluster(), |
| 83 | timeout: timeout, |
| 84 | clock: clockwork.NewRealClock(), |
| 85 | clientCertAuthEnabled: server.ClientCertAuthEnabled(), |
| 86 | } |
| 87 | |
| 88 | mah := &machinesHandler{cluster: server.Cluster()} |
| 89 | |
| 90 | sech := &authHandler{ |
| 91 | lg: lg, |
| 92 | sec: sec, |
| 93 | cluster: server.Cluster(), |
| 94 | clientCertAuthEnabled: server.ClientCertAuthEnabled(), |
| 95 | } |
| 96 | mux.HandleFunc("/", http.NotFound) |
| 97 | mux.Handle(keysPrefix, kh) |
| 98 | mux.Handle(keysPrefix+"/", kh) |
| 99 | mux.HandleFunc(statsPrefix+"/store", sh.serveStore) |
| 100 | mux.HandleFunc(statsPrefix+"/self", sh.serveSelf) |
| 101 | mux.HandleFunc(statsPrefix+"/leader", sh.serveLeader) |
| 102 | mux.Handle(membersPrefix, mh) |
| 103 | mux.Handle(membersPrefix+"/", mh) |
| 104 | mux.Handle(machinesPrefix, mah) |
| 105 | handleAuth(mux, sech) |
| 106 | } |
| 107 | |
| 108 | type keysHandler struct { |
| 109 | lg *zap.Logger |
| 110 | sec v2auth.Store |
| 111 | server etcdserver.ServerV2 |
| 112 | cluster api.Cluster |
| 113 | timeout time.Duration |
| 114 | clientCertAuthEnabled bool |
| 115 | } |
| 116 | |
| 117 | func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| 118 | if !allowMethod(w, r.Method, "HEAD", "GET", "PUT", "POST", "DELETE") { |
| 119 | return |
| 120 | } |
| 121 | |
| 122 | w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) |
| 123 | |
| 124 | ctx, cancel := context.WithTimeout(context.Background(), h.timeout) |
| 125 | defer cancel() |
| 126 | clock := clockwork.NewRealClock() |
| 127 | startTime := clock.Now() |
| 128 | rr, noValueOnSuccess, err := parseKeyRequest(r, clock) |
| 129 | if err != nil { |
| 130 | writeKeyError(h.lg, w, err) |
| 131 | return |
| 132 | } |
| 133 | // The path must be valid at this point (we've parsed the request successfully). |
| 134 | if !hasKeyPrefixAccess(h.lg, h.sec, r, r.URL.Path[len(keysPrefix):], rr.Recursive, h.clientCertAuthEnabled) { |
| 135 | writeKeyNoAuth(w) |
| 136 | return |
| 137 | } |
| 138 | if !rr.Wait { |
| 139 | reportRequestReceived(rr) |
| 140 | } |
| 141 | resp, err := h.server.Do(ctx, rr) |
| 142 | if err != nil { |
| 143 | err = trimErrorPrefix(err, etcdserver.StoreKeysPrefix) |
| 144 | writeKeyError(h.lg, w, err) |
| 145 | reportRequestFailed(rr, err) |
| 146 | return |
| 147 | } |
| 148 | switch { |
| 149 | case resp.Event != nil: |
| 150 | if err := writeKeyEvent(w, resp, noValueOnSuccess); err != nil { |
| 151 | // Should never be reached |
| 152 | if h.lg != nil { |
| 153 | h.lg.Warn("failed to write key event", zap.Error(err)) |
| 154 | } else { |
| 155 | plog.Errorf("error writing event (%v)", err) |
| 156 | } |
| 157 | } |
| 158 | reportRequestCompleted(rr, startTime) |
| 159 | case resp.Watcher != nil: |
| 160 | ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout) |
| 161 | defer cancel() |
| 162 | handleKeyWatch(ctx, h.lg, w, resp, rr.Stream) |
| 163 | default: |
| 164 | writeKeyError(h.lg, w, errors.New("received response with no Event/Watcher")) |
| 165 | } |
| 166 | } |
| 167 | |
| 168 | type machinesHandler struct { |
| 169 | cluster api.Cluster |
| 170 | } |
| 171 | |
| 172 | func (h *machinesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| 173 | if !allowMethod(w, r.Method, "GET", "HEAD") { |
| 174 | return |
| 175 | } |
| 176 | endpoints := h.cluster.ClientURLs() |
| 177 | w.Write([]byte(strings.Join(endpoints, ", "))) |
| 178 | } |
| 179 | |
| 180 | type membersHandler struct { |
| 181 | lg *zap.Logger |
| 182 | sec v2auth.Store |
| 183 | server etcdserver.ServerV2 |
| 184 | cluster api.Cluster |
| 185 | timeout time.Duration |
| 186 | clock clockwork.Clock |
| 187 | clientCertAuthEnabled bool |
| 188 | } |
| 189 | |
| 190 | func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| 191 | if !allowMethod(w, r.Method, "GET", "POST", "DELETE", "PUT") { |
| 192 | return |
| 193 | } |
| 194 | if !hasWriteRootAccess(h.lg, h.sec, r, h.clientCertAuthEnabled) { |
| 195 | writeNoAuth(h.lg, w, r) |
| 196 | return |
| 197 | } |
| 198 | w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) |
| 199 | |
| 200 | ctx, cancel := context.WithTimeout(context.Background(), h.timeout) |
| 201 | defer cancel() |
| 202 | |
| 203 | switch r.Method { |
| 204 | case "GET": |
| 205 | switch trimPrefix(r.URL.Path, membersPrefix) { |
| 206 | case "": |
| 207 | mc := newMemberCollection(h.cluster.Members()) |
| 208 | w.Header().Set("Content-Type", "application/json") |
| 209 | if err := json.NewEncoder(w).Encode(mc); err != nil { |
| 210 | if h.lg != nil { |
| 211 | h.lg.Warn("failed to encode members response", zap.Error(err)) |
| 212 | } else { |
| 213 | plog.Warningf("failed to encode members response (%v)", err) |
| 214 | } |
| 215 | } |
| 216 | case "leader": |
| 217 | id := h.server.Leader() |
| 218 | if id == 0 { |
| 219 | writeError(h.lg, w, r, httptypes.NewHTTPError(http.StatusServiceUnavailable, "During election")) |
| 220 | return |
| 221 | } |
| 222 | m := newMember(h.cluster.Member(id)) |
| 223 | w.Header().Set("Content-Type", "application/json") |
| 224 | if err := json.NewEncoder(w).Encode(m); err != nil { |
| 225 | if h.lg != nil { |
| 226 | h.lg.Warn("failed to encode members response", zap.Error(err)) |
| 227 | } else { |
| 228 | plog.Warningf("failed to encode members response (%v)", err) |
| 229 | } |
| 230 | } |
| 231 | default: |
| 232 | writeError(h.lg, w, r, httptypes.NewHTTPError(http.StatusNotFound, "Not found")) |
| 233 | } |
| 234 | |
| 235 | case "POST": |
| 236 | req := httptypes.MemberCreateRequest{} |
| 237 | if ok := unmarshalRequest(h.lg, r, &req, w); !ok { |
| 238 | return |
| 239 | } |
| 240 | now := h.clock.Now() |
| 241 | m := membership.NewMember("", req.PeerURLs, "", &now) |
| 242 | _, err := h.server.AddMember(ctx, *m) |
| 243 | switch { |
| 244 | case err == membership.ErrIDExists || err == membership.ErrPeerURLexists: |
| 245 | writeError(h.lg, w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error())) |
| 246 | return |
| 247 | case err != nil: |
| 248 | if h.lg != nil { |
| 249 | h.lg.Warn( |
| 250 | "failed to add a member", |
| 251 | zap.String("member-id", m.ID.String()), |
| 252 | zap.Error(err), |
| 253 | ) |
| 254 | } else { |
| 255 | plog.Errorf("error adding member %s (%v)", m.ID, err) |
| 256 | } |
| 257 | writeError(h.lg, w, r, err) |
| 258 | return |
| 259 | } |
| 260 | res := newMember(m) |
| 261 | w.Header().Set("Content-Type", "application/json") |
| 262 | w.WriteHeader(http.StatusCreated) |
| 263 | if err := json.NewEncoder(w).Encode(res); err != nil { |
| 264 | if h.lg != nil { |
| 265 | h.lg.Warn("failed to encode members response", zap.Error(err)) |
| 266 | } else { |
| 267 | plog.Warningf("failed to encode members response (%v)", err) |
| 268 | } |
| 269 | } |
| 270 | |
| 271 | case "DELETE": |
| 272 | id, ok := getID(h.lg, r.URL.Path, w) |
| 273 | if !ok { |
| 274 | return |
| 275 | } |
| 276 | _, err := h.server.RemoveMember(ctx, uint64(id)) |
| 277 | switch { |
| 278 | case err == membership.ErrIDRemoved: |
| 279 | writeError(h.lg, w, r, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", id))) |
| 280 | case err == membership.ErrIDNotFound: |
| 281 | writeError(h.lg, w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) |
| 282 | case err != nil: |
| 283 | if h.lg != nil { |
| 284 | h.lg.Warn( |
| 285 | "failed to remove a member", |
| 286 | zap.String("member-id", id.String()), |
| 287 | zap.Error(err), |
| 288 | ) |
| 289 | } else { |
| 290 | plog.Errorf("error removing member %s (%v)", id, err) |
| 291 | } |
| 292 | writeError(h.lg, w, r, err) |
| 293 | default: |
| 294 | w.WriteHeader(http.StatusNoContent) |
| 295 | } |
| 296 | |
| 297 | case "PUT": |
| 298 | id, ok := getID(h.lg, r.URL.Path, w) |
| 299 | if !ok { |
| 300 | return |
| 301 | } |
| 302 | req := httptypes.MemberUpdateRequest{} |
| 303 | if ok := unmarshalRequest(h.lg, r, &req, w); !ok { |
| 304 | return |
| 305 | } |
| 306 | m := membership.Member{ |
| 307 | ID: id, |
| 308 | RaftAttributes: membership.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()}, |
| 309 | } |
| 310 | _, err := h.server.UpdateMember(ctx, m) |
| 311 | switch { |
| 312 | case err == membership.ErrPeerURLexists: |
| 313 | writeError(h.lg, w, r, httptypes.NewHTTPError(http.StatusConflict, err.Error())) |
| 314 | case err == membership.ErrIDNotFound: |
| 315 | writeError(h.lg, w, r, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id))) |
| 316 | case err != nil: |
| 317 | if h.lg != nil { |
| 318 | h.lg.Warn( |
| 319 | "failed to update a member", |
| 320 | zap.String("member-id", m.ID.String()), |
| 321 | zap.Error(err), |
| 322 | ) |
| 323 | } else { |
| 324 | plog.Errorf("error updating member %s (%v)", m.ID, err) |
| 325 | } |
| 326 | writeError(h.lg, w, r, err) |
| 327 | default: |
| 328 | w.WriteHeader(http.StatusNoContent) |
| 329 | } |
| 330 | } |
| 331 | } |
| 332 | |
| 333 | type statsHandler struct { |
| 334 | lg *zap.Logger |
| 335 | stats stats.Stats |
| 336 | } |
| 337 | |
| 338 | func (h *statsHandler) serveStore(w http.ResponseWriter, r *http.Request) { |
| 339 | if !allowMethod(w, r.Method, "GET") { |
| 340 | return |
| 341 | } |
| 342 | w.Header().Set("Content-Type", "application/json") |
| 343 | w.Write(h.stats.StoreStats()) |
| 344 | } |
| 345 | |
| 346 | func (h *statsHandler) serveSelf(w http.ResponseWriter, r *http.Request) { |
| 347 | if !allowMethod(w, r.Method, "GET") { |
| 348 | return |
| 349 | } |
| 350 | w.Header().Set("Content-Type", "application/json") |
| 351 | w.Write(h.stats.SelfStats()) |
| 352 | } |
| 353 | |
| 354 | func (h *statsHandler) serveLeader(w http.ResponseWriter, r *http.Request) { |
| 355 | if !allowMethod(w, r.Method, "GET") { |
| 356 | return |
| 357 | } |
| 358 | stats := h.stats.LeaderStats() |
| 359 | if stats == nil { |
| 360 | etcdhttp.WriteError(h.lg, w, r, httptypes.NewHTTPError(http.StatusForbidden, "not current leader")) |
| 361 | return |
| 362 | } |
| 363 | w.Header().Set("Content-Type", "application/json") |
| 364 | w.Write(stats) |
| 365 | } |
| 366 | |
| 367 | // parseKeyRequest converts a received http.Request on keysPrefix to |
| 368 | // a server Request, performing validation of supplied fields as appropriate. |
| 369 | // If any validation fails, an empty Request and non-nil error is returned. |
| 370 | func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Request, bool, error) { |
| 371 | var noValueOnSuccess bool |
| 372 | emptyReq := etcdserverpb.Request{} |
| 373 | |
| 374 | err := r.ParseForm() |
| 375 | if err != nil { |
| 376 | return emptyReq, false, v2error.NewRequestError( |
| 377 | v2error.EcodeInvalidForm, |
| 378 | err.Error(), |
| 379 | ) |
| 380 | } |
| 381 | |
| 382 | if !strings.HasPrefix(r.URL.Path, keysPrefix) { |
| 383 | return emptyReq, false, v2error.NewRequestError( |
| 384 | v2error.EcodeInvalidForm, |
| 385 | "incorrect key prefix", |
| 386 | ) |
| 387 | } |
| 388 | p := path.Join(etcdserver.StoreKeysPrefix, r.URL.Path[len(keysPrefix):]) |
| 389 | |
| 390 | var pIdx, wIdx uint64 |
| 391 | if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil { |
| 392 | return emptyReq, false, v2error.NewRequestError( |
| 393 | v2error.EcodeIndexNaN, |
| 394 | `invalid value for "prevIndex"`, |
| 395 | ) |
| 396 | } |
| 397 | if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil { |
| 398 | return emptyReq, false, v2error.NewRequestError( |
| 399 | v2error.EcodeIndexNaN, |
| 400 | `invalid value for "waitIndex"`, |
| 401 | ) |
| 402 | } |
| 403 | |
| 404 | var rec, sort, wait, dir, quorum, stream bool |
| 405 | if rec, err = getBool(r.Form, "recursive"); err != nil { |
| 406 | return emptyReq, false, v2error.NewRequestError( |
| 407 | v2error.EcodeInvalidField, |
| 408 | `invalid value for "recursive"`, |
| 409 | ) |
| 410 | } |
| 411 | if sort, err = getBool(r.Form, "sorted"); err != nil { |
| 412 | return emptyReq, false, v2error.NewRequestError( |
| 413 | v2error.EcodeInvalidField, |
| 414 | `invalid value for "sorted"`, |
| 415 | ) |
| 416 | } |
| 417 | if wait, err = getBool(r.Form, "wait"); err != nil { |
| 418 | return emptyReq, false, v2error.NewRequestError( |
| 419 | v2error.EcodeInvalidField, |
| 420 | `invalid value for "wait"`, |
| 421 | ) |
| 422 | } |
| 423 | // TODO(jonboulle): define what parameters dir is/isn't compatible with? |
| 424 | if dir, err = getBool(r.Form, "dir"); err != nil { |
| 425 | return emptyReq, false, v2error.NewRequestError( |
| 426 | v2error.EcodeInvalidField, |
| 427 | `invalid value for "dir"`, |
| 428 | ) |
| 429 | } |
| 430 | if quorum, err = getBool(r.Form, "quorum"); err != nil { |
| 431 | return emptyReq, false, v2error.NewRequestError( |
| 432 | v2error.EcodeInvalidField, |
| 433 | `invalid value for "quorum"`, |
| 434 | ) |
| 435 | } |
| 436 | if stream, err = getBool(r.Form, "stream"); err != nil { |
| 437 | return emptyReq, false, v2error.NewRequestError( |
| 438 | v2error.EcodeInvalidField, |
| 439 | `invalid value for "stream"`, |
| 440 | ) |
| 441 | } |
| 442 | |
| 443 | if wait && r.Method != "GET" { |
| 444 | return emptyReq, false, v2error.NewRequestError( |
| 445 | v2error.EcodeInvalidField, |
| 446 | `"wait" can only be used with GET requests`, |
| 447 | ) |
| 448 | } |
| 449 | |
| 450 | pV := r.FormValue("prevValue") |
| 451 | if _, ok := r.Form["prevValue"]; ok && pV == "" { |
| 452 | return emptyReq, false, v2error.NewRequestError( |
| 453 | v2error.EcodePrevValueRequired, |
| 454 | `"prevValue" cannot be empty`, |
| 455 | ) |
| 456 | } |
| 457 | |
| 458 | if noValueOnSuccess, err = getBool(r.Form, "noValueOnSuccess"); err != nil { |
| 459 | return emptyReq, false, v2error.NewRequestError( |
| 460 | v2error.EcodeInvalidField, |
| 461 | `invalid value for "noValueOnSuccess"`, |
| 462 | ) |
| 463 | } |
| 464 | |
| 465 | // TTL is nullable, so leave it null if not specified |
| 466 | // or an empty string |
| 467 | var ttl *uint64 |
| 468 | if len(r.FormValue("ttl")) > 0 { |
| 469 | i, err := getUint64(r.Form, "ttl") |
| 470 | if err != nil { |
| 471 | return emptyReq, false, v2error.NewRequestError( |
| 472 | v2error.EcodeTTLNaN, |
| 473 | `invalid value for "ttl"`, |
| 474 | ) |
| 475 | } |
| 476 | ttl = &i |
| 477 | } |
| 478 | |
| 479 | // prevExist is nullable, so leave it null if not specified |
| 480 | var pe *bool |
| 481 | if _, ok := r.Form["prevExist"]; ok { |
| 482 | bv, err := getBool(r.Form, "prevExist") |
| 483 | if err != nil { |
| 484 | return emptyReq, false, v2error.NewRequestError( |
| 485 | v2error.EcodeInvalidField, |
| 486 | "invalid value for prevExist", |
| 487 | ) |
| 488 | } |
| 489 | pe = &bv |
| 490 | } |
| 491 | |
| 492 | // refresh is nullable, so leave it null if not specified |
| 493 | var refresh *bool |
| 494 | if _, ok := r.Form["refresh"]; ok { |
| 495 | bv, err := getBool(r.Form, "refresh") |
| 496 | if err != nil { |
| 497 | return emptyReq, false, v2error.NewRequestError( |
| 498 | v2error.EcodeInvalidField, |
| 499 | "invalid value for refresh", |
| 500 | ) |
| 501 | } |
| 502 | refresh = &bv |
| 503 | if refresh != nil && *refresh { |
| 504 | val := r.FormValue("value") |
| 505 | if _, ok := r.Form["value"]; ok && val != "" { |
| 506 | return emptyReq, false, v2error.NewRequestError( |
| 507 | v2error.EcodeRefreshValue, |
| 508 | `A value was provided on a refresh`, |
| 509 | ) |
| 510 | } |
| 511 | if ttl == nil { |
| 512 | return emptyReq, false, v2error.NewRequestError( |
| 513 | v2error.EcodeRefreshTTLRequired, |
| 514 | `No TTL value set`, |
| 515 | ) |
| 516 | } |
| 517 | } |
| 518 | } |
| 519 | |
| 520 | rr := etcdserverpb.Request{ |
| 521 | Method: r.Method, |
| 522 | Path: p, |
| 523 | Val: r.FormValue("value"), |
| 524 | Dir: dir, |
| 525 | PrevValue: pV, |
| 526 | PrevIndex: pIdx, |
| 527 | PrevExist: pe, |
| 528 | Wait: wait, |
| 529 | Since: wIdx, |
| 530 | Recursive: rec, |
| 531 | Sorted: sort, |
| 532 | Quorum: quorum, |
| 533 | Stream: stream, |
| 534 | } |
| 535 | |
| 536 | if pe != nil { |
| 537 | rr.PrevExist = pe |
| 538 | } |
| 539 | |
| 540 | if refresh != nil { |
| 541 | rr.Refresh = refresh |
| 542 | } |
| 543 | |
| 544 | // Null TTL is equivalent to unset Expiration |
| 545 | if ttl != nil { |
| 546 | expr := time.Duration(*ttl) * time.Second |
| 547 | rr.Expiration = clock.Now().Add(expr).UnixNano() |
| 548 | } |
| 549 | |
| 550 | return rr, noValueOnSuccess, nil |
| 551 | } |
| 552 | |
| 553 | // writeKeyEvent trims the prefix of key path in a single Event under |
| 554 | // StoreKeysPrefix, serializes it and writes the resulting JSON to the given |
| 555 | // ResponseWriter, along with the appropriate headers. |
| 556 | func writeKeyEvent(w http.ResponseWriter, resp etcdserver.Response, noValueOnSuccess bool) error { |
| 557 | ev := resp.Event |
| 558 | if ev == nil { |
| 559 | return errors.New("cannot write empty Event") |
| 560 | } |
| 561 | w.Header().Set("Content-Type", "application/json") |
| 562 | w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex)) |
| 563 | w.Header().Set("X-Raft-Index", fmt.Sprint(resp.Index)) |
| 564 | w.Header().Set("X-Raft-Term", fmt.Sprint(resp.Term)) |
| 565 | |
| 566 | if ev.IsCreated() { |
| 567 | w.WriteHeader(http.StatusCreated) |
| 568 | } |
| 569 | |
| 570 | ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix) |
| 571 | if noValueOnSuccess && |
| 572 | (ev.Action == v2store.Set || ev.Action == v2store.CompareAndSwap || |
| 573 | ev.Action == v2store.Create || ev.Action == v2store.Update) { |
| 574 | ev.Node = nil |
| 575 | ev.PrevNode = nil |
| 576 | } |
| 577 | return json.NewEncoder(w).Encode(ev) |
| 578 | } |
| 579 | |
| 580 | func writeKeyNoAuth(w http.ResponseWriter) { |
| 581 | e := v2error.NewError(v2error.EcodeUnauthorized, "Insufficient credentials", 0) |
| 582 | e.WriteTo(w) |
| 583 | } |
| 584 | |
| 585 | // writeKeyError logs and writes the given Error to the ResponseWriter. |
| 586 | // If Error is not an etcdErr, the error will be converted to an etcd error. |
| 587 | func writeKeyError(lg *zap.Logger, w http.ResponseWriter, err error) { |
| 588 | if err == nil { |
| 589 | return |
| 590 | } |
| 591 | switch e := err.(type) { |
| 592 | case *v2error.Error: |
| 593 | e.WriteTo(w) |
| 594 | default: |
| 595 | switch err { |
| 596 | case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost: |
| 597 | if lg != nil { |
| 598 | lg.Warn( |
| 599 | "v2 response error", |
| 600 | zap.String("internal-server-error", err.Error()), |
| 601 | ) |
| 602 | } else { |
| 603 | mlog.MergeError(err) |
| 604 | } |
| 605 | default: |
| 606 | if lg != nil { |
| 607 | lg.Warn( |
| 608 | "unexpected v2 response error", |
| 609 | zap.String("internal-server-error", err.Error()), |
| 610 | ) |
| 611 | } else { |
| 612 | mlog.MergeErrorf("got unexpected response error (%v)", err) |
| 613 | } |
| 614 | } |
| 615 | ee := v2error.NewError(v2error.EcodeRaftInternal, err.Error(), 0) |
| 616 | ee.WriteTo(w) |
| 617 | } |
| 618 | } |
| 619 | |
| 620 | func handleKeyWatch(ctx context.Context, lg *zap.Logger, w http.ResponseWriter, resp etcdserver.Response, stream bool) { |
| 621 | wa := resp.Watcher |
| 622 | defer wa.Remove() |
| 623 | ech := wa.EventChan() |
| 624 | var nch <-chan bool |
| 625 | if x, ok := w.(http.CloseNotifier); ok { |
| 626 | nch = x.CloseNotify() |
| 627 | } |
| 628 | |
| 629 | w.Header().Set("Content-Type", "application/json") |
| 630 | w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex())) |
| 631 | w.Header().Set("X-Raft-Index", fmt.Sprint(resp.Index)) |
| 632 | w.Header().Set("X-Raft-Term", fmt.Sprint(resp.Term)) |
| 633 | w.WriteHeader(http.StatusOK) |
| 634 | |
| 635 | // Ensure headers are flushed early, in case of long polling |
| 636 | w.(http.Flusher).Flush() |
| 637 | |
| 638 | for { |
| 639 | select { |
| 640 | case <-nch: |
| 641 | // Client closed connection. Nothing to do. |
| 642 | return |
| 643 | case <-ctx.Done(): |
| 644 | // Timed out. net/http will close the connection for us, so nothing to do. |
| 645 | return |
| 646 | case ev, ok := <-ech: |
| 647 | if !ok { |
| 648 | // If the channel is closed this may be an indication of |
| 649 | // that notifications are much more than we are able to |
| 650 | // send to the client in time. Then we simply end streaming. |
| 651 | return |
| 652 | } |
| 653 | ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix) |
| 654 | if err := json.NewEncoder(w).Encode(ev); err != nil { |
| 655 | // Should never be reached |
| 656 | if lg != nil { |
| 657 | lg.Warn("failed to encode event", zap.Error(err)) |
| 658 | } else { |
| 659 | plog.Warningf("error writing event (%v)", err) |
| 660 | } |
| 661 | return |
| 662 | } |
| 663 | if !stream { |
| 664 | return |
| 665 | } |
| 666 | w.(http.Flusher).Flush() |
| 667 | } |
| 668 | } |
| 669 | } |
| 670 | |
| 671 | func trimEventPrefix(ev *v2store.Event, prefix string) *v2store.Event { |
| 672 | if ev == nil { |
| 673 | return nil |
| 674 | } |
| 675 | // Since the *Event may reference one in the store history |
| 676 | // history, we must copy it before modifying |
| 677 | e := ev.Clone() |
| 678 | trimNodeExternPrefix(e.Node, prefix) |
| 679 | trimNodeExternPrefix(e.PrevNode, prefix) |
| 680 | return e |
| 681 | } |
| 682 | |
| 683 | func trimNodeExternPrefix(n *v2store.NodeExtern, prefix string) { |
| 684 | if n == nil { |
| 685 | return |
| 686 | } |
| 687 | n.Key = strings.TrimPrefix(n.Key, prefix) |
| 688 | for _, nn := range n.Nodes { |
| 689 | trimNodeExternPrefix(nn, prefix) |
| 690 | } |
| 691 | } |
| 692 | |
| 693 | func trimErrorPrefix(err error, prefix string) error { |
| 694 | if e, ok := err.(*v2error.Error); ok { |
| 695 | e.Cause = strings.TrimPrefix(e.Cause, prefix) |
| 696 | } |
| 697 | return err |
| 698 | } |
| 699 | |
| 700 | func unmarshalRequest(lg *zap.Logger, r *http.Request, req json.Unmarshaler, w http.ResponseWriter) bool { |
| 701 | ctype := r.Header.Get("Content-Type") |
| 702 | semicolonPosition := strings.Index(ctype, ";") |
| 703 | if semicolonPosition != -1 { |
| 704 | ctype = strings.TrimSpace(strings.ToLower(ctype[0:semicolonPosition])) |
| 705 | } |
| 706 | if ctype != "application/json" { |
| 707 | writeError(lg, w, r, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype))) |
| 708 | return false |
| 709 | } |
| 710 | b, err := ioutil.ReadAll(r.Body) |
| 711 | if err != nil { |
| 712 | writeError(lg, w, r, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) |
| 713 | return false |
| 714 | } |
| 715 | if err := req.UnmarshalJSON(b); err != nil { |
| 716 | writeError(lg, w, r, httptypes.NewHTTPError(http.StatusBadRequest, err.Error())) |
| 717 | return false |
| 718 | } |
| 719 | return true |
| 720 | } |
| 721 | |
| 722 | func getID(lg *zap.Logger, p string, w http.ResponseWriter) (types.ID, bool) { |
| 723 | idStr := trimPrefix(p, membersPrefix) |
| 724 | if idStr == "" { |
| 725 | http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) |
| 726 | return 0, false |
| 727 | } |
| 728 | id, err := types.IDFromString(idStr) |
| 729 | if err != nil { |
| 730 | writeError(lg, w, nil, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr))) |
| 731 | return 0, false |
| 732 | } |
| 733 | return id, true |
| 734 | } |
| 735 | |
| 736 | // getUint64 extracts a uint64 by the given key from a Form. If the key does |
| 737 | // not exist in the form, 0 is returned. If the key exists but the value is |
| 738 | // badly formed, an error is returned. If multiple values are present only the |
| 739 | // first is considered. |
| 740 | func getUint64(form url.Values, key string) (i uint64, err error) { |
| 741 | if vals, ok := form[key]; ok { |
| 742 | i, err = strconv.ParseUint(vals[0], 10, 64) |
| 743 | } |
| 744 | return |
| 745 | } |
| 746 | |
| 747 | // getBool extracts a bool by the given key from a Form. If the key does not |
| 748 | // exist in the form, false is returned. If the key exists but the value is |
| 749 | // badly formed, an error is returned. If multiple values are present only the |
| 750 | // first is considered. |
| 751 | func getBool(form url.Values, key string) (b bool, err error) { |
| 752 | if vals, ok := form[key]; ok { |
| 753 | b, err = strconv.ParseBool(vals[0]) |
| 754 | } |
| 755 | return |
| 756 | } |
| 757 | |
| 758 | // trimPrefix removes a given prefix and any slash following the prefix |
| 759 | // e.g.: trimPrefix("foo", "foo") == trimPrefix("foo/", "foo") == "" |
| 760 | func trimPrefix(p, prefix string) (s string) { |
| 761 | s = strings.TrimPrefix(p, prefix) |
| 762 | s = strings.TrimPrefix(s, "/") |
| 763 | return |
| 764 | } |
| 765 | |
| 766 | func newMemberCollection(ms []*membership.Member) *httptypes.MemberCollection { |
| 767 | c := httptypes.MemberCollection(make([]httptypes.Member, len(ms))) |
| 768 | |
| 769 | for i, m := range ms { |
| 770 | c[i] = newMember(m) |
| 771 | } |
| 772 | |
| 773 | return &c |
| 774 | } |
| 775 | |
| 776 | func newMember(m *membership.Member) httptypes.Member { |
| 777 | tm := httptypes.Member{ |
| 778 | ID: m.ID.String(), |
| 779 | Name: m.Name, |
| 780 | PeerURLs: make([]string, len(m.PeerURLs)), |
| 781 | ClientURLs: make([]string, len(m.ClientURLs)), |
| 782 | } |
| 783 | |
| 784 | copy(tm.PeerURLs, m.PeerURLs) |
| 785 | copy(tm.ClientURLs, m.ClientURLs) |
| 786 | |
| 787 | return tm |
| 788 | } |