[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/Shopify/sarama/broker.go b/vendor/github.com/Shopify/sarama/broker.go
index 9c3e5a0..dd01e4e 100644
--- a/vendor/github.com/Shopify/sarama/broker.go
+++ b/vendor/github.com/Shopify/sarama/broker.go
@@ -13,7 +13,7 @@
"sync/atomic"
"time"
- metrics "github.com/rcrowley/go-metrics"
+ "github.com/rcrowley/go-metrics"
)
// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
@@ -40,6 +40,7 @@
outgoingByteRate metrics.Meter
responseRate metrics.Meter
responseSize metrics.Histogram
+ requestsInFlight metrics.Counter
brokerIncomingByteRate metrics.Meter
brokerRequestRate metrics.Meter
brokerRequestSize metrics.Histogram
@@ -47,6 +48,7 @@
brokerOutgoingByteRate metrics.Meter
brokerResponseRate metrics.Meter
brokerResponseSize metrics.Histogram
+ brokerRequestsInFlight metrics.Counter
kerberosAuthenticator GSSAPIKerberosAuth
}
@@ -71,7 +73,7 @@
// server negotiate SASL by wrapping tokens with Kafka protocol headers.
SASLHandshakeV1 = int16(1)
// SASLExtKeyAuth is the reserved extension key name sent as part of the
- // SASL/OAUTHBEARER intial client response
+ // SASL/OAUTHBEARER initial client response
SASLExtKeyAuth = "auth"
)
@@ -117,6 +119,7 @@
type responsePromise struct {
requestTime time.Time
correlationID int32
+ headerVersion int16
packets chan []byte
errors chan error
}
@@ -151,27 +154,19 @@
go withRecover(func() {
defer b.lock.Unlock()
- dialer := net.Dialer{
- Timeout: conf.Net.DialTimeout,
- KeepAlive: conf.Net.KeepAlive,
- LocalAddr: conf.Net.LocalAddr,
- }
-
- if conf.Net.TLS.Enable {
- b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
- } else if conf.Net.Proxy.Enable {
- b.conn, b.connErr = conf.Net.Proxy.Dialer.Dial("tcp", b.addr)
- } else {
- b.conn, b.connErr = dialer.Dial("tcp", b.addr)
- }
+ dialer := conf.getDialer()
+ b.conn, b.connErr = dialer.Dial("tcp", b.addr)
if b.connErr != nil {
Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
b.conn = nil
atomic.StoreInt32(&b.opened, 0)
return
}
- b.conn = newBufConn(b.conn)
+ if conf.Net.TLS.Enable {
+ b.conn = tls.Client(b.conn, validServerNameTLS(b.addr, conf.Net.TLS.Config))
+ }
+ b.conn = newBufConn(b.conn)
b.conf = conf
// Create or reuse the global metrics shared between brokers
@@ -182,6 +177,7 @@
b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
+ b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
// Do not gather metrics for seeded broker (only used during bootstrap) because they share
// the same id (-1) and are already exposed through the global metrics above
if b.id >= 0 {
@@ -189,7 +185,6 @@
}
if conf.Net.SASL.Enable {
-
b.connErr = b.authenticateViaSASL()
if b.connErr != nil {
@@ -228,7 +223,7 @@
return b.conn != nil, b.connErr
}
-//Close closes the broker resources
+// Close closes the broker resources
func (b *Broker) Close() error {
b.lock.Lock()
defer b.lock.Unlock()
@@ -281,12 +276,11 @@
return *b.rack
}
-//GetMetadata send a metadata request and returns a metadata response or error
+// GetMetadata sends a metadata request and returns a metadata response or error
func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
response := new(MetadataResponse)
err := b.sendAndReceive(request, response)
-
if err != nil {
return nil, err
}
@@ -294,12 +288,11 @@
return response, nil
}
-//GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
+// GetConsumerMetadata sends a consumer metadata request and returns a consumer metadata response or error
func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
response := new(ConsumerMetadataResponse)
err := b.sendAndReceive(request, response)
-
if err != nil {
return nil, err
}
@@ -307,12 +300,11 @@
return response, nil
}
-//FindCoordinator sends a find coordinate request and returns a response or error
+// FindCoordinator sends a find coordinator request and returns a response or error
func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
response := new(FindCoordinatorResponse)
err := b.sendAndReceive(request, response)
-
if err != nil {
return nil, err
}
@@ -320,12 +312,11 @@
return response, nil
}
-//GetAvailableOffsets return an offset response or error
+// GetAvailableOffsets sends an offset request and returns an offset response or error
func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
response := new(OffsetResponse)
err := b.sendAndReceive(request, response)
-
if err != nil {
return nil, err
}
@@ -333,7 +324,7 @@
return response, nil
}
-//Produce returns a produce response or error
+// Produce returns a produce response or error
func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
var (
response *ProduceResponse
@@ -354,7 +345,7 @@
return response, nil
}
-//Fetch returns a FetchResponse or error
+// Fetch returns a FetchResponse or error
func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
response := new(FetchResponse)
@@ -366,7 +357,7 @@
return response, nil
}
-//CommitOffset return an Offset commit reponse or error
+// CommitOffset returns an offset commit response or error
func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
response := new(OffsetCommitResponse)
@@ -378,9 +369,10 @@
return response, nil
}
-//FetchOffset returns an offset fetch response or error
+// FetchOffset returns an offset fetch response or error
func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
response := new(OffsetFetchResponse)
+ response.Version = request.Version // needed to handle the two header versions
err := b.sendAndReceive(request, response)
if err != nil {
@@ -390,7 +382,7 @@
return response, nil
}
-//JoinGroup returns a join group response or error
+// JoinGroup returns a join group response or error
func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
response := new(JoinGroupResponse)
@@ -402,7 +394,7 @@
return response, nil
}
-//SyncGroup returns a sync group response or error
+// SyncGroup returns a sync group response or error
func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
response := new(SyncGroupResponse)
@@ -414,7 +406,7 @@
return response, nil
}
-//LeaveGroup return a leave group response or error
+// LeaveGroup returns a leave group response or error
func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
response := new(LeaveGroupResponse)
@@ -426,7 +418,7 @@
return response, nil
}
-//Heartbeat returns a heartbeat response or error
+// Heartbeat returns a heartbeat response or error
func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
response := new(HeartbeatResponse)
@@ -438,7 +430,7 @@
return response, nil
}
-//ListGroups return a list group response or error
+// ListGroups returns a list groups response or error
func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
response := new(ListGroupsResponse)
@@ -450,7 +442,7 @@
return response, nil
}
-//DescribeGroups return describe group response or error
+// DescribeGroups returns a describe groups response or error
func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
response := new(DescribeGroupsResponse)
@@ -462,7 +454,7 @@
return response, nil
}
-//ApiVersions return api version response or error
+// ApiVersions returns an API versions response or error
func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
response := new(ApiVersionsResponse)
@@ -474,7 +466,7 @@
return response, nil
}
-//CreateTopics send a create topic request and returns create topic response
+// CreateTopics sends a create topics request and returns a create topics response or error
func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
response := new(CreateTopicsResponse)
@@ -486,7 +478,7 @@
return response, nil
}
-//DeleteTopics sends a delete topic request and returns delete topic response
+// DeleteTopics sends a delete topics request and returns a delete topics response or error
func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
response := new(DeleteTopicsResponse)
@@ -498,8 +490,8 @@
return response, nil
}
-//CreatePartitions sends a create partition request and returns create
-//partitions response or error
+// CreatePartitions sends a create partitions request and returns a create
+// partitions response or error
func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
response := new(CreatePartitionsResponse)
@@ -511,8 +503,34 @@
return response, nil
}
-//DeleteRecords send a request to delete records and return delete record
-//response or error
+// AlterPartitionReassignments sends an alter partition reassignments request and
+// returns an alter partition reassignments response or error
+func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error) {
+ response := new(AlterPartitionReassignmentsResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+// ListPartitionReassignments sends a list partition reassignments request and
+// returns a list partition reassignments response or error
+func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error) {
+ response := new(ListPartitionReassignmentsResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+// DeleteRecords sends a request to delete records and returns a delete records
+// response or error
func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
response := new(DeleteRecordsResponse)
@@ -524,7 +542,7 @@
return response, nil
}
-//DescribeAcls sends a describe acl request and returns a response or error
+// DescribeAcls sends a describe acl request and returns a response or error
func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
response := new(DescribeAclsResponse)
@@ -536,7 +554,7 @@
return response, nil
}
-//CreateAcls sends a create acl request and returns a response or error
+// CreateAcls sends a create acl request and returns a response or error
func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
response := new(CreateAclsResponse)
@@ -548,7 +566,7 @@
return response, nil
}
-//DeleteAcls sends a delete acl request and returns a response or error
+// DeleteAcls sends a delete acl request and returns a response or error
func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
response := new(DeleteAclsResponse)
@@ -560,7 +578,7 @@
return response, nil
}
-//InitProducerID sends an init producer request and returns a response or error
+// InitProducerID sends an init producer request and returns a response or error
func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
response := new(InitProducerIDResponse)
@@ -572,8 +590,8 @@
return response, nil
}
-//AddPartitionsToTxn send a request to add partition to txn and returns
-//a response or error
+// AddPartitionsToTxn sends a request to add partitions to a txn and returns
+// a response or error
func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
response := new(AddPartitionsToTxnResponse)
@@ -585,8 +603,8 @@
return response, nil
}
-//AddOffsetsToTxn sends a request to add offsets to txn and returns a response
-//or error
+// AddOffsetsToTxn sends a request to add offsets to txn and returns a response
+// or error
func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
response := new(AddOffsetsToTxnResponse)
@@ -598,7 +616,7 @@
return response, nil
}
-//EndTxn sends a request to end txn and returns a response or error
+// EndTxn sends a request to end txn and returns a response or error
func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
response := new(EndTxnResponse)
@@ -610,8 +628,8 @@
return response, nil
}
-//TxnOffsetCommit sends a request to commit transaction offsets and returns
-//a response or error
+// TxnOffsetCommit sends a request to commit transaction offsets and returns
+// a response or error
func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
response := new(TxnOffsetCommitResponse)
@@ -623,8 +641,8 @@
return response, nil
}
-//DescribeConfigs sends a request to describe config and returns a response or
-//error
+// DescribeConfigs sends a request to describe config and returns a response or
+// error
func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
response := new(DescribeConfigsResponse)
@@ -636,7 +654,7 @@
return response, nil
}
-//AlterConfigs sends a request to alter config and return a response or error
+// AlterConfigs sends a request to alter configs and returns a response or error
func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
response := new(AlterConfigsResponse)
@@ -648,7 +666,19 @@
return response, nil
}
-//DeleteGroups sends a request to delete groups and returns a response or error
+// IncrementalAlterConfigs sends a request to incrementally alter configs and returns a response or error
+func (b *Broker) IncrementalAlterConfigs(request *IncrementalAlterConfigsRequest) (*IncrementalAlterConfigsResponse, error) {
+ response := new(IncrementalAlterConfigsResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+// DeleteGroups sends a request to delete groups and returns a response or error
func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
response := new(DeleteGroupsResponse)
@@ -659,7 +689,62 @@
return response, nil
}
-func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
+// DescribeLogDirs sends a request to get the broker's log dir paths and sizes
+func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) {
+ response := new(DescribeLogDirsResponse)
+
+ err := b.sendAndReceive(request, response)
+ if err != nil {
+ return nil, err
+ }
+
+ return response, nil
+}
+
+// DescribeUserScramCredentials sends a request to get SCRAM users
+func (b *Broker) DescribeUserScramCredentials(req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error) {
+ res := new(DescribeUserScramCredentialsResponse)
+
+ err := b.sendAndReceive(req, res)
+ if err != nil {
+ return nil, err
+ }
+
+ return res, err
+}
+
+func (b *Broker) AlterUserScramCredentials(req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error) {
+ res := new(AlterUserScramCredentialsResponse)
+
+ err := b.sendAndReceive(req, res)
+ if err != nil {
+ return nil, err
+ }
+
+ return res, nil
+}
+
+// readFull ensures the conn ReadDeadline has been set up before making a
+// call to io.ReadFull
+func (b *Broker) readFull(buf []byte) (n int, err error) {
+ if err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)); err != nil {
+ return 0, err
+ }
+
+ return io.ReadFull(b.conn, buf)
+}
+
+// write ensures the conn WriteDeadline has been set up before making a
+// call to conn.Write
+func (b *Broker) write(buf []byte) (n int, err error) {
+ if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
+ return 0, err
+ }
+
+ return b.conn.Write(buf)
+}
+
+func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
b.lock.Lock()
defer b.lock.Unlock()
@@ -680,33 +765,36 @@
return nil, err
}
- err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
- if err != nil {
- return nil, err
- }
-
requestTime := time.Now()
- bytes, err := b.conn.Write(buf)
- b.updateOutgoingCommunicationMetrics(bytes) //TODO: should it be after error check
+ // Will be decremented in responseReceiver (except error or request with NoResponse)
+ b.addRequestInFlightMetrics(1)
+ bytes, err := b.write(buf)
+ b.updateOutgoingCommunicationMetrics(bytes)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
return nil, err
}
b.correlationID++
if !promiseResponse {
// Record request latency without the response
- b.updateRequestLatencyMetrics(time.Since(requestTime))
+ b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
return nil, nil
}
- promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
+ promise := responsePromise{requestTime, req.correlationID, responseHeaderVersion, make(chan []byte), make(chan error)}
b.responses <- promise
return &promise, nil
}
-func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
- promise, err := b.send(req, res != nil)
+func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
+ responseHeaderVersion := int16(-1)
+ if res != nil {
+ responseHeaderVersion = res.headerVersion()
+ }
+
+ promise, err := b.send(req, res != nil, responseHeaderVersion)
if err != nil {
return err
}
@@ -760,7 +848,7 @@
return err
}
- port, err := strconv.Atoi(portstr)
+ port, err := strconv.ParseInt(portstr, 10, 32)
if err != nil {
return err
}
@@ -786,22 +874,20 @@
func (b *Broker) responseReceiver() {
var dead error
- header := make([]byte, 8)
for response := range b.responses {
if dead != nil {
+ // This was previously incremented in send() and
+ // we are not calling updateIncomingCommunicationMetrics()
+ b.addRequestInFlightMetrics(-1)
response.errors <- dead
continue
}
- err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
- if err != nil {
- dead = err
- response.errors <- err
- continue
- }
+ headerLength := getHeaderLength(response.headerVersion)
+ header := make([]byte, headerLength)
- bytesReadHeader, err := io.ReadFull(b.conn, header)
+ bytesReadHeader, err := b.readFull(header)
requestLatency := time.Since(response.requestTime)
if err != nil {
b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
@@ -811,7 +897,7 @@
}
decodedHeader := responseHeader{}
- err = decode(header, &decodedHeader)
+ err = versionedDecode(header, &decodedHeader, response.headerVersion)
if err != nil {
b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
dead = err
@@ -827,8 +913,8 @@
continue
}
- buf := make([]byte, decodedHeader.length-4)
- bytesReadBody, err := io.ReadFull(b.conn, buf)
+ buf := make([]byte, decodedHeader.length-int32(headerLength)+4)
+ bytesReadBody, err := b.readFull(buf)
b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
if err != nil {
dead = err
@@ -841,6 +927,15 @@
close(b.done)
}
+func getHeaderLength(headerVersion int16) int8 {
+ if headerVersion < 1 {
+ return 8
+ } else {
+ // header contains additional tagged field length (0), we don't support actual tags yet.
+ return 9
+ }
+}
+
func (b *Broker) authenticateViaSASL() error {
switch b.conf.Net.SASL.Mechanism {
case SASLTypeOAuth:
@@ -871,31 +966,31 @@
return err
}
- err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
- if err != nil {
- return err
- }
-
requestTime := time.Now()
- bytes, err := b.conn.Write(buf)
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
+ bytes, err := b.write(buf)
b.updateOutgoingCommunicationMetrics(bytes)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
return err
}
b.correlationID++
- //wait for the response
+
header := make([]byte, 8) // response header
- _, err = io.ReadFull(b.conn, header)
+ _, err = b.readFull(header)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
return err
}
length := binary.BigEndian.Uint32(header[:4])
payload := make([]byte, length-4)
- n, err := io.ReadFull(b.conn, payload)
+ n, err := b.readFull(payload)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
return err
}
@@ -943,10 +1038,9 @@
// When credentials are invalid, Kafka replies with a SaslAuthenticate response
// containing an error code and message detailing the authentication failure.
func (b *Broker) sendAndReceiveSASLPlainAuth() error {
- // default to V0 to allow for backward compatability when SASL is enabled
+ // default to V0 to allow for backward compatibility when SASL is enabled
// but not the handshake
if b.conf.Net.SASL.Handshake {
-
handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
if handshakeErr != nil {
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
@@ -962,28 +1056,24 @@
// sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
-
- length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
- authBytes := make([]byte, length+4) //4 byte length header + auth data
+ length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
+ authBytes := make([]byte, length+4) // 4 byte length header + auth data
binary.BigEndian.PutUint32(authBytes, uint32(length))
- copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
-
- err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
- if err != nil {
- Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
- return err
- }
+ copy(authBytes[4:], b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password)
requestTime := time.Now()
- bytesWritten, err := b.conn.Write(authBytes)
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
+ bytesWritten, err := b.write(authBytes)
b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}
header := make([]byte, 4)
- n, err := io.ReadFull(b.conn, header)
+ n, err := b.readFull(header)
b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
// If the credentials are valid, we would get a 4 byte response filled with null characters.
// Otherwise, the broker closes the connection and we get an EOF
@@ -1002,11 +1092,13 @@
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
-
b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}
@@ -1059,16 +1151,18 @@
// if the broker responds with a challenge, in which case the token is
// rejected.
func (b *Broker) sendClientMessage(message []byte) (bool, error) {
-
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
correlationID := b.correlationID
bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
+ b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
return false, err
}
- b.updateOutgoingCommunicationMetrics(bytesWritten)
b.correlationID++
res := &SaslAuthenticateResponse{}
@@ -1099,22 +1193,25 @@
msg, err := scramClient.Step("")
if err != nil {
return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
-
}
for !scramClient.Done() {
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
correlationID := b.correlationID
bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
+ b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}
- b.updateOutgoingCommunicationMetrics(bytesWritten)
b.correlationID++
challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
return err
}
@@ -1139,22 +1236,18 @@
return 0, err
}
- if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
- return 0, err
- }
-
- return b.conn.Write(buf)
+ return b.write(buf)
}
func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
buf := make([]byte, responseLengthSize+correlationIDSize)
- _, err := io.ReadFull(b.conn, buf)
+ _, err := b.readFull(buf)
if err != nil {
return nil, err
}
header := responseHeader{}
- err = decode(buf, &header)
+ err = versionedDecode(buf, &header, 0)
if err != nil {
return nil, err
}
@@ -1164,7 +1257,7 @@
}
buf = make([]byte, header.length-correlationIDSize)
- _, err = io.ReadFull(b.conn, buf)
+ _, err = b.readFull(buf)
if err != nil {
return nil, err
}
@@ -1211,7 +1304,7 @@
}
func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
- authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
+ authBytes := []byte(b.conf.Net.SASL.AuthIdentity + "\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
rb := &SaslAuthenticateRequest{authBytes}
req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req, b.conf.MetricRegistry)
@@ -1219,16 +1312,10 @@
return 0, err
}
- err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
- if err != nil {
- Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
- return 0, err
- }
- return b.conn.Write(buf)
+ return b.write(buf)
}
func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
-
rb := &SaslAuthenticateRequest{initialResp}
req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
@@ -1238,25 +1325,18 @@
return 0, err
}
- if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
- return 0, err
- }
-
- return b.conn.Write(buf)
+ return b.write(buf)
}
func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
-
buf := make([]byte, responseLengthSize+correlationIDSize)
-
- bytesRead, err := io.ReadFull(b.conn, buf)
+ bytesRead, err := b.readFull(buf)
if err != nil {
return bytesRead, err
}
header := responseHeader{}
-
- err = decode(buf, &header)
+ err = versionedDecode(buf, &header, 0)
if err != nil {
return bytesRead, err
}
@@ -1266,8 +1346,7 @@
}
buf = make([]byte, header.length-correlationIDSize)
-
- c, err := io.ReadFull(b.conn, buf)
+ c, err := b.readFull(buf)
bytesRead += c
if err != nil {
return bytesRead, err
@@ -1285,7 +1364,7 @@
}
func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
- b.updateRequestLatencyMetrics(requestLatency)
+ b.updateRequestLatencyAndInFlightMetrics(requestLatency)
b.responseRate.Mark(1)
if b.brokerResponseRate != nil {
@@ -1304,7 +1383,7 @@
}
}
-func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
+func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
requestLatencyInMs := int64(requestLatency / time.Millisecond)
b.requestLatency.Update(requestLatencyInMs)
@@ -1312,6 +1391,14 @@
b.brokerRequestLatency.Update(requestLatencyInMs)
}
+ b.addRequestInFlightMetrics(-1)
+}
+
+func (b *Broker) addRequestInFlightMetrics(i int64) {
+ b.requestsInFlight.Inc(i)
+ if b.brokerRequestsInFlight != nil {
+ b.brokerRequestsInFlight.Inc(i)
+ }
}
func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
@@ -1330,7 +1417,6 @@
if b.brokerRequestSize != nil {
b.brokerRequestSize.Update(requestSize)
}
-
}
func (b *Broker) registerMetrics() {
@@ -1341,12 +1427,14 @@
b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
b.brokerResponseRate = b.registerMeter("response-rate")
b.brokerResponseSize = b.registerHistogram("response-size")
+ b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
}
func (b *Broker) unregisterMetrics() {
for _, name := range b.registeredMetrics {
b.conf.MetricRegistry.Unregister(name)
}
+ b.registeredMetrics = nil
}
func (b *Broker) registerMeter(name string) metrics.Meter {
@@ -1360,3 +1448,28 @@
b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
}
+
+func (b *Broker) registerCounter(name string) metrics.Counter {
+ nameForBroker := getMetricNameForBroker(name, b)
+ b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
+ return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
+}
+
+func validServerNameTLS(addr string, cfg *tls.Config) *tls.Config {
+ if cfg == nil {
+ cfg = &tls.Config{
+ MinVersion: tls.VersionTLS12,
+ }
+ }
+ if cfg.ServerName != "" {
+ return cfg
+ }
+
+ c := cfg.Clone()
+ sn, _, err := net.SplitHostPort(addr)
+ if err != nil {
+ Logger.Println(fmt.Errorf("failed to get ServerName from addr %w", err))
+ }
+ c.ServerName = sn
+ return c
+}