SEBA-902 single-olt tests;
Pin protoc-gen-go to 1.3.2 to resolve compatibility issue;
Run go mod tidy / go mod vendor on importer;
Add Go Module support to demotest
Change-Id: Ifde824fc9a6317b0adc1e12bea54ee1f9b788906
diff --git a/vendor/github.com/Shopify/sarama/broker.go b/vendor/github.com/Shopify/sarama/broker.go
index 8146749..d27ebd2 100644
--- a/vendor/github.com/Shopify/sarama/broker.go
+++ b/vendor/github.com/Shopify/sarama/broker.go
@@ -40,6 +40,7 @@
outgoingByteRate metrics.Meter
responseRate metrics.Meter
responseSize metrics.Histogram
+ requestsInFlight metrics.Counter
brokerIncomingByteRate metrics.Meter
brokerRequestRate metrics.Meter
brokerRequestSize metrics.Histogram
@@ -47,6 +48,7 @@
brokerOutgoingByteRate metrics.Meter
brokerResponseRate metrics.Meter
brokerResponseSize metrics.Histogram
+ brokerRequestsInFlight metrics.Counter
kerberosAuthenticator GSSAPIKerberosAuth
}
@@ -182,6 +184,7 @@
b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
+ b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
// Do not gather metrics for seeded broker (only used during bootstrap) because they share
// the same id (-1) and are already exposed through the global metrics above
if b.id >= 0 {
@@ -189,7 +192,6 @@
}
if conf.Net.SASL.Enable {
-
b.connErr = b.authenticateViaSASL()
if b.connErr != nil {
@@ -713,16 +715,19 @@
}
requestTime := time.Now()
+ // Will be decremented in responseReceiver (except error or request with NoResponse)
+ b.addRequestInFlightMetrics(1)
bytes, err := b.write(buf)
b.updateOutgoingCommunicationMetrics(bytes)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
return nil, err
}
b.correlationID++
if !promiseResponse {
// Record request latency without the response
- b.updateRequestLatencyMetrics(time.Since(requestTime))
+ b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
return nil, nil
}
@@ -817,6 +822,9 @@
for response := range b.responses {
if dead != nil {
+ // This was previously incremented in send() and
+ // we are not calling updateIncomingCommunicationMetrics()
+ b.addRequestInFlightMetrics(-1)
response.errors <- dead
continue
}
@@ -892,9 +900,12 @@
}
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
bytes, err := b.write(buf)
b.updateOutgoingCommunicationMetrics(bytes)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
return err
}
@@ -903,6 +914,7 @@
header := make([]byte, 8) // response header
_, err = b.readFull(header)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
return err
}
@@ -911,6 +923,7 @@
payload := make([]byte, length-4)
n, err := b.readFull(payload)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
return err
}
@@ -961,7 +974,6 @@
// default to V0 to allow for backward compatability when SASL is enabled
// but not the handshake
if b.conf.Net.SASL.Handshake {
-
handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
if handshakeErr != nil {
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
@@ -977,16 +989,18 @@
// sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
-
- length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
+ length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
authBytes := make([]byte, length+4) //4 byte length header + auth data
binary.BigEndian.PutUint32(authBytes, uint32(length))
- copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
+ copy(authBytes[4:], []byte(b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
bytesWritten, err := b.write(authBytes)
b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}
@@ -1011,11 +1025,13 @@
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
-
b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}
@@ -1068,16 +1084,18 @@
// if the broker responds with a challenge, in which case the token is
// rejected.
func (b *Broker) sendClientMessage(message []byte) (bool, error) {
-
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
correlationID := b.correlationID
bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
+ b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
return false, err
}
- b.updateOutgoingCommunicationMetrics(bytesWritten)
b.correlationID++
res := &SaslAuthenticateResponse{}
@@ -1108,22 +1126,25 @@
msg, err := scramClient.Step("")
if err != nil {
return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
-
}
for !scramClient.Done() {
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
correlationID := b.correlationID
bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
+ b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}
- b.updateOutgoingCommunicationMetrics(bytesWritten)
b.correlationID++
challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
return err
}
@@ -1216,7 +1237,7 @@
}
func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
- authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
+ authBytes := []byte(b.conf.Net.SASL.AuthIdentity + "\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
rb := &SaslAuthenticateRequest{authBytes}
req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req, b.conf.MetricRegistry)
@@ -1228,7 +1249,6 @@
}
func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
-
rb := &SaslAuthenticateRequest{initialResp}
req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
@@ -1277,7 +1297,7 @@
}
func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
- b.updateRequestLatencyMetrics(requestLatency)
+ b.updateRequestLatencyAndInFlightMetrics(requestLatency)
b.responseRate.Mark(1)
if b.brokerResponseRate != nil {
@@ -1296,7 +1316,7 @@
}
}
-func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
+func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
requestLatencyInMs := int64(requestLatency / time.Millisecond)
b.requestLatency.Update(requestLatencyInMs)
@@ -1304,6 +1324,14 @@
b.brokerRequestLatency.Update(requestLatencyInMs)
}
+ b.addRequestInFlightMetrics(-1)
+}
+
+// addRequestInFlightMetrics adds i to the global "requests-in-flight" counter
+// and, when a per-broker counter is set, to that counter as well.
+// Callers visible in this file pass 1 just before writing a request and -1
+// when the request fails or its latency is recorded.
+func (b *Broker) addRequestInFlightMetrics(i int64) {
+ b.requestsInFlight.Inc(i)
+ // In the visible hunks brokerRequestsInFlight is assigned only by
+ // registerMetrics; presumably it stays nil for seeded brokers (id < 0)
+ // -- TODO confirm against the b.id >= 0 check in the open path.
+ if b.brokerRequestsInFlight != nil {
+ b.brokerRequestsInFlight.Inc(i)
+ }
}
func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
@@ -1322,7 +1350,6 @@
if b.brokerRequestSize != nil {
b.brokerRequestSize.Update(requestSize)
}
-
}
func (b *Broker) registerMetrics() {
@@ -1333,6 +1360,7 @@
b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
b.brokerResponseRate = b.registerMeter("response-rate")
b.brokerResponseSize = b.registerHistogram("response-size")
+ b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
}
func (b *Broker) unregisterMetrics() {
@@ -1352,3 +1380,9 @@
b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
}
+
+// registerCounter derives the broker-specific metric name for name via
+// getMetricNameForBroker, appends that name to b.registeredMetrics, and
+// returns the counter obtained from b.conf.MetricRegistry under that name.
+// NOTE(review): the recorded name is presumably what unregisterMetrics uses
+// to remove the metric later -- its body is not visible here, confirm.
+func (b *Broker) registerCounter(name string) metrics.Counter {
+ nameForBroker := getMetricNameForBroker(name, b)
+ b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
+ return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
+}