gRPC migration update

Change-Id: Icdd1a824948fa994cd36bd121c962f5ecf74e3cf
diff --git a/vendor/github.com/Shopify/sarama/balance_strategy.go b/vendor/github.com/Shopify/sarama/balance_strategy.go
index 67c4d96..9855bf4 100644
--- a/vendor/github.com/Shopify/sarama/balance_strategy.go
+++ b/vendor/github.com/Shopify/sarama/balance_strategy.go
@@ -2,6 +2,8 @@
 
 import (
 	"container/heap"
+	"errors"
+	"fmt"
 	"math"
 	"sort"
 	"strings"
@@ -47,6 +49,10 @@
 	// Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
 	// and returns a distribution plan.
 	Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
+
+	// AssignmentData returns the serialized assignment data for the specified
+	// memberID
+	AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
 }
 
 // --------------------------------------------------------------------
@@ -69,20 +75,6 @@
 	},
 }
 
-// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
-// Example with topic T with six partitions (0..5) and two members (M1, M2):
-//   M1: {T: [0, 2, 4]}
-//   M2: {T: [1, 3, 5]}
-var BalanceStrategyRoundRobin = &balanceStrategy{
-	name: RoundRobinBalanceStrategyName,
-	coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
-		for i, part := range partitions {
-			memberID := memberIDs[i%len(memberIDs)]
-			plan.Add(memberID, topic, part)
-		}
-	},
-}
-
 // BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments
 // while maintain a balanced partition distribution.
 // Example with topic T with six partitions (0..5) and two members (M1, M2):
@@ -132,6 +124,11 @@
 	return plan, nil
 }
 
+// AssignmentData returns nil because simple strategies do not require any shared assignment data
+func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
+	return nil, nil
+}
+
 type balanceStrategySortable struct {
 	topic     string
 	memberIDs []string
@@ -141,6 +138,7 @@
 func (p balanceStrategySortable) Swap(i, j int) {
 	p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i]
 }
+
 func (p balanceStrategySortable) Less(i, j int) bool {
 	return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j])
 }
@@ -258,7 +256,7 @@
 	plan := make(BalanceStrategyPlan, len(currentAssignment))
 	for memberID, assignments := range currentAssignment {
 		if len(assignments) == 0 {
-			plan[memberID] = make(map[string][]int32, 0)
+			plan[memberID] = make(map[string][]int32)
 		} else {
 			for _, assignment := range assignments {
 				plan.Add(memberID, assignment.Topic, assignment.Partition)
@@ -268,6 +266,15 @@
 	return plan, nil
 }
 
+// AssignmentData serializes the topics (with their partitions) assigned to the
+// specified member, together with the generation ID, as StickyAssignorUserDataV1
+func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
+	return encode(&StickyAssignorUserDataV1{
+		Topics:     topics,
+		Generation: generationID,
+	}, nil)
+}
+
 func strsContains(s []string, value string) bool {
 	for _, entry := range s {
 		if entry == value {
@@ -335,6 +342,92 @@
 	}
 }
 
+// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
+// For example, there are two topics (t0, t1) and two consumers (m0, m1), and each topic has three partitions (p0, p1, p2):
+// m0: [t0p0, t0p2, t1p1]
+// m1: [t0p1, t1p0, t1p2]
+var BalanceStrategyRoundRobin = new(roundRobinBalancer)
+
+type roundRobinBalancer struct{}
+
+func (b *roundRobinBalancer) Name() string {
+	return RoundRobinBalanceStrategyName
+}
+
+func (b *roundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
+	if len(memberAndMetadata) == 0 || len(topics) == 0 {
+		return nil, errors.New("members and topics are not provided")
+	}
+	// sort partitions
+	var topicPartitions []topicAndPartition
+	for topic, partitions := range topics {
+		for _, partition := range partitions {
+			topicPartitions = append(topicPartitions, topicAndPartition{topic: topic, partition: partition})
+		}
+	}
+	sort.SliceStable(topicPartitions, func(i, j int) bool {
+		pi := topicPartitions[i]
+		pj := topicPartitions[j]
+		return pi.comparedValue() < pj.comparedValue()
+	})
+
+	// sort members
+	var members []memberAndTopic
+	for memberID, meta := range memberAndMetadata {
+		m := memberAndTopic{
+			memberID: memberID,
+			topics:   make(map[string]struct{}),
+		}
+		for _, t := range meta.Topics {
+			m.topics[t] = struct{}{}
+		}
+		members = append(members, m)
+	}
+	sort.SliceStable(members, func(i, j int) bool {
+		mi := members[i]
+		mj := members[j]
+		return mi.memberID < mj.memberID
+	})
+
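+	// assign partitions round-robin, skipping members not subscribed to the topic (NOTE(review): spins forever if no member subscribes to it)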
+	plan := make(BalanceStrategyPlan, len(members))
+	i := 0
+	n := len(members)
+	for _, tp := range topicPartitions {
+		m := members[i%n]
+		for !m.hasTopic(tp.topic) {
+			i++
+			m = members[i%n]
+		}
+		plan.Add(m.memberID, tp.topic, tp.partition)
+		i++
+	}
+	return plan, nil
+}
+
+func (b *roundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
+	return nil, nil // do nothing for now
+}
+
+type topicAndPartition struct {
+	topic     string
+	partition int32
+}
+
+func (tp *topicAndPartition) comparedValue() string {
+	return fmt.Sprintf("%s-%d", tp.topic, tp.partition)
+}
+
+type memberAndTopic struct {
+	memberID string
+	topics   map[string]struct{}
+}
+
+func (m *memberAndTopic) hasTopic(topic string) bool {
+	_, isExist := m.topics[topic]
+	return isExist
+}
+
 // Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs.
 // A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0.
 // Lower balance score indicates a more balanced assignment.
@@ -355,8 +448,8 @@
 }
 
 // Determine whether the current assignment plan is balanced.
-func isBalanced(currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, allSubscriptions map[string][]topicPartitionAssignment) bool {
-	sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
+func isBalanced(currentAssignment map[string][]topicPartitionAssignment, allSubscriptions map[string][]topicPartitionAssignment) bool {
+	sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
 	min := len(currentAssignment[sortedCurrentSubscriptions[0]])
 	max := len(currentAssignment[sortedCurrentSubscriptions[len(sortedCurrentSubscriptions)-1]])
 	if min >= max-1 {
@@ -412,7 +505,7 @@
 		// reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)
 		// until the full list is processed or a balance is achieved
 		for _, partition := range reassignablePartitions {
-			if isBalanced(currentAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions) {
+			if isBalanced(currentAssignment, consumer2AllPotentialPartitions) {
 				break
 			}
 
@@ -671,20 +764,12 @@
 	return sortedPartionIDs
 }
 
-func deepCopyPartitions(src []topicPartitionAssignment) []topicPartitionAssignment {
-	dst := make([]topicPartitionAssignment, len(src))
-	for i, partition := range src {
-		dst[i] = partition
-	}
-	return dst
-}
-
 func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
-	copy := make(map[string][]topicPartitionAssignment, len(assignment))
+	m := make(map[string][]topicPartitionAssignment, len(assignment))
 	for memberID, subscriptions := range assignment {
-		copy[memberID] = append(subscriptions[:0:0], subscriptions...)
+		m[memberID] = append(subscriptions[:0:0], subscriptions...)
 	}
-	return copy
+	return m
 }
 
 func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {
@@ -938,9 +1023,7 @@
 	for i := 0; i < len(cycle)-1; i++ {
 		superCycle[i] = cycle[i]
 	}
-	for _, c := range cycle {
-		superCycle = append(superCycle, c)
-	}
+	superCycle = append(superCycle, cycle...)
 	for _, foundCycle := range cycles {
 		if len(foundCycle) == len(cycle) && indexOfSubList(superCycle, foundCycle) != -1 {
 			return true