[VOL-2193] Create mocks for Kafka Client and Etcd
This commit consists of:
1) A kafka client mock that implements the kafka client interface
under voltha-lib-go/pkg/kafka/client.go
2) An embedded Etcd server that runs in-process and represents an
Etcd server.
Change-Id: I52a36132568e08c596bb4136918bebcb654a3b99
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/base.go b/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/base.go
new file mode 100644
index 0000000..c9df62e
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/base.go
@@ -0,0 +1,203 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdhttp
+
+import (
+ "encoding/json"
+ "expvar"
+ "fmt"
+ "net/http"
+ "strings"
+
+ "go.etcd.io/etcd/etcdserver"
+ "go.etcd.io/etcd/etcdserver/api"
+ "go.etcd.io/etcd/etcdserver/api/v2error"
+ "go.etcd.io/etcd/etcdserver/api/v2http/httptypes"
+ "go.etcd.io/etcd/pkg/logutil"
+ "go.etcd.io/etcd/version"
+
+ "github.com/coreos/pkg/capnslog"
+ "go.uber.org/zap"
+)
+
+var (
+ plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "etcdserver/api/etcdhttp")
+ mlog = logutil.NewMergeLogger(plog)
+)
+
+const (
+ configPath = "/config"
+ varsPath = "/debug/vars"
+ versionPath = "/version"
+)
+
+// HandleBasic adds handlers to a mux for serving JSON etcd client requests
+// that do not access the v2 store.
+func HandleBasic(mux *http.ServeMux, server etcdserver.ServerPeer) {
+ mux.HandleFunc(varsPath, serveVars)
+
+ // TODO: deprecate '/config/local/log' in v3.5
+ mux.HandleFunc(configPath+"/local/log", logHandleFunc)
+
+ HandleMetricsHealth(mux, server)
+ mux.HandleFunc(versionPath, versionHandler(server.Cluster(), serveVersion))
+}
+
+func versionHandler(c api.Cluster, fn func(http.ResponseWriter, *http.Request, string)) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ v := c.Version()
+ if v != nil {
+ fn(w, r, v.String())
+ } else {
+ fn(w, r, "not_decided")
+ }
+ }
+}
+
+func serveVersion(w http.ResponseWriter, r *http.Request, clusterV string) {
+ if !allowMethod(w, r, "GET") {
+ return
+ }
+ vs := version.Versions{
+ Server: version.Version,
+ Cluster: clusterV,
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ b, err := json.Marshal(&vs)
+ if err != nil {
+ plog.Panicf("cannot marshal versions to json (%v)", err)
+ }
+ w.Write(b)
+}
+
+// TODO: deprecate '/config/local/log' in v3.5
+func logHandleFunc(w http.ResponseWriter, r *http.Request) {
+ if !allowMethod(w, r, "PUT") {
+ return
+ }
+
+ in := struct{ Level string }{}
+
+ d := json.NewDecoder(r.Body)
+ if err := d.Decode(&in); err != nil {
+ WriteError(nil, w, r, httptypes.NewHTTPError(http.StatusBadRequest, "Invalid json body"))
+ return
+ }
+
+ logl, err := capnslog.ParseLevel(strings.ToUpper(in.Level))
+ if err != nil {
+ WriteError(nil, w, r, httptypes.NewHTTPError(http.StatusBadRequest, "Invalid log level "+in.Level))
+ return
+ }
+
+ plog.Noticef("globalLogLevel set to %q", logl.String())
+ capnslog.SetGlobalLogLevel(logl)
+ w.WriteHeader(http.StatusNoContent)
+}
+
+func serveVars(w http.ResponseWriter, r *http.Request) {
+ if !allowMethod(w, r, "GET") {
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json; charset=utf-8")
+ fmt.Fprintf(w, "{\n")
+ first := true
+ expvar.Do(func(kv expvar.KeyValue) {
+ if !first {
+ fmt.Fprintf(w, ",\n")
+ }
+ first = false
+ fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value)
+ })
+ fmt.Fprintf(w, "\n}\n")
+}
+
+func allowMethod(w http.ResponseWriter, r *http.Request, m string) bool {
+ if m == r.Method {
+ return true
+ }
+ w.Header().Set("Allow", m)
+ http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+ return false
+}
+
+// WriteError logs and writes the given Error to the ResponseWriter
+// If Error is an etcdErr, it is rendered to the ResponseWriter
+// Otherwise, it is assumed to be a StatusInternalServerError
+func WriteError(lg *zap.Logger, w http.ResponseWriter, r *http.Request, err error) {
+ if err == nil {
+ return
+ }
+ switch e := err.(type) {
+ case *v2error.Error:
+ e.WriteTo(w)
+
+ case *httptypes.HTTPError:
+ if et := e.WriteTo(w); et != nil {
+ if lg != nil {
+ lg.Debug(
+ "failed to write v2 HTTP error",
+ zap.String("remote-addr", r.RemoteAddr),
+ zap.String("internal-server-error", e.Error()),
+ zap.Error(et),
+ )
+ } else {
+ plog.Debugf("error writing HTTPError (%v) to %s", et, r.RemoteAddr)
+ }
+ }
+
+ default:
+ switch err {
+ case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost, etcdserver.ErrNotEnoughStartedMembers,
+ etcdserver.ErrUnhealthy:
+ if lg != nil {
+ lg.Warn(
+ "v2 response error",
+ zap.String("remote-addr", r.RemoteAddr),
+ zap.String("internal-server-error", err.Error()),
+ )
+ } else {
+ mlog.MergeError(err)
+ }
+
+ default:
+ if lg != nil {
+ lg.Warn(
+ "unexpected v2 response error",
+ zap.String("remote-addr", r.RemoteAddr),
+ zap.String("internal-server-error", err.Error()),
+ )
+ } else {
+ mlog.MergeErrorf("got unexpected response error (%v)", err)
+ }
+ }
+
+ herr := httptypes.NewHTTPError(http.StatusInternalServerError, "Internal Server Error")
+ if et := herr.WriteTo(w); et != nil {
+ if lg != nil {
+ lg.Debug(
+ "failed to write v2 HTTP error",
+ zap.String("remote-addr", r.RemoteAddr),
+ zap.String("internal-server-error", err.Error()),
+ zap.Error(et),
+ )
+ } else {
+ plog.Debugf("error writing HTTPError (%v) to %s", et, r.RemoteAddr)
+ }
+ }
+ }
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/doc.go b/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/doc.go
new file mode 100644
index 0000000..a03b626
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package etcdhttp implements HTTP transportation layer for etcdserver.
+package etcdhttp
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/metrics.go b/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/metrics.go
new file mode 100644
index 0000000..f455e40
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/metrics.go
@@ -0,0 +1,123 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdhttp
+
+import (
+ "context"
+ "encoding/json"
+ "net/http"
+ "time"
+
+ "go.etcd.io/etcd/etcdserver"
+ "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ "go.etcd.io/etcd/raft"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+const (
+ PathMetrics = "/metrics"
+ PathHealth = "/health"
+)
+
+// HandleMetricsHealth registers metrics and health handlers.
+func HandleMetricsHealth(mux *http.ServeMux, srv etcdserver.ServerV2) {
+ mux.Handle(PathMetrics, promhttp.Handler())
+ mux.Handle(PathHealth, NewHealthHandler(func() Health { return checkHealth(srv) }))
+}
+
+// HandlePrometheus registers prometheus handler on '/metrics'.
+func HandlePrometheus(mux *http.ServeMux) {
+ mux.Handle(PathMetrics, promhttp.Handler())
+}
+
+// NewHealthHandler handles '/health' requests.
+func NewHealthHandler(hfunc func() Health) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ w.Header().Set("Allow", http.MethodGet)
+ http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+ return
+ }
+ h := hfunc()
+ d, _ := json.Marshal(h)
+ if h.Health != "true" {
+ http.Error(w, string(d), http.StatusServiceUnavailable)
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ w.Write(d)
+ }
+}
+
+var (
+ healthSuccess = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "server",
+ Name: "health_success",
+ Help: "The total number of successful health checks",
+ })
+ healthFailed = prometheus.NewCounter(prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "server",
+ Name: "health_failures",
+ Help: "The total number of failed health checks",
+ })
+)
+
+func init() {
+ prometheus.MustRegister(healthSuccess)
+ prometheus.MustRegister(healthFailed)
+}
+
+// Health defines etcd server health status.
+// TODO: remove manual parsing in etcdctl cluster-health
+type Health struct {
+ Health string `json:"health"`
+}
+
+// TODO: server NOSPACE, etcdserver.ErrNoLeader in health API
+
+func checkHealth(srv etcdserver.ServerV2) Health {
+ h := Health{Health: "true"}
+
+ as := srv.Alarms()
+ if len(as) > 0 {
+ h.Health = "false"
+ }
+
+ if h.Health == "true" {
+ if uint64(srv.Leader()) == raft.None {
+ h.Health = "false"
+ }
+ }
+
+ if h.Health == "true" {
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ _, err := srv.Do(ctx, etcdserverpb.Request{Method: "QGET"})
+ cancel()
+ if err != nil {
+ h.Health = "false"
+ }
+ }
+
+ if h.Health == "true" {
+ healthSuccess.Inc()
+ } else {
+ healthFailed.Inc()
+ }
+ return h
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/peer.go b/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/peer.go
new file mode 100644
index 0000000..6c61bf5
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/peer.go
@@ -0,0 +1,159 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdhttp
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "strconv"
+ "strings"
+
+ "go.etcd.io/etcd/etcdserver"
+ "go.etcd.io/etcd/etcdserver/api"
+ "go.etcd.io/etcd/etcdserver/api/membership"
+ "go.etcd.io/etcd/etcdserver/api/rafthttp"
+ "go.etcd.io/etcd/lease/leasehttp"
+ "go.etcd.io/etcd/pkg/types"
+
+ "go.uber.org/zap"
+)
+
+const (
+ peerMembersPath = "/members"
+ peerMemberPromotePrefix = "/members/promote/"
+)
+
+// NewPeerHandler generates an http.Handler to handle etcd peer requests.
+func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeer) http.Handler {
+ return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler())
+}
+
+func newPeerHandler(lg *zap.Logger, s etcdserver.Server, raftHandler http.Handler, leaseHandler http.Handler) http.Handler {
+ peerMembersHandler := newPeerMembersHandler(lg, s.Cluster())
+ peerMemberPromoteHandler := newPeerMemberPromoteHandler(lg, s)
+
+ mux := http.NewServeMux()
+ mux.HandleFunc("/", http.NotFound)
+ mux.Handle(rafthttp.RaftPrefix, raftHandler)
+ mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
+ mux.Handle(peerMembersPath, peerMembersHandler)
+ mux.Handle(peerMemberPromotePrefix, peerMemberPromoteHandler)
+ if leaseHandler != nil {
+ mux.Handle(leasehttp.LeasePrefix, leaseHandler)
+ mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
+ }
+ mux.HandleFunc(versionPath, versionHandler(s.Cluster(), serveVersion))
+ return mux
+}
+
+func newPeerMembersHandler(lg *zap.Logger, cluster api.Cluster) http.Handler {
+ return &peerMembersHandler{
+ lg: lg,
+ cluster: cluster,
+ }
+}
+
+type peerMembersHandler struct {
+ lg *zap.Logger
+ cluster api.Cluster
+}
+
+func newPeerMemberPromoteHandler(lg *zap.Logger, s etcdserver.Server) http.Handler {
+ return &peerMemberPromoteHandler{
+ lg: lg,
+ cluster: s.Cluster(),
+ server: s,
+ }
+}
+
+type peerMemberPromoteHandler struct {
+ lg *zap.Logger
+ cluster api.Cluster
+ server etcdserver.Server
+}
+
+func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if !allowMethod(w, r, "GET") {
+ return
+ }
+ w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
+
+ if r.URL.Path != peerMembersPath {
+ http.Error(w, "bad path", http.StatusBadRequest)
+ return
+ }
+ ms := h.cluster.Members()
+ w.Header().Set("Content-Type", "application/json")
+ if err := json.NewEncoder(w).Encode(ms); err != nil {
+ if h.lg != nil {
+ h.lg.Warn("failed to encode membership members", zap.Error(err))
+ } else {
+ plog.Warningf("failed to encode members response (%v)", err)
+ }
+ }
+}
+
+func (h *peerMemberPromoteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if !allowMethod(w, r, "POST") {
+ return
+ }
+ w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
+
+ if !strings.HasPrefix(r.URL.Path, peerMemberPromotePrefix) {
+ http.Error(w, "bad path", http.StatusBadRequest)
+ return
+ }
+ idStr := strings.TrimPrefix(r.URL.Path, peerMemberPromotePrefix)
+ id, err := strconv.ParseUint(idStr, 10, 64)
+ if err != nil {
+ http.Error(w, fmt.Sprintf("member %s not found in cluster", idStr), http.StatusNotFound)
+ return
+ }
+
+ resp, err := h.server.PromoteMember(r.Context(), id)
+ if err != nil {
+ switch err {
+ case membership.ErrIDNotFound:
+ http.Error(w, err.Error(), http.StatusNotFound)
+ case membership.ErrMemberNotLearner:
+ http.Error(w, err.Error(), http.StatusPreconditionFailed)
+ case etcdserver.ErrLearnerNotReady:
+ http.Error(w, err.Error(), http.StatusPreconditionFailed)
+ default:
+ WriteError(h.lg, w, r, err)
+ }
+ if h.lg != nil {
+ h.lg.Warn(
+ "failed to promote a member",
+ zap.String("member-id", types.ID(id).String()),
+ zap.Error(err),
+ )
+ } else {
+ plog.Errorf("error promoting member %s (%v)", types.ID(id).String(), err)
+ }
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ if err := json.NewEncoder(w).Encode(resp); err != nil {
+ if h.lg != nil {
+ h.lg.Warn("failed to encode members response", zap.Error(err))
+ } else {
+ plog.Warningf("failed to encode members response (%v)", err)
+ }
+ }
+}