[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/Shopify/sarama/broker.go b/vendor/github.com/Shopify/sarama/broker.go
index 9c3e5a0..dd01e4e 100644
--- a/vendor/github.com/Shopify/sarama/broker.go
+++ b/vendor/github.com/Shopify/sarama/broker.go
@@ -13,7 +13,7 @@
 	"sync/atomic"
 	"time"
 
-	metrics "github.com/rcrowley/go-metrics"
+	"github.com/rcrowley/go-metrics"
 )
 
 // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
@@ -40,6 +40,7 @@
 	outgoingByteRate       metrics.Meter
 	responseRate           metrics.Meter
 	responseSize           metrics.Histogram
+	requestsInFlight       metrics.Counter
 	brokerIncomingByteRate metrics.Meter
 	brokerRequestRate      metrics.Meter
 	brokerRequestSize      metrics.Histogram
@@ -47,6 +48,7 @@
 	brokerOutgoingByteRate metrics.Meter
 	brokerResponseRate     metrics.Meter
 	brokerResponseSize     metrics.Histogram
+	brokerRequestsInFlight metrics.Counter
 
 	kerberosAuthenticator GSSAPIKerberosAuth
 }
@@ -71,7 +73,7 @@
 	// server negotiate SASL by wrapping tokens with Kafka protocol headers.
 	SASLHandshakeV1 = int16(1)
 	// SASLExtKeyAuth is the reserved extension key name sent as part of the
-	// SASL/OAUTHBEARER intial client response
+	// SASL/OAUTHBEARER initial client response
 	SASLExtKeyAuth = "auth"
 )
 
@@ -117,6 +119,7 @@
 type responsePromise struct {
 	requestTime   time.Time
 	correlationID int32
+	headerVersion int16
 	packets       chan []byte
 	errors        chan error
 }
@@ -151,27 +154,19 @@
 	go withRecover(func() {
 		defer b.lock.Unlock()
 
-		dialer := net.Dialer{
-			Timeout:   conf.Net.DialTimeout,
-			KeepAlive: conf.Net.KeepAlive,
-			LocalAddr: conf.Net.LocalAddr,
-		}
-
-		if conf.Net.TLS.Enable {
-			b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
-		} else if conf.Net.Proxy.Enable {
-			b.conn, b.connErr = conf.Net.Proxy.Dialer.Dial("tcp", b.addr)
-		} else {
-			b.conn, b.connErr = dialer.Dial("tcp", b.addr)
-		}
+		dialer := conf.getDialer()
+		b.conn, b.connErr = dialer.Dial("tcp", b.addr)
 		if b.connErr != nil {
 			Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
 			b.conn = nil
 			atomic.StoreInt32(&b.opened, 0)
 			return
 		}
-		b.conn = newBufConn(b.conn)
+		if conf.Net.TLS.Enable {
+			b.conn = tls.Client(b.conn, validServerNameTLS(b.addr, conf.Net.TLS.Config))
+		}
 
+		b.conn = newBufConn(b.conn)
 		b.conf = conf
 
 		// Create or reuse the global metrics shared between brokers
@@ -182,6 +177,7 @@
 		b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
 		b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
 		b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
+		b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
 		// Do not gather metrics for seeded broker (only used during bootstrap) because they share
 		// the same id (-1) and are already exposed through the global metrics above
 		if b.id >= 0 {
@@ -189,7 +185,6 @@
 		}
 
 		if conf.Net.SASL.Enable {
-
 			b.connErr = b.authenticateViaSASL()
 
 			if b.connErr != nil {
@@ -228,7 +223,7 @@
 	return b.conn != nil, b.connErr
 }
 
-//Close closes the broker resources
+// Close closes the broker resources
 func (b *Broker) Close() error {
 	b.lock.Lock()
 	defer b.lock.Unlock()
@@ -281,12 +276,11 @@
 	return *b.rack
 }
 
-//GetMetadata send a metadata request and returns a metadata response or error
+// GetMetadata send a metadata request and returns a metadata response or error
 func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
 	response := new(MetadataResponse)
 
 	err := b.sendAndReceive(request, response)
-
 	if err != nil {
 		return nil, err
 	}
@@ -294,12 +288,11 @@
 	return response, nil
 }
 
-//GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
+// GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
 func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
 	response := new(ConsumerMetadataResponse)
 
 	err := b.sendAndReceive(request, response)
-
 	if err != nil {
 		return nil, err
 	}
@@ -307,12 +300,11 @@
 	return response, nil
 }
 
-//FindCoordinator sends a find coordinate request and returns a response or error
+// FindCoordinator sends a find coordinate request and returns a response or error
 func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
 	response := new(FindCoordinatorResponse)
 
 	err := b.sendAndReceive(request, response)
-
 	if err != nil {
 		return nil, err
 	}
@@ -320,12 +312,11 @@
 	return response, nil
 }
 
-//GetAvailableOffsets return an offset response or error
+// GetAvailableOffsets return an offset response or error
 func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
 	response := new(OffsetResponse)
 
 	err := b.sendAndReceive(request, response)
-
 	if err != nil {
 		return nil, err
 	}
@@ -333,7 +324,7 @@
 	return response, nil
 }
 
-//Produce returns a produce response or error
+// Produce returns a produce response or error
 func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
 	var (
 		response *ProduceResponse
@@ -354,7 +345,7 @@
 	return response, nil
 }
 
-//Fetch returns a FetchResponse or error
+// Fetch returns a FetchResponse or error
 func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
 	response := new(FetchResponse)
 
@@ -366,7 +357,7 @@
 	return response, nil
 }
 
-//CommitOffset return an Offset commit reponse or error
+// CommitOffset return an Offset commit response or error
 func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
 	response := new(OffsetCommitResponse)
 
@@ -378,9 +369,10 @@
 	return response, nil
 }
 
-//FetchOffset returns an offset fetch response or error
+// FetchOffset returns an offset fetch response or error
 func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
 	response := new(OffsetFetchResponse)
+	response.Version = request.Version // needed to handle the two header versions
 
 	err := b.sendAndReceive(request, response)
 	if err != nil {
@@ -390,7 +382,7 @@
 	return response, nil
 }
 
-//JoinGroup returns a join group response or error
+// JoinGroup returns a join group response or error
 func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
 	response := new(JoinGroupResponse)
 
@@ -402,7 +394,7 @@
 	return response, nil
 }
 
-//SyncGroup returns a sync group response or error
+// SyncGroup returns a sync group response or error
 func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
 	response := new(SyncGroupResponse)
 
@@ -414,7 +406,7 @@
 	return response, nil
 }
 
-//LeaveGroup return a leave group response or error
+// LeaveGroup return a leave group response or error
 func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
 	response := new(LeaveGroupResponse)
 
@@ -426,7 +418,7 @@
 	return response, nil
 }
 
-//Heartbeat returns a heartbeat response or error
+// Heartbeat returns a heartbeat response or error
 func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
 	response := new(HeartbeatResponse)
 
@@ -438,7 +430,7 @@
 	return response, nil
 }
 
-//ListGroups return a list group response or error
+// ListGroups return a list group response or error
 func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
 	response := new(ListGroupsResponse)
 
@@ -450,7 +442,7 @@
 	return response, nil
 }
 
-//DescribeGroups return describe group response or error
+// DescribeGroups return describe group response or error
 func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
 	response := new(DescribeGroupsResponse)
 
@@ -462,7 +454,7 @@
 	return response, nil
 }
 
-//ApiVersions return api version response or error
+// ApiVersions return api version response or error
 func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
 	response := new(ApiVersionsResponse)
 
@@ -474,7 +466,7 @@
 	return response, nil
 }
 
-//CreateTopics send a create topic request and returns create topic response
+// CreateTopics send a create topic request and returns create topic response
 func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
 	response := new(CreateTopicsResponse)
 
@@ -486,7 +478,7 @@
 	return response, nil
 }
 
-//DeleteTopics sends a delete topic request and returns delete topic response
+// DeleteTopics sends a delete topic request and returns delete topic response
 func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
 	response := new(DeleteTopicsResponse)
 
@@ -498,8 +490,8 @@
 	return response, nil
 }
 
-//CreatePartitions sends a create partition request and returns create
-//partitions response or error
+// CreatePartitions sends a create partition request and returns create
+// partitions response or error
 func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
 	response := new(CreatePartitionsResponse)
 
@@ -511,8 +503,34 @@
 	return response, nil
 }
 
-//DeleteRecords send a request to delete records and return delete record
-//response or error
+// AlterPartitionReassignments sends a alter partition reassignments request and
+// returns alter partition reassignments response
+func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error) {
+	response := new(AlterPartitionReassignmentsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+// ListPartitionReassignments sends a list partition reassignments request and
+// returns list partition reassignments response
+func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error) {
+	response := new(ListPartitionReassignmentsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+// DeleteRecords send a request to delete records and return delete record
+// response or error
 func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
 	response := new(DeleteRecordsResponse)
 
@@ -524,7 +542,7 @@
 	return response, nil
 }
 
-//DescribeAcls sends a describe acl request and returns a response or error
+// DescribeAcls sends a describe acl request and returns a response or error
 func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
 	response := new(DescribeAclsResponse)
 
@@ -536,7 +554,7 @@
 	return response, nil
 }
 
-//CreateAcls sends a create acl request and returns a response or error
+// CreateAcls sends a create acl request and returns a response or error
 func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
 	response := new(CreateAclsResponse)
 
@@ -548,7 +566,7 @@
 	return response, nil
 }
 
-//DeleteAcls sends a delete acl request and returns a response or error
+// DeleteAcls sends a delete acl request and returns a response or error
 func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
 	response := new(DeleteAclsResponse)
 
@@ -560,7 +578,7 @@
 	return response, nil
 }
 
-//InitProducerID sends an init producer request and returns a response or error
+// InitProducerID sends an init producer request and returns a response or error
 func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
 	response := new(InitProducerIDResponse)
 
@@ -572,8 +590,8 @@
 	return response, nil
 }
 
-//AddPartitionsToTxn send a request to add partition to txn and returns
-//a response or error
+// AddPartitionsToTxn send a request to add partition to txn and returns
+// a response or error
 func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
 	response := new(AddPartitionsToTxnResponse)
 
@@ -585,8 +603,8 @@
 	return response, nil
 }
 
-//AddOffsetsToTxn sends a request to add offsets to txn and returns a response
-//or error
+// AddOffsetsToTxn sends a request to add offsets to txn and returns a response
+// or error
 func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
 	response := new(AddOffsetsToTxnResponse)
 
@@ -598,7 +616,7 @@
 	return response, nil
 }
 
-//EndTxn sends a request to end txn and returns a response or error
+// EndTxn sends a request to end txn and returns a response or error
 func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
 	response := new(EndTxnResponse)
 
@@ -610,8 +628,8 @@
 	return response, nil
 }
 
-//TxnOffsetCommit sends a request to commit transaction offsets and returns
-//a response or error
+// TxnOffsetCommit sends a request to commit transaction offsets and returns
+// a response or error
 func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
 	response := new(TxnOffsetCommitResponse)
 
@@ -623,8 +641,8 @@
 	return response, nil
 }
 
-//DescribeConfigs sends a request to describe config and returns a response or
-//error
+// DescribeConfigs sends a request to describe config and returns a response or
+// error
 func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
 	response := new(DescribeConfigsResponse)
 
@@ -636,7 +654,7 @@
 	return response, nil
 }
 
-//AlterConfigs sends a request to alter config and return a response or error
+// AlterConfigs sends a request to alter config and return a response or error
 func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
 	response := new(AlterConfigsResponse)
 
@@ -648,7 +666,19 @@
 	return response, nil
 }
 
-//DeleteGroups sends a request to delete groups and returns a response or error
+// IncrementalAlterConfigs sends a request to incremental alter config and return a response or error
+func (b *Broker) IncrementalAlterConfigs(request *IncrementalAlterConfigsRequest) (*IncrementalAlterConfigsResponse, error) {
+	response := new(IncrementalAlterConfigsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+// DeleteGroups sends a request to delete groups and returns a response or error
 func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
 	response := new(DeleteGroupsResponse)
 
@@ -659,7 +689,62 @@
 	return response, nil
 }
 
-func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
+// DescribeLogDirs sends a request to get the broker's log dir paths and sizes
+func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) {
+	response := new(DescribeLogDirsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+// DescribeUserScramCredentials sends a request to get SCRAM users
+func (b *Broker) DescribeUserScramCredentials(req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error) {
+	res := new(DescribeUserScramCredentialsResponse)
+
+	err := b.sendAndReceive(req, res)
+	if err != nil {
+		return nil, err
+	}
+
+	return res, err
+}
+
+func (b *Broker) AlterUserScramCredentials(req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error) {
+	res := new(AlterUserScramCredentialsResponse)
+
+	err := b.sendAndReceive(req, res)
+	if err != nil {
+		return nil, err
+	}
+
+	return res, nil
+}
+
+// readFull ensures the conn ReadDeadline has been setup before making a
+// call to io.ReadFull
+func (b *Broker) readFull(buf []byte) (n int, err error) {
+	if err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)); err != nil {
+		return 0, err
+	}
+
+	return io.ReadFull(b.conn, buf)
+}
+
+// write  ensures the conn WriteDeadline has been setup before making a
+// call to conn.Write
+func (b *Broker) write(buf []byte) (n int, err error) {
+	if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
+		return 0, err
+	}
+
+	return b.conn.Write(buf)
+}
+
+func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()
 
@@ -680,33 +765,36 @@
 		return nil, err
 	}
 
-	err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
-	if err != nil {
-		return nil, err
-	}
-
 	requestTime := time.Now()
-	bytes, err := b.conn.Write(buf)
-	b.updateOutgoingCommunicationMetrics(bytes) //TODO: should it be after error check
+	// Will be decremented in responseReceiver (except error or request with NoResponse)
+	b.addRequestInFlightMetrics(1)
+	bytes, err := b.write(buf)
+	b.updateOutgoingCommunicationMetrics(bytes)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		return nil, err
 	}
 	b.correlationID++
 
 	if !promiseResponse {
 		// Record request latency without the response
-		b.updateRequestLatencyMetrics(time.Since(requestTime))
+		b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
 		return nil, nil
 	}
 
-	promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
+	promise := responsePromise{requestTime, req.correlationID, responseHeaderVersion, make(chan []byte), make(chan error)}
 	b.responses <- promise
 
 	return &promise, nil
 }
 
-func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
-	promise, err := b.send(req, res != nil)
+func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
+	responseHeaderVersion := int16(-1)
+	if res != nil {
+		responseHeaderVersion = res.headerVersion()
+	}
+
+	promise, err := b.send(req, res != nil, responseHeaderVersion)
 	if err != nil {
 		return err
 	}
@@ -760,7 +848,7 @@
 		return err
 	}
 
-	port, err := strconv.Atoi(portstr)
+	port, err := strconv.ParseInt(portstr, 10, 32)
 	if err != nil {
 		return err
 	}
@@ -786,22 +874,20 @@
 
 func (b *Broker) responseReceiver() {
 	var dead error
-	header := make([]byte, 8)
 
 	for response := range b.responses {
 		if dead != nil {
+			// This was previously incremented in send() and
+			// we are not calling updateIncomingCommunicationMetrics()
+			b.addRequestInFlightMetrics(-1)
 			response.errors <- dead
 			continue
 		}
 
-		err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
-		if err != nil {
-			dead = err
-			response.errors <- err
-			continue
-		}
+		headerLength := getHeaderLength(response.headerVersion)
+		header := make([]byte, headerLength)
 
-		bytesReadHeader, err := io.ReadFull(b.conn, header)
+		bytesReadHeader, err := b.readFull(header)
 		requestLatency := time.Since(response.requestTime)
 		if err != nil {
 			b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
@@ -811,7 +897,7 @@
 		}
 
 		decodedHeader := responseHeader{}
-		err = decode(header, &decodedHeader)
+		err = versionedDecode(header, &decodedHeader, response.headerVersion)
 		if err != nil {
 			b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
 			dead = err
@@ -827,8 +913,8 @@
 			continue
 		}
 
-		buf := make([]byte, decodedHeader.length-4)
-		bytesReadBody, err := io.ReadFull(b.conn, buf)
+		buf := make([]byte, decodedHeader.length-int32(headerLength)+4)
+		bytesReadBody, err := b.readFull(buf)
 		b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
 		if err != nil {
 			dead = err
@@ -841,6 +927,15 @@
 	close(b.done)
 }
 
+func getHeaderLength(headerVersion int16) int8 {
+	if headerVersion < 1 {
+		return 8
+	} else {
+		// header contains additional tagged field length (0), we don't support actual tags yet.
+		return 9
+	}
+}
+
 func (b *Broker) authenticateViaSASL() error {
 	switch b.conf.Net.SASL.Mechanism {
 	case SASLTypeOAuth:
@@ -871,31 +966,31 @@
 		return err
 	}
 
-	err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
-	if err != nil {
-		return err
-	}
-
 	requestTime := time.Now()
-	bytes, err := b.conn.Write(buf)
+	// Will be decremented in updateIncomingCommunicationMetrics (except error)
+	b.addRequestInFlightMetrics(1)
+	bytes, err := b.write(buf)
 	b.updateOutgoingCommunicationMetrics(bytes)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
 		return err
 	}
 	b.correlationID++
-	//wait for the response
+
 	header := make([]byte, 8) // response header
-	_, err = io.ReadFull(b.conn, header)
+	_, err = b.readFull(header)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
 		return err
 	}
 
 	length := binary.BigEndian.Uint32(header[:4])
 	payload := make([]byte, length-4)
-	n, err := io.ReadFull(b.conn, payload)
+	n, err := b.readFull(payload)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
 		return err
 	}
@@ -943,10 +1038,9 @@
 // When credentials are invalid, Kafka replies with a SaslAuthenticate response
 // containing an error code and message detailing the authentication failure.
 func (b *Broker) sendAndReceiveSASLPlainAuth() error {
-	// default to V0 to allow for backward compatability when SASL is enabled
+	// default to V0 to allow for backward compatibility when SASL is enabled
 	// but not the handshake
 	if b.conf.Net.SASL.Handshake {
-
 		handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
 		if handshakeErr != nil {
 			Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
@@ -962,28 +1056,24 @@
 
 // sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
 func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
-
-	length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
-	authBytes := make([]byte, length+4) //4 byte length header + auth data
+	length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
+	authBytes := make([]byte, length+4) // 4 byte length header + auth data
 	binary.BigEndian.PutUint32(authBytes, uint32(length))
-	copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
-
-	err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
-	if err != nil {
-		Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
-		return err
-	}
+	copy(authBytes[4:], b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password)
 
 	requestTime := time.Now()
-	bytesWritten, err := b.conn.Write(authBytes)
+	// Will be decremented in updateIncomingCommunicationMetrics (except error)
+	b.addRequestInFlightMetrics(1)
+	bytesWritten, err := b.write(authBytes)
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
 		return err
 	}
 
 	header := make([]byte, 4)
-	n, err := io.ReadFull(b.conn, header)
+	n, err := b.readFull(header)
 	b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
 	// If the credentials are valid, we would get a 4 byte response filled with null characters.
 	// Otherwise, the broker closes the connection and we get an EOF
@@ -1002,11 +1092,13 @@
 
 	requestTime := time.Now()
 
+	// Will be decremented in updateIncomingCommunicationMetrics (except error)
+	b.addRequestInFlightMetrics(1)
 	bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
-
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
 
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
 		return err
 	}
@@ -1059,16 +1151,18 @@
 // if the broker responds with a challenge, in which case the token is
 // rejected.
 func (b *Broker) sendClientMessage(message []byte) (bool, error) {
-
 	requestTime := time.Now()
+	// Will be decremented in updateIncomingCommunicationMetrics (except error)
+	b.addRequestInFlightMetrics(1)
 	correlationID := b.correlationID
 
 	bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
+	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		return false, err
 	}
 
-	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	b.correlationID++
 
 	res := &SaslAuthenticateResponse{}
@@ -1099,22 +1193,25 @@
 	msg, err := scramClient.Step("")
 	if err != nil {
 		return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
-
 	}
 
 	for !scramClient.Done() {
 		requestTime := time.Now()
+		// Will be decremented in updateIncomingCommunicationMetrics (except error)
+		b.addRequestInFlightMetrics(1)
 		correlationID := b.correlationID
 		bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
+		b.updateOutgoingCommunicationMetrics(bytesWritten)
 		if err != nil {
+			b.addRequestInFlightMetrics(-1)
 			Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
 			return err
 		}
 
-		b.updateOutgoingCommunicationMetrics(bytesWritten)
 		b.correlationID++
 		challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
 		if err != nil {
+			b.addRequestInFlightMetrics(-1)
 			Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
 			return err
 		}
@@ -1139,22 +1236,18 @@
 		return 0, err
 	}
 
-	if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
-		return 0, err
-	}
-
-	return b.conn.Write(buf)
+	return b.write(buf)
 }
 
 func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
 	buf := make([]byte, responseLengthSize+correlationIDSize)
-	_, err := io.ReadFull(b.conn, buf)
+	_, err := b.readFull(buf)
 	if err != nil {
 		return nil, err
 	}
 
 	header := responseHeader{}
-	err = decode(buf, &header)
+	err = versionedDecode(buf, &header, 0)
 	if err != nil {
 		return nil, err
 	}
@@ -1164,7 +1257,7 @@
 	}
 
 	buf = make([]byte, header.length-correlationIDSize)
-	_, err = io.ReadFull(b.conn, buf)
+	_, err = b.readFull(buf)
 	if err != nil {
 		return nil, err
 	}
@@ -1211,7 +1304,7 @@
 }
 
 func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
-	authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
+	authBytes := []byte(b.conf.Net.SASL.AuthIdentity + "\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
 	rb := &SaslAuthenticateRequest{authBytes}
 	req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
 	buf, err := encode(req, b.conf.MetricRegistry)
@@ -1219,16 +1312,10 @@
 		return 0, err
 	}
 
-	err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
-	if err != nil {
-		Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
-		return 0, err
-	}
-	return b.conn.Write(buf)
+	return b.write(buf)
 }
 
 func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
-
 	rb := &SaslAuthenticateRequest{initialResp}
 
 	req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
@@ -1238,25 +1325,18 @@
 		return 0, err
 	}
 
-	if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
-		return 0, err
-	}
-
-	return b.conn.Write(buf)
+	return b.write(buf)
 }
 
 func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
-
 	buf := make([]byte, responseLengthSize+correlationIDSize)
-
-	bytesRead, err := io.ReadFull(b.conn, buf)
+	bytesRead, err := b.readFull(buf)
 	if err != nil {
 		return bytesRead, err
 	}
 
 	header := responseHeader{}
-
-	err = decode(buf, &header)
+	err = versionedDecode(buf, &header, 0)
 	if err != nil {
 		return bytesRead, err
 	}
@@ -1266,8 +1346,7 @@
 	}
 
 	buf = make([]byte, header.length-correlationIDSize)
-
-	c, err := io.ReadFull(b.conn, buf)
+	c, err := b.readFull(buf)
 	bytesRead += c
 	if err != nil {
 		return bytesRead, err
@@ -1285,7 +1364,7 @@
 }
 
 func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
-	b.updateRequestLatencyMetrics(requestLatency)
+	b.updateRequestLatencyAndInFlightMetrics(requestLatency)
 	b.responseRate.Mark(1)
 
 	if b.brokerResponseRate != nil {
@@ -1304,7 +1383,7 @@
 	}
 }
 
-func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
+func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
 	requestLatencyInMs := int64(requestLatency / time.Millisecond)
 	b.requestLatency.Update(requestLatencyInMs)
 
@@ -1312,6 +1391,14 @@
 		b.brokerRequestLatency.Update(requestLatencyInMs)
 	}
 
+	b.addRequestInFlightMetrics(-1)
+}
+
+func (b *Broker) addRequestInFlightMetrics(i int64) {
+	b.requestsInFlight.Inc(i)
+	if b.brokerRequestsInFlight != nil {
+		b.brokerRequestsInFlight.Inc(i)
+	}
 }
 
 func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
@@ -1330,7 +1417,6 @@
 	if b.brokerRequestSize != nil {
 		b.brokerRequestSize.Update(requestSize)
 	}
-
 }
 
 func (b *Broker) registerMetrics() {
@@ -1341,12 +1427,14 @@
 	b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
 	b.brokerResponseRate = b.registerMeter("response-rate")
 	b.brokerResponseSize = b.registerHistogram("response-size")
+	b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
 }
 
 func (b *Broker) unregisterMetrics() {
 	for _, name := range b.registeredMetrics {
 		b.conf.MetricRegistry.Unregister(name)
 	}
+	b.registeredMetrics = nil
 }
 
 func (b *Broker) registerMeter(name string) metrics.Meter {
@@ -1360,3 +1448,28 @@
 	b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
 	return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
 }
+
+func (b *Broker) registerCounter(name string) metrics.Counter {
+	nameForBroker := getMetricNameForBroker(name, b)
+	b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
+	return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
+}
+
+func validServerNameTLS(addr string, cfg *tls.Config) *tls.Config {
+	if cfg == nil {
+		cfg = &tls.Config{
+			MinVersion: tls.VersionTLS12,
+		}
+	}
+	if cfg.ServerName != "" {
+		return cfg
+	}
+
+	c := cfg.Clone()
+	sn, _, err := net.SplitHostPort(addr)
+	if err != nil {
+		Logger.Println(fmt.Errorf("failed to get ServerName from addr %w", err))
+	}
+	c.ServerName = sn
+	return c
+}