[VOL-2193] Create mocks for Kafka Client and Etcd

This commit consists of:
1) A kafka client mock that implements the kafka client interface
under voltha-lib-go/pkg/kafka/client.go
2) An embedded Etcd server that runs in-process and represents an
Etcd server.

Change-Id: I52a36132568e08c596bb4136918bebcb654a3b99
diff --git a/vendor/go.etcd.io/etcd/pkg/transport/listener_tls.go b/vendor/go.etcd.io/etcd/pkg/transport/listener_tls.go
new file mode 100644
index 0000000..6f16009
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/pkg/transport/listener_tls.go
@@ -0,0 +1,272 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package transport
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"fmt"
+	"io/ioutil"
+	"net"
+	"strings"
+	"sync"
+)
+
+// tlsListener overrides a TLS listener so it will reject client
+// certificates with insufficient SAN credentials or CRL revoked
+// certificates.
+type tlsListener struct {
+	net.Listener
+	connc            chan net.Conn
+	donec            chan struct{}
+	err              error
+	handshakeFailure func(*tls.Conn, error)
+	check            tlsCheckFunc
+}
+
+type tlsCheckFunc func(context.Context, *tls.Conn) error
+
+// NewTLSListener handshakes TLS connections and performs optional CRL checking.
+func NewTLSListener(l net.Listener, tlsinfo *TLSInfo) (net.Listener, error) {
+	check := func(context.Context, *tls.Conn) error { return nil }
+	return newTLSListener(l, tlsinfo, check)
+}
+
+func newTLSListener(l net.Listener, tlsinfo *TLSInfo, check tlsCheckFunc) (net.Listener, error) {
+	if tlsinfo == nil || tlsinfo.Empty() {
+		l.Close()
+		return nil, fmt.Errorf("cannot listen on TLS for %s: KeyFile and CertFile are not presented", l.Addr().String())
+	}
+	tlscfg, err := tlsinfo.ServerConfig()
+	if err != nil {
+		return nil, err
+	}
+
+	hf := tlsinfo.HandshakeFailure
+	if hf == nil {
+		hf = func(*tls.Conn, error) {}
+	}
+
+	if len(tlsinfo.CRLFile) > 0 {
+		prevCheck := check
+		check = func(ctx context.Context, tlsConn *tls.Conn) error {
+			if err := prevCheck(ctx, tlsConn); err != nil {
+				return err
+			}
+			st := tlsConn.ConnectionState()
+			if certs := st.PeerCertificates; len(certs) > 0 {
+				return checkCRL(tlsinfo.CRLFile, certs)
+			}
+			return nil
+		}
+	}
+
+	tlsl := &tlsListener{
+		Listener:         tls.NewListener(l, tlscfg),
+		connc:            make(chan net.Conn),
+		donec:            make(chan struct{}),
+		handshakeFailure: hf,
+		check:            check,
+	}
+	go tlsl.acceptLoop()
+	return tlsl, nil
+}
+
+func (l *tlsListener) Accept() (net.Conn, error) {
+	select {
+	case conn := <-l.connc:
+		return conn, nil
+	case <-l.donec:
+		return nil, l.err
+	}
+}
+
+func checkSAN(ctx context.Context, tlsConn *tls.Conn) error {
+	st := tlsConn.ConnectionState()
+	if certs := st.PeerCertificates; len(certs) > 0 {
+		addr := tlsConn.RemoteAddr().String()
+		return checkCertSAN(ctx, certs[0], addr)
+	}
+	return nil
+}
+
+// acceptLoop launches each TLS handshake in a separate goroutine
+// to prevent a hanging TLS connection from blocking other connections.
+func (l *tlsListener) acceptLoop() {
+	var wg sync.WaitGroup
+	var pendingMu sync.Mutex
+
+	pending := make(map[net.Conn]struct{})
+	ctx, cancel := context.WithCancel(context.Background())
+	defer func() {
+		cancel()
+		pendingMu.Lock()
+		for c := range pending {
+			c.Close()
+		}
+		pendingMu.Unlock()
+		wg.Wait()
+		close(l.donec)
+	}()
+
+	for {
+		conn, err := l.Listener.Accept()
+		if err != nil {
+			l.err = err
+			return
+		}
+
+		pendingMu.Lock()
+		pending[conn] = struct{}{}
+		pendingMu.Unlock()
+
+		wg.Add(1)
+		go func() {
+			defer func() {
+				if conn != nil {
+					conn.Close()
+				}
+				wg.Done()
+			}()
+
+			tlsConn := conn.(*tls.Conn)
+			herr := tlsConn.Handshake()
+			pendingMu.Lock()
+			delete(pending, conn)
+			pendingMu.Unlock()
+
+			if herr != nil {
+				l.handshakeFailure(tlsConn, herr)
+				return
+			}
+			if err := l.check(ctx, tlsConn); err != nil {
+				l.handshakeFailure(tlsConn, err)
+				return
+			}
+
+			select {
+			case l.connc <- tlsConn:
+				conn = nil
+			case <-ctx.Done():
+			}
+		}()
+	}
+}
+
+func checkCRL(crlPath string, cert []*x509.Certificate) error {
+	// TODO: cache
+	crlBytes, err := ioutil.ReadFile(crlPath)
+	if err != nil {
+		return err
+	}
+	certList, err := x509.ParseCRL(crlBytes)
+	if err != nil {
+		return err
+	}
+	revokedSerials := make(map[string]struct{})
+	for _, rc := range certList.TBSCertList.RevokedCertificates {
+		revokedSerials[string(rc.SerialNumber.Bytes())] = struct{}{}
+	}
+	for _, c := range cert {
+		serial := string(c.SerialNumber.Bytes())
+		if _, ok := revokedSerials[serial]; ok {
+			return fmt.Errorf("transport: certificate serial %x revoked", serial)
+		}
+	}
+	return nil
+}
+
+func checkCertSAN(ctx context.Context, cert *x509.Certificate, remoteAddr string) error {
+	if len(cert.IPAddresses) == 0 && len(cert.DNSNames) == 0 {
+		return nil
+	}
+	h, _, herr := net.SplitHostPort(remoteAddr)
+	if herr != nil {
+		return herr
+	}
+	if len(cert.IPAddresses) > 0 {
+		cerr := cert.VerifyHostname(h)
+		if cerr == nil {
+			return nil
+		}
+		if len(cert.DNSNames) == 0 {
+			return cerr
+		}
+	}
+	if len(cert.DNSNames) > 0 {
+		ok, err := isHostInDNS(ctx, h, cert.DNSNames)
+		if ok {
+			return nil
+		}
+		errStr := ""
+		if err != nil {
+			errStr = " (" + err.Error() + ")"
+		}
+		return fmt.Errorf("tls: %q does not match any of DNSNames %q"+errStr, h, cert.DNSNames)
+	}
+	return nil
+}
+
+func isHostInDNS(ctx context.Context, host string, dnsNames []string) (ok bool, err error) {
+	// reverse lookup
+	wildcards, names := []string{}, []string{}
+	for _, dns := range dnsNames {
+		if strings.HasPrefix(dns, "*.") {
+			wildcards = append(wildcards, dns[1:])
+		} else {
+			names = append(names, dns)
+		}
+	}
+	lnames, lerr := net.DefaultResolver.LookupAddr(ctx, host)
+	for _, name := range lnames {
+		// strip trailing '.' from PTR record
+		if name[len(name)-1] == '.' {
+			name = name[:len(name)-1]
+		}
+		for _, wc := range wildcards {
+			if strings.HasSuffix(name, wc) {
+				return true, nil
+			}
+		}
+		for _, n := range names {
+			if n == name {
+				return true, nil
+			}
+		}
+	}
+	err = lerr
+
+	// forward lookup
+	for _, dns := range names {
+		addrs, lerr := net.DefaultResolver.LookupHost(ctx, dns)
+		if lerr != nil {
+			err = lerr
+			continue
+		}
+		for _, addr := range addrs {
+			if addr == host {
+				return true, nil
+			}
+		}
+	}
+	return false, err
+}
+
+func (l *tlsListener) Close() error {
+	err := l.Listener.Close()
+	<-l.donec
+	return err
+}