VOL-1867 move simulated olt from voltha-go to voltha-simolt-adapter
Sourced from voltha-go commit 251a11c0ffe60512318a644cd6ce0dc4e12f4018
Change-Id: I8e7ee4da1fed739b3c461917301d2729a79307f5
diff --git a/vendor/github.com/Shopify/sarama/broker.go b/vendor/github.com/Shopify/sarama/broker.go
new file mode 100644
index 0000000..9129089
--- /dev/null
+++ b/vendor/github.com/Shopify/sarama/broker.go
@@ -0,0 +1,1102 @@
+package sarama
+
+import (
+ "crypto/tls"
+ "encoding/binary"
+ "fmt"
+ "io"
+ "net"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/rcrowley/go-metrics"
+)
+
+// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
+type Broker struct {
+ // id is the broker ID; NewBroker initialises it to -1 and decode overwrites it.
+ id int32
+ // addr is the host:port address passed to NewBroker or rebuilt by decode.
+ addr string
+ // rack is nil when no rack is known (Rack then returns the empty string).
+ rack *string
+
+ // conf is stored by Open once the connection has been established.
+ conf *Config
+ // correlationID is the ID stamped on the next request; it is incremented
+ // after each request that was written successfully.
+ correlationID int32
+ // conn and connErr hold the outcome of the most recent connection attempt.
+ conn net.Conn
+ connErr error
+ // lock is taken by Open, Connected, Close and send around their use of the fields above.
+ lock sync.Mutex
+ // opened is a 0/1 flag manipulated with sync/atomic to reject concurrent Open calls.
+ opened int32
+
+ // responses carries one promise per in-flight request to responseReceiver.
+ responses chan responsePromise
+ // done is closed by responseReceiver once responses has been closed and drained.
+ done chan bool
+
+ // Metrics shared by all brokers using the same registry.
+ incomingByteRate metrics.Meter
+ requestRate metrics.Meter
+ requestSize metrics.Histogram
+ requestLatency metrics.Histogram
+ outgoingByteRate metrics.Meter
+ responseRate metrics.Meter
+ responseSize metrics.Histogram
+ // Per-broker metrics; Open only sets these when id >= 0, so they may be nil.
+ brokerIncomingByteRate metrics.Meter
+ brokerRequestRate metrics.Meter
+ brokerRequestSize metrics.Histogram
+ brokerRequestLatency metrics.Histogram
+ brokerOutgoingByteRate metrics.Meter
+ brokerResponseRate metrics.Meter
+ brokerResponseSize metrics.Histogram
+}
+
+// SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
+type SASLMechanism string
+
+// SASL-related constants. The mechanism names below are declared as untyped
+// string constants rather than as SASLMechanism values.
+const (
+ // SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
+ SASLTypeOAuth = "OAUTHBEARER"
+ // SASLTypePlaintext represents the SASL/PLAIN mechanism
+ SASLTypePlaintext = "PLAIN"
+ // SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
+ // server negotiate SASL auth using opaque packets.
+ SASLHandshakeV0 = int16(0)
+ // SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
+ // server negotiate SASL by wrapping tokens with Kafka protocol headers.
+ SASLHandshakeV1 = int16(1)
+ // SASLExtKeyAuth is the reserved extension key name sent as part of the
+ // SASL/OAUTHBEARER initial client response. buildClientInitialResponse
+ // rejects tokens whose Extensions map contains this key.
+ SASLExtKeyAuth = "auth"
+)
+
+// AccessToken contains an access token used to authenticate a
+// SASL/OAUTHBEARER client along with associated metadata.
+type AccessToken struct {
+ // Token is the access token payload.
+ Token string
+ // Extensions is an optional map of arbitrary key-value pairs that can be
+ // sent with the SASL/OAUTHBEARER initial client response. These values are
+ // ignored by the SASL server if they are unexpected. This feature is only
+ // supported by Kafka >= 2.1.0. The key named by SASLExtKeyAuth is reserved
+ // and must not be used here.
+ Extensions map[string]string
+}
+
+// AccessTokenProvider is the interface that encapsulates how implementors
+// can generate access tokens for Kafka broker authentication.
+type AccessTokenProvider interface {
+ // Token returns an access token. The implementation should ensure token
+ // reuse so that multiple calls at connect time do not create multiple
+ // tokens. The implementation should also periodically refresh the token in
+ // order to guarantee that each call returns an unexpired token. This
+ // method should not block indefinitely: a timeout error should be returned
+ // after a short period of inactivity so that the broker connection logic
+ // can log debugging information and retry.
+ //
+ // A non-nil error aborts the SASL/OAUTHBEARER authentication of the
+ // connection that requested the token.
+ Token() (*AccessToken, error)
+}
+
+// responsePromise is the hand-off between send, which creates one per request
+// that expects an answer, and responseReceiver, which fulfils it.
+type responsePromise struct {
+ // requestTime is taken just before the request is written and is used to compute latency.
+ requestTime time.Time
+ // correlationID is the ID the request was sent with; the response header must carry the same value.
+ correlationID int32
+ // packets receives the raw response body on success.
+ packets chan []byte
+ // errors receives the failure when the response could not be read or matched.
+ errors chan error
+}
+
+// NewBroker builds a Broker for the given host:port address with its ID set
+// to -1. No network activity happens here; call Open() to start connecting.
+func NewBroker(addr string) *Broker {
+	broker := new(Broker)
+	broker.id = -1
+	broker.addr = addr
+	return broker
+}
+
+// Open tries to connect to the Broker if it is not already connected or connecting, but does not block
+// waiting for the connection to complete. This means that any subsequent operations on the broker will
+// block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
+// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
+// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
+func (b *Broker) Open(conf *Config) error {
+ // Flip the opened flag 0 -> 1; a second caller that loses this race is rejected.
+ if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
+ return ErrAlreadyConnected
+ }
+
+ if conf == nil {
+ conf = NewConfig()
+ }
+
+ // NOTE(review): a validation failure returns with opened still set to 1,
+ // so a later Open reports ErrAlreadyConnected - confirm this is intended.
+ err := conf.Validate()
+ if err != nil {
+ return err
+ }
+
+ // The lock is acquired here and released by the goroutine below, so every
+ // other method that takes the lock waits until the attempt has finished.
+ b.lock.Lock()
+
+ go withRecover(func() {
+ defer b.lock.Unlock()
+
+ dialer := net.Dialer{
+ Timeout: conf.Net.DialTimeout,
+ KeepAlive: conf.Net.KeepAlive,
+ LocalAddr: conf.Net.LocalAddr,
+ }
+
+ if conf.Net.TLS.Enable {
+ b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
+ } else {
+ b.conn, b.connErr = dialer.Dial("tcp", b.addr)
+ }
+ if b.connErr != nil {
+ Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
+ // Clear conn and the opened flag so that Open can be retried; connErr is
+ // kept so that Connected and send can report why the attempt failed.
+ b.conn = nil
+ atomic.StoreInt32(&b.opened, 0)
+ return
+ }
+ b.conn = newBufConn(b.conn)
+
+ b.conf = conf
+
+ // Create or reuse the global metrics shared between brokers
+ b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
+ b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
+ b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
+ b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
+ b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
+ b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
+ b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
+ // Do not gather metrics for seeded broker (only used during bootstrap) because they share
+ // the same id (-1) and are already exposed through the global metrics above
+ if b.id >= 0 {
+ b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
+ b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
+ b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
+ b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
+ b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
+ b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
+ b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
+ }
+
+ // Authentication runs on the raw connection before responseReceiver is
+ // started, so the SASL helpers read their responses themselves.
+ if conf.Net.SASL.Enable {
+
+ b.connErr = b.authenticateViaSASL()
+
+ if b.connErr != nil {
+ // NOTE(review): this assigns to the err variable of the enclosing Open
+ // call, which has already returned by the time this goroutine runs.
+ err = b.conn.Close()
+ if err == nil {
+ Logger.Printf("Closed connection to broker %s\n", b.addr)
+ } else {
+ Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
+ }
+ b.conn = nil
+ atomic.StoreInt32(&b.opened, 0)
+ return
+ }
+ }
+
+ b.done = make(chan bool)
+ // The buffer holds MaxOpenRequests-1 promises; send blocks on this channel
+ // when it is full.
+ b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
+
+ if b.id >= 0 {
+ Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
+ } else {
+ Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
+ }
+ go withRecover(b.responseReceiver)
+ })
+
+ return nil
+}
+
+// Connected reports whether the broker currently holds a connection. When it
+// does not, the second result carries the error of the last connection
+// attempt, if any. The call waits for the broker lock, so it blocks while a
+// connection attempt started by Open is still running.
+func (b *Broker) Connected() (bool, error) {
+	b.lock.Lock()
+	connected, lastErr := b.conn != nil, b.connErr
+	b.lock.Unlock()
+
+	return connected, lastErr
+}
+
+// Close shuts the broker connection down. It closes the responses channel,
+// waits for responseReceiver to drain it and signal completion, closes the
+// network connection, resets the connection state and, for brokers with a
+// known ID, removes the per-broker metrics from the registry. Afterwards the
+// broker can be opened again with Open.
+//
+// It returns ErrNotConnected when there is no connection to close, otherwise
+// the result of closing the underlying network connection.
+func (b *Broker) Close() error {
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
+	if b.conn == nil {
+		return ErrNotConnected
+	}
+
+	// Closing responses lets responseReceiver finish the promises that are
+	// still queued; it closes done once it has left its loop.
+	close(b.responses)
+	<-b.done
+
+	err := b.conn.Close()
+
+	b.conn = nil
+	b.connErr = nil
+	b.done = nil
+	b.responses = nil
+
+	if b.id >= 0 {
+		// Remove every per-broker metric that Open registers, histograms
+		// included; leaving them behind would keep stale entries in the
+		// registry for each broker that was ever connected.
+		// NOTE(review): assumes the broker histograms are registered under the
+		// name produced by getMetricNameForBroker, like the meters - confirm.
+		for _, name := range []string{
+			"incoming-byte-rate",
+			"request-rate",
+			"request-size",
+			"request-latency-in-ms",
+			"outgoing-byte-rate",
+			"response-rate",
+			"response-size",
+		} {
+			b.conf.MetricRegistry.Unregister(getMetricNameForBroker(name, b))
+		}
+	}
+
+	if err == nil {
+		Logger.Printf("Closed connection to broker %s\n", b.addr)
+	} else {
+		Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
+	}
+
+	// Allow a subsequent Open.
+	atomic.StoreInt32(&b.opened, 0)
+
+	return err
+}
+
+// ID reports the numeric identifier of this broker. It is -1 for a broker
+// created with NewBroker until an ID has been decoded from Kafka's metadata.
+func (b *Broker) ID() int32 {
+	brokerID := b.id
+	return brokerID
+}
+
+// Addr reports the broker's host:port address, which is either the value
+// handed to NewBroker or the one assembled from Kafka's metadata.
+func (b *Broker) Addr() string {
+	address := b.addr
+	return address
+}
+
+// Rack reports the rack of the broker as found in Kafka's metadata, i.e. the
+// broker's broker.rack configuration setting, or the empty string when no
+// rack is known. Requires protocol version to be at least v0.10.0.0.
+func (b *Broker) Rack() string {
+	if rack := b.rack; rack != nil {
+		return *rack
+	}
+	return ""
+}
+
+func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
+ response := new(MetadataResponse)
+
+ err := b.sendAndReceive(request, response)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
+ response := new(ConsumerMetadataResponse)
+
+ err := b.sendAndReceive(request, response)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
+ response := new(FindCoordinatorResponse)
+
+ err := b.sendAndReceive(request, response)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
+ response := new(OffsetResponse)
+
+ err := b.sendAndReceive(request, response)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+// Produce sends request to the broker. When request.RequiredAcks is
+// NoResponse nothing is awaited from the broker and a successful send yields
+// a nil response with a nil error. Otherwise the call blocks until the
+// ProduceResponse has been decoded. On failure it returns a nil response
+// together with the error.
+func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
+	if request.RequiredAcks == NoResponse {
+		if err := b.sendAndReceive(request, nil); err != nil {
+			return nil, err
+		}
+		return nil, nil
+	}
+
+	var response ProduceResponse
+	if err := b.sendAndReceive(request, &response); err != nil {
+		return nil, err
+	}
+	return &response, nil
+}
+
+func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
+ response := new(FetchResponse)
+
+ err := b.sendAndReceive(request, response)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
+ response := new(OffsetCommitResponse)
+
+ err := b.sendAndReceive(request, response)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
+ response := new(OffsetFetchResponse)
+
+ err := b.sendAndReceive(request, response)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
+ response := new(JoinGroupResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
+ response := new(SyncGroupResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
+ response := new(LeaveGroupResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
+ response := new(HeartbeatResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
+ response := new(ListGroupsResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
+ response := new(DescribeGroupsResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
+ response := new(ApiVersionsResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
+ response := new(CreateTopicsResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
+ response := new(DeleteTopicsResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
+ response := new(CreatePartitionsResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
+ response := new(DeleteRecordsResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
+ response := new(DescribeAclsResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
+ response := new(CreateAclsResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
+ response := new(DeleteAclsResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
+ response := new(InitProducerIDResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
+ response := new(AddPartitionsToTxnResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
+ response := new(AddOffsetsToTxnResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
+ response := new(EndTxnResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
+ response := new(TxnOffsetCommitResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
+ response := new(DescribeConfigsResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
+ response := new(AlterConfigsResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
+ response := new(DeleteGroupsResponse)
+
+ if err := b.sendAndReceive(request, response); err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+// send encodes rb as a request carrying the current correlation ID and writes
+// it to the connection. When promiseResponse is true it also queues a
+// responsePromise for responseReceiver and returns it; otherwise it returns a
+// nil promise. The broker lock is held for the whole call.
+//
+// It returns the stored connection error or ErrNotConnected when there is no
+// connection, ErrUnsupportedVersion when the configured version is older than
+// the one rb requires, or any encoding, deadline or write error.
+func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+
+ if b.conn == nil {
+ // Prefer the reason the last connection attempt failed, if there is one.
+ if b.connErr != nil {
+ return nil, b.connErr
+ }
+ return nil, ErrNotConnected
+ }
+
+ if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
+ return nil, ErrUnsupportedVersion
+ }
+
+ req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
+ buf, err := encode(req, b.conf.MetricRegistry)
+ if err != nil {
+ return nil, err
+ }
+
+ err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
+ if err != nil {
+ return nil, err
+ }
+
+ requestTime := time.Now()
+ bytes, err := b.conn.Write(buf)
+ // Outgoing metrics are recorded before the error check, i.e. also for a failed write.
+ b.updateOutgoingCommunicationMetrics(bytes)
+ if err != nil {
+ return nil, err
+ }
+ // The correlation ID only advances once the request has been written.
+ b.correlationID++
+
+ if !promiseResponse {
+ // Record request latency without the response
+ b.updateRequestLatencyMetrics(time.Since(requestTime))
+ return nil, nil
+ }
+
+ promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
+ // This send happens while the lock is held; it blocks when the responses
+ // buffer (sized MaxOpenRequests-1 in Open) is full.
+ b.responses <- promise
+
+ return &promise, nil
+}
+
+// sendAndReceive sends req and, when res is non-nil, waits for the response
+// and decodes it into res using the version of req. With a nil res no
+// response is awaited and the result is that of the send alone.
+func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
+	promise, err := b.send(req, res != nil)
+	switch {
+	case err != nil:
+		return err
+	case promise == nil:
+		// Nothing was promised, so there is nothing to wait for.
+		return nil
+	}
+
+	// Exactly one of the two channels is served by responseReceiver.
+	select {
+	case payload := <-promise.packets:
+		return versionedDecode(payload, res, req.version())
+	case recvErr := <-promise.errors:
+		return recvErr
+	}
+}
+
+func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
+ b.id, err = pd.getInt32()
+ if err != nil {
+ return err
+ }
+
+ host, err := pd.getString()
+ if err != nil {
+ return err
+ }
+
+ port, err := pd.getInt32()
+ if err != nil {
+ return err
+ }
+
+ if version >= 1 {
+ b.rack, err = pd.getNullableString()
+ if err != nil {
+ return err
+ }
+ }
+
+ b.addr = net.JoinHostPort(host, fmt.Sprint(port))
+ if _, _, err := net.SplitHostPort(b.addr); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// encode writes the broker in its wire form: ID, host, port and, for
+// version >= 1, the nullable rack. The address is split and the port parsed
+// first, so a malformed addr is reported before anything is written to pe.
+func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
+	host, portText, err := net.SplitHostPort(b.addr)
+	if err != nil {
+		return err
+	}
+
+	portNum, err := strconv.Atoi(portText)
+	if err != nil {
+		return err
+	}
+
+	pe.putInt32(b.id)
+	if err = pe.putString(host); err != nil {
+		return err
+	}
+	pe.putInt32(int32(portNum))
+
+	if version < 1 {
+		return nil
+	}
+	return pe.putNullableString(b.rack)
+}
+
+// responseReceiver runs in its own goroutine (started by Open) and fulfils
+// the promises queued on b.responses in the order they were queued. For each
+// promise it reads one response from the connection and delivers either the
+// body on promise.packets or an error on promise.errors. After the first
+// failure every later promise is failed with that same error without reading
+// from the connection again. When b.responses is closed and drained it closes
+// b.done.
+func (b *Broker) responseReceiver() {
+ // dead remembers the first failure; once set the connection is no longer read.
+ var dead error
+ // header is reused for every response: 8 bytes holding length and correlation ID.
+ header := make([]byte, 8)
+ for response := range b.responses {
+ if dead != nil {
+ response.errors <- dead
+ continue
+ }
+
+ err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
+ if err != nil {
+ dead = err
+ response.errors <- err
+ continue
+ }
+
+ bytesReadHeader, err := io.ReadFull(b.conn, header)
+ // Latency is measured up to the arrival of the header, not of the full body.
+ requestLatency := time.Since(response.requestTime)
+ if err != nil {
+ b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
+ dead = err
+ response.errors <- err
+ continue
+ }
+
+ decodedHeader := responseHeader{}
+ err = decode(header, &decodedHeader)
+ if err != nil {
+ b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
+ dead = err
+ response.errors <- err
+ continue
+ }
+ if decodedHeader.correlationID != response.correlationID {
+ b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
+ // TODO if decoded ID < cur ID, discard until we catch up
+ // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
+ dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
+ response.errors <- dead
+ continue
+ }
+
+ // Presumably the length field counts the 4 correlation ID bytes already
+ // consumed with the header, hence the -4; this relies on the header
+ // decoding rejecting lengths below 4 - TODO confirm.
+ buf := make([]byte, decodedHeader.length-4)
+ bytesReadBody, err := io.ReadFull(b.conn, buf)
+ b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
+ if err != nil {
+ dead = err
+ response.errors <- err
+ continue
+ }
+
+ response.packets <- buf
+ }
+ // Tell Close that all queued promises have been handled.
+ close(b.done)
+}
+
+// authenticateViaSASL runs the SASL exchange selected by the configured
+// mechanism: OAUTHBEARER when the mechanism equals SASLTypeOAuth, PLAIN for
+// every other value.
+func (b *Broker) authenticateViaSASL() error {
+	switch b.conf.Net.SASL.Mechanism {
+	case SASLTypeOAuth:
+		return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
+	default:
+		return b.sendAndReceiveSASLPlainAuth()
+	}
+}
+
+// sendAndReceiveSASLHandshake sends a SaslHandshakeRequest for the mechanism
+// saslType at the given handshake version and reads the broker's answer
+// directly from the connection (responseReceiver is not running yet while the
+// broker is being opened).
+//
+// It returns an error when the request cannot be encoded or written, when the
+// response cannot be read within the configured read timeout, when the
+// response is malformed, or when the broker rejects the mechanism.
+func (b *Broker) sendAndReceiveSASLHandshake(saslType string, version int16) error {
+	rb := &SaslHandshakeRequest{Mechanism: saslType, Version: version}
+
+	req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
+	buf, err := encode(req, b.conf.MetricRegistry)
+	if err != nil {
+		return err
+	}
+
+	err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
+	if err != nil {
+		return err
+	}
+
+	requestTime := time.Now()
+	bytes, err := b.conn.Write(buf)
+	b.updateOutgoingCommunicationMetrics(bytes)
+	if err != nil {
+		Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
+		return err
+	}
+	b.correlationID++
+
+	// Bound the wait for the response: without a deadline a broker that never
+	// answers would block this goroutine forever while it holds the broker lock.
+	err = b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
+	if err != nil {
+		Logger.Printf("Failed to set read deadline for SASL handshake with broker %s: %s\n", b.addr, err.Error())
+		return err
+	}
+	// Clear the deadline again on the way out so that it cannot cut short a
+	// later read that happens long after the handshake (for example after a
+	// slow token provider). Best effort: a failure here would surface on the
+	// next use of the connection anyway.
+	defer func() {
+		_ = b.conn.SetReadDeadline(time.Time{})
+	}()
+
+	//wait for the response
+	header := make([]byte, 8) // response header
+	_, err = io.ReadFull(b.conn, header)
+	if err != nil {
+		Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
+		return err
+	}
+	length := binary.BigEndian.Uint32(header[:4])
+	// The payload size is length-4; a length below 4 would wrap around in
+	// unsigned arithmetic and request a multi-gigabyte buffer.
+	if length < 4 {
+		err = fmt.Errorf("invalid SASL handshake response length %d", length)
+		Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
+		return err
+	}
+	payload := make([]byte, length-4)
+	n, err := io.ReadFull(b.conn, payload)
+	if err != nil {
+		Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
+		return err
+	}
+	b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
+	res := &SaslHandshakeResponse{}
+	err = versionedDecode(payload, res, 0)
+	if err != nil {
+		Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
+		return err
+	}
+	if res.Err != ErrNoError {
+		Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
+		return res.Err
+	}
+	Logger.Print("Successful SASL handshake")
+	return nil
+}
+
+// sendAndReceiveSASLPlainAuth authenticates the connection with SASL/PLAIN,
+// optionally preceded by a v0 SASL handshake when conf.Net.SASL.Handshake is
+// set.
+//
+// Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
+// Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
+//
+// In SASL Plain, Kafka expects the auth header to be in the following format
+// Message format (from https://tools.ietf.org/html/rfc4616):
+//
+// message = [authzid] UTF8NUL authcid UTF8NUL passwd
+// authcid = 1*SAFE ; MUST accept up to 255 octets
+// authzid = 1*SAFE ; MUST accept up to 255 octets
+// passwd = 1*SAFE ; MUST accept up to 255 octets
+// UTF8NUL = %x00 ; UTF-8 encoded NUL character
+//
+// SAFE = UTF1 / UTF2 / UTF3 / UTF4
+// ;; any UTF-8 encoded Unicode character except NUL
+//
+// When credentials are valid, Kafka returns a 4 byte array of null characters.
+// When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
+// of responding to bad credentials but that's how it's being done today.
+//
+// The message sent here has an empty authzid and is prefixed with its 4-byte
+// big-endian length. The function returns the handshake error, or any error
+// from setting deadlines, writing the message or reading the 4-byte response.
+func (b *Broker) sendAndReceiveSASLPlainAuth() error {
+	if b.conf.Net.SASL.Handshake {
+		handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, SASLHandshakeV0)
+		if handshakeErr != nil {
+			Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
+			return handshakeErr
+		}
+	}
+	length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
+	authBytes := make([]byte, length+4) //4 byte length header + auth data
+	binary.BigEndian.PutUint32(authBytes, uint32(length))
+	copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
+
+	err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
+	if err != nil {
+		Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
+		return err
+	}
+
+	requestTime := time.Now()
+	bytesWritten, err := b.conn.Write(authBytes)
+	b.updateOutgoingCommunicationMetrics(bytesWritten)
+	if err != nil {
+		Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
+		return err
+	}
+
+	// Bound the wait for the broker's verdict: without a deadline a broker
+	// that neither answers nor closes the connection would block this
+	// goroutine forever while it holds the broker lock.
+	err = b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
+	if err != nil {
+		Logger.Printf("Failed to set read deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
+		return err
+	}
+
+	header := make([]byte, 4)
+	n, err := io.ReadFull(b.conn, header)
+	b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
+	// If the credentials are valid, we would get a 4 byte response filled with null characters.
+	// Otherwise, the broker closes the connection and we get an EOF
+	if err != nil {
+		Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
+		return err
+	}
+
+	Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
+	return nil
+}
+
+// sendAndReceiveSASLOAuth runs the SASL/OAUTHBEARER flow described by KIP-255
+// (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876):
+// a v1 SASL handshake, a token obtained from provider, the initial client
+// response carrying that token, and finally the broker's answer. The first
+// error encountered in any of these steps is returned.
+func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
+	handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1)
+	if handshakeErr != nil {
+		return handshakeErr
+	}
+
+	token, err := provider.Token()
+	if err != nil {
+		return err
+	}
+
+	startedAt := time.Now()
+	// Remember the ID used for the request; b.correlationID moves on below.
+	requestID := b.correlationID
+
+	sent, err := b.sendSASLOAuthBearerClientResponse(token, requestID)
+	if err != nil {
+		return err
+	}
+	b.updateOutgoingCommunicationMetrics(sent)
+	b.correlationID++
+
+	received, err := b.receiveSASLOAuthBearerServerResponse(requestID)
+	if err != nil {
+		return err
+	}
+	b.updateIncomingCommunicationMetrics(received, time.Since(startedAt))
+
+	return nil
+}
+
+// Build SASL/OAUTHBEARER initial client response as described by RFC-7628
+// https://tools.ietf.org/html/rfc7628
+func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
+
+ var ext string
+
+ if token.Extensions != nil && len(token.Extensions) > 0 {
+ if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
+ return []byte{}, fmt.Errorf("The extension `%s` is invalid", SASLExtKeyAuth)
+ }
+ ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
+ }
+
+ resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
+
+ return resp, nil
+}
+
+// mapToString renders extensions as a single string of key-value pairs.
+// Each pair is written as key + keyValSep + value, the pairs are sorted
+// lexicographically as whole strings, and they are then joined with elemSep.
+// An empty or nil map yields the empty string.
+func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
+	pairs := make([]string, len(extensions))
+
+	next := 0
+	for key, value := range extensions {
+		pairs[next] = key + keyValSep + value
+		next++
+	}
+
+	// Map iteration order is random; sorting makes the output deterministic.
+	sort.Strings(pairs)
+
+	return strings.Join(pairs, elemSep)
+}
+
+// sendSASLOAuthBearerClientResponse wraps the initial client response built
+// from token in a SaslAuthenticateRequest carrying correlationID and writes
+// it to the connection under the configured write timeout. It returns the
+// number of bytes written by the final write; failures before that write
+// report 0 bytes.
+func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlationID int32) (int, error) {
+	initialResp, err := buildClientInitialResponse(token)
+	if err != nil {
+		return 0, err
+	}
+
+	authReq := &request{
+		correlationID: correlationID,
+		clientID:      b.conf.ClientID,
+		body:          &SaslAuthenticateRequest{initialResp},
+	}
+
+	encoded, err := encode(authReq, b.conf.MetricRegistry)
+	if err != nil {
+		return 0, err
+	}
+
+	deadline := time.Now().Add(b.conf.Net.WriteTimeout)
+	if err = b.conn.SetWriteDeadline(deadline); err != nil {
+		return 0, err
+	}
+
+	return b.conn.Write(encoded)
+}
+
+// receiveSASLOAuthBearerServerResponse reads the broker's answer to the
+// SaslAuthenticateRequest that was sent with correlationID and decodes it as
+// a SaslAuthenticateResponse.
+//
+// It returns the number of bytes read so far together with an error when the
+// response cannot be read within the configured read timeout, when the header
+// or body cannot be decoded, when the correlation ID of the response differs
+// from correlationID, or when the broker reports an error code.
+func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int, error) {
+	// Bound the wait for the response: without a deadline a broker that never
+	// answers would block this goroutine forever while it holds the broker lock.
+	if err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)); err != nil {
+		return 0, err
+	}
+
+	buf := make([]byte, 8)
+
+	bytesRead, err := io.ReadFull(b.conn, buf)
+	if err != nil {
+		return bytesRead, err
+	}
+
+	header := responseHeader{}
+	if err = decode(buf, &header); err != nil {
+		return bytesRead, err
+	}
+
+	if header.correlationID != correlationID {
+		// Report the ID that was actually expected. b.correlationID has
+		// already been advanced by the caller and would be off by one here.
+		return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", correlationID, header.correlationID)
+	}
+
+	// NOTE(review): relies on the header decoding rejecting lengths below 4 - confirm.
+	buf = make([]byte, header.length-4)
+
+	c, err := io.ReadFull(b.conn, buf)
+	bytesRead += c
+	if err != nil {
+		return bytesRead, err
+	}
+
+	res := &SaslAuthenticateResponse{}
+	if err = versionedDecode(buf, res, 0); err != nil {
+		return bytesRead, err
+	}
+
+	if res.Err != ErrNoError {
+		return bytesRead, res.Err
+	}
+
+	if len(res.SaslAuthBytes) > 0 {
+		Logger.Printf("Received SASL auth response: %s", res.SaslAuthBytes)
+	}
+
+	return bytesRead, nil
+}
+
+// updateIncomingCommunicationMetrics records one received response of the
+// given size in bytes: it updates the request latency, marks the response
+// and incoming-byte rates and updates the response-size histogram. The
+// per-broker counterparts are updated as well when they are set.
+func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
+	size := int64(bytes)
+
+	b.updateRequestLatencyMetrics(requestLatency)
+
+	b.responseRate.Mark(1)
+	if perBroker := b.brokerResponseRate; perBroker != nil {
+		perBroker.Mark(1)
+	}
+
+	b.incomingByteRate.Mark(size)
+	if perBroker := b.brokerIncomingByteRate; perBroker != nil {
+		perBroker.Mark(size)
+	}
+
+	b.responseSize.Update(size)
+	if perBroker := b.brokerResponseSize; perBroker != nil {
+		perBroker.Update(size)
+	}
+}
+
+// updateRequestLatencyMetrics records requestLatency, truncated to whole
+// milliseconds, in the shared latency histogram and, when it is set, in the
+// per-broker one.
+func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
+	millis := int64(requestLatency / time.Millisecond)
+
+	b.requestLatency.Update(millis)
+	if perBroker := b.brokerRequestLatency; perBroker != nil {
+		perBroker.Update(millis)
+	}
+}
+
+// updateOutgoingCommunicationMetrics records one sent request of the given
+// size in bytes: it marks the request and outgoing-byte rates and updates the
+// request-size histogram. The per-broker counterparts are updated as well
+// when they are set.
+func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
+	size := int64(bytes)
+
+	b.requestRate.Mark(1)
+	if perBroker := b.brokerRequestRate; perBroker != nil {
+		perBroker.Mark(1)
+	}
+
+	b.outgoingByteRate.Mark(size)
+	if perBroker := b.brokerOutgoingByteRate; perBroker != nil {
+		perBroker.Mark(size)
+	}
+
+	b.requestSize.Update(size)
+	if perBroker := b.brokerRequestSize; perBroker != nil {
+		perBroker.Update(size)
+	}
+}