SEBA-902 single-olt tests;
Pin protoc-gen-go to 1.3.2 to resolve a compatibility issue;
Run go mod tidy / go mod vendor on importer;
Add Go Module support to demotest

Change-Id: Ifde824fc9a6317b0adc1e12bea54ee1f9b788906
diff --git a/vendor/github.com/Shopify/sarama/broker.go b/vendor/github.com/Shopify/sarama/broker.go
index 8146749..d27ebd2 100644
--- a/vendor/github.com/Shopify/sarama/broker.go
+++ b/vendor/github.com/Shopify/sarama/broker.go
@@ -40,6 +40,7 @@
 	outgoingByteRate       metrics.Meter
 	responseRate           metrics.Meter
 	responseSize           metrics.Histogram
+	requestsInFlight       metrics.Counter
 	brokerIncomingByteRate metrics.Meter
 	brokerRequestRate      metrics.Meter
 	brokerRequestSize      metrics.Histogram
@@ -47,6 +48,7 @@
 	brokerOutgoingByteRate metrics.Meter
 	brokerResponseRate     metrics.Meter
 	brokerResponseSize     metrics.Histogram
+	brokerRequestsInFlight metrics.Counter
 
 	kerberosAuthenticator GSSAPIKerberosAuth
 }
@@ -182,6 +184,7 @@
 		b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
 		b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
 		b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
+		b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
 		// Do not gather metrics for seeded broker (only used during bootstrap) because they share
 		// the same id (-1) and are already exposed through the global metrics above
 		if b.id >= 0 {
@@ -189,7 +192,6 @@
 		}
 
 		if conf.Net.SASL.Enable {
-
 			b.connErr = b.authenticateViaSASL()
 
 			if b.connErr != nil {
@@ -713,16 +715,19 @@
 	}
 
 	requestTime := time.Now()
+	// Will be decremented in responseReceiver (except error or request with NoResponse)
+	b.addRequestInFlightMetrics(1)
 	bytes, err := b.write(buf)
 	b.updateOutgoingCommunicationMetrics(bytes)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		return nil, err
 	}
 	b.correlationID++
 
 	if !promiseResponse {
 		// Record request latency without the response
-		b.updateRequestLatencyMetrics(time.Since(requestTime))
+		b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
 		return nil, nil
 	}
 
@@ -817,6 +822,9 @@
 
 	for response := range b.responses {
 		if dead != nil {
+			// This was previously incremented in send() and
+			// we are not calling updateIncomingCommunicationMetrics()
+			b.addRequestInFlightMetrics(-1)
 			response.errors <- dead
 			continue
 		}
@@ -892,9 +900,12 @@
 	}
 
 	requestTime := time.Now()
+	// Will be decremented in updateIncomingCommunicationMetrics (except error)
+	b.addRequestInFlightMetrics(1)
 	bytes, err := b.write(buf)
 	b.updateOutgoingCommunicationMetrics(bytes)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
 		return err
 	}
@@ -903,6 +914,7 @@
 	header := make([]byte, 8) // response header
 	_, err = b.readFull(header)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
 		return err
 	}
@@ -911,6 +923,7 @@
 	payload := make([]byte, length-4)
 	n, err := b.readFull(payload)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
 		return err
 	}
@@ -961,7 +974,6 @@
 	// default to V0 to allow for backward compatability when SASL is enabled
 	// but not the handshake
 	if b.conf.Net.SASL.Handshake {
-
 		handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
 		if handshakeErr != nil {
 			Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
@@ -977,16 +989,18 @@
 
 // sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
 func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
-
-	length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
+	length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
 	authBytes := make([]byte, length+4) //4 byte length header + auth data
 	binary.BigEndian.PutUint32(authBytes, uint32(length))
-	copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
+	copy(authBytes[4:], []byte(b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
 
 	requestTime := time.Now()
+	// Will be decremented in updateIncomingCommunicationMetrics (except error)
+	b.addRequestInFlightMetrics(1)
 	bytesWritten, err := b.write(authBytes)
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
 		return err
 	}
@@ -1011,11 +1025,13 @@
 
 	requestTime := time.Now()
 
+	// Will be decremented in updateIncomingCommunicationMetrics (except error)
+	b.addRequestInFlightMetrics(1)
 	bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
-
 	b.updateOutgoingCommunicationMetrics(bytesWritten)
 
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
 		return err
 	}
@@ -1068,16 +1084,18 @@
 // if the broker responds with a challenge, in which case the token is
 // rejected.
 func (b *Broker) sendClientMessage(message []byte) (bool, error) {
-
 	requestTime := time.Now()
+	// Will be decremented in updateIncomingCommunicationMetrics (except error)
+	b.addRequestInFlightMetrics(1)
 	correlationID := b.correlationID
 
 	bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
+	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	if err != nil {
+		b.addRequestInFlightMetrics(-1)
 		return false, err
 	}
 
-	b.updateOutgoingCommunicationMetrics(bytesWritten)
 	b.correlationID++
 
 	res := &SaslAuthenticateResponse{}
@@ -1108,22 +1126,25 @@
 	msg, err := scramClient.Step("")
 	if err != nil {
 		return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
-
 	}
 
 	for !scramClient.Done() {
 		requestTime := time.Now()
+		// Will be decremented in updateIncomingCommunicationMetrics (except error)
+		b.addRequestInFlightMetrics(1)
 		correlationID := b.correlationID
 		bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
+		b.updateOutgoingCommunicationMetrics(bytesWritten)
 		if err != nil {
+			b.addRequestInFlightMetrics(-1)
 			Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
 			return err
 		}
 
-		b.updateOutgoingCommunicationMetrics(bytesWritten)
 		b.correlationID++
 		challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
 		if err != nil {
+			b.addRequestInFlightMetrics(-1)
 			Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
 			return err
 		}
@@ -1216,7 +1237,7 @@
 }
 
 func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
-	authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
+	authBytes := []byte(b.conf.Net.SASL.AuthIdentity + "\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
 	rb := &SaslAuthenticateRequest{authBytes}
 	req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
 	buf, err := encode(req, b.conf.MetricRegistry)
@@ -1228,7 +1249,6 @@
 }
 
 func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
-
 	rb := &SaslAuthenticateRequest{initialResp}
 
 	req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
@@ -1277,7 +1297,7 @@
 }
 
 func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
-	b.updateRequestLatencyMetrics(requestLatency)
+	b.updateRequestLatencyAndInFlightMetrics(requestLatency)
 	b.responseRate.Mark(1)
 
 	if b.brokerResponseRate != nil {
@@ -1296,7 +1316,7 @@
 	}
 }
 
-func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
+func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
 	requestLatencyInMs := int64(requestLatency / time.Millisecond)
 	b.requestLatency.Update(requestLatencyInMs)
 
@@ -1304,6 +1324,14 @@
 		b.brokerRequestLatency.Update(requestLatencyInMs)
 	}
 
+	b.addRequestInFlightMetrics(-1)
+}
+
+func (b *Broker) addRequestInFlightMetrics(i int64) {
+	b.requestsInFlight.Inc(i)
+	if b.brokerRequestsInFlight != nil {
+		b.brokerRequestsInFlight.Inc(i)
+	}
 }
 
 func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
@@ -1322,7 +1350,6 @@
 	if b.brokerRequestSize != nil {
 		b.brokerRequestSize.Update(requestSize)
 	}
-
 }
 
 func (b *Broker) registerMetrics() {
@@ -1333,6 +1360,7 @@
 	b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
 	b.brokerResponseRate = b.registerMeter("response-rate")
 	b.brokerResponseSize = b.registerHistogram("response-size")
+	b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
 }
 
 func (b *Broker) unregisterMetrics() {
@@ -1352,3 +1380,9 @@
 	b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
 	return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
 }
+
+func (b *Broker) registerCounter(name string) metrics.Counter {
+	nameForBroker := getMetricNameForBroker(name, b)
+	b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
+	return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
+}