VOL-1868 move simulated onu from voltha-go to voltha-simonu-adapter
Sourced from voltha-go commit 251a11c0ffe60512318a644cd6ce0dc4e12f4018
Change-Id: Iab179bc2f3dd772ed7f488d1c03d1a84ba75e874
diff --git a/vendor/github.com/bsm/sarama-cluster/balancer.go b/vendor/github.com/bsm/sarama-cluster/balancer.go
new file mode 100644
index 0000000..3aeaece
--- /dev/null
+++ b/vendor/github.com/bsm/sarama-cluster/balancer.go
@@ -0,0 +1,170 @@
+package cluster
+
+import (
+ "math"
+ "sort"
+
+ "github.com/Shopify/sarama"
+)
+
+// NotificationType defines the type of notification
+type NotificationType uint8
+
+// String describes the notification type
+func (t NotificationType) String() string {
+ switch t {
+ case RebalanceStart:
+ return "rebalance start"
+ case RebalanceOK:
+ return "rebalance OK"
+ case RebalanceError:
+ return "rebalance error"
+ }
+ return "unknown"
+}
+
+const (
+ UnknownNotification NotificationType = iota
+ RebalanceStart
+ RebalanceOK
+ RebalanceError
+)
+
+// Notification are state events emitted by the consumers on rebalance
+type Notification struct {
+ // Type exposes the notification type
+ Type NotificationType
+
+ // Claimed contains topic/partitions that were claimed by this rebalance cycle
+ Claimed map[string][]int32
+
+ // Released contains topic/partitions that were released as part of this rebalance cycle
+ Released map[string][]int32
+
+ // Current are topic/partitions that are currently claimed to the consumer
+ Current map[string][]int32
+}
+
+func newNotification(current map[string][]int32) *Notification {
+ return &Notification{
+ Type: RebalanceStart,
+ Current: current,
+ }
+}
+
+func (n *Notification) success(current map[string][]int32) *Notification {
+ o := &Notification{
+ Type: RebalanceOK,
+ Claimed: make(map[string][]int32),
+ Released: make(map[string][]int32),
+ Current: current,
+ }
+ for topic, partitions := range current {
+ o.Claimed[topic] = int32Slice(partitions).Diff(int32Slice(n.Current[topic]))
+ }
+ for topic, partitions := range n.Current {
+ o.Released[topic] = int32Slice(partitions).Diff(int32Slice(current[topic]))
+ }
+ return o
+}
+
+// --------------------------------------------------------------------
+
+type topicInfo struct {
+ Partitions []int32
+ MemberIDs []string
+}
+
+func (info topicInfo) Perform(s Strategy) map[string][]int32 {
+ if s == StrategyRoundRobin {
+ return info.RoundRobin()
+ }
+ return info.Ranges()
+}
+
+func (info topicInfo) Ranges() map[string][]int32 {
+ sort.Strings(info.MemberIDs)
+
+ mlen := len(info.MemberIDs)
+ plen := len(info.Partitions)
+ res := make(map[string][]int32, mlen)
+
+ for pos, memberID := range info.MemberIDs {
+ n, i := float64(plen)/float64(mlen), float64(pos)
+ min := int(math.Floor(i*n + 0.5))
+ max := int(math.Floor((i+1)*n + 0.5))
+ sub := info.Partitions[min:max]
+ if len(sub) > 0 {
+ res[memberID] = sub
+ }
+ }
+ return res
+}
+
+func (info topicInfo) RoundRobin() map[string][]int32 {
+ sort.Strings(info.MemberIDs)
+
+ mlen := len(info.MemberIDs)
+ res := make(map[string][]int32, mlen)
+ for i, pnum := range info.Partitions {
+ memberID := info.MemberIDs[i%mlen]
+ res[memberID] = append(res[memberID], pnum)
+ }
+ return res
+}
+
+// --------------------------------------------------------------------
+
+type balancer struct {
+ client sarama.Client
+ topics map[string]topicInfo
+}
+
+func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) {
+ balancer := newBalancer(client)
+ for memberID, meta := range members {
+ for _, topic := range meta.Topics {
+ if err := balancer.Topic(topic, memberID); err != nil {
+ return nil, err
+ }
+ }
+ }
+ return balancer, nil
+}
+
+func newBalancer(client sarama.Client) *balancer {
+ return &balancer{
+ client: client,
+ topics: make(map[string]topicInfo),
+ }
+}
+
+func (r *balancer) Topic(name string, memberID string) error {
+ topic, ok := r.topics[name]
+ if !ok {
+ nums, err := r.client.Partitions(name)
+ if err != nil {
+ return err
+ }
+ topic = topicInfo{
+ Partitions: nums,
+ MemberIDs: make([]string, 0, 1),
+ }
+ }
+ topic.MemberIDs = append(topic.MemberIDs, memberID)
+ r.topics[name] = topic
+ return nil
+}
+
+func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 {
+ res := make(map[string]map[string][]int32, 1)
+ for topic, info := range r.topics {
+ for memberID, partitions := range info.Perform(s) {
+ if _, ok := res[memberID]; !ok {
+ res[memberID] = make(map[string][]int32, 1)
+ }
+ res[memberID][topic] = partitions
+ }
+ }
+ return res
+}