Takahiro Suzuki | 241c10e | 2020-12-17 20:17:57 +0900 | [diff] [blame] | 1 | package cluster |
| 2 | |
| 3 | import ( |
| 4 | "regexp" |
| 5 | "time" |
| 6 | |
| 7 | "github.com/Shopify/sarama" |
| 8 | ) |
| 9 | |
| 10 | var minVersion = sarama.V0_9_0_0 |
| 11 | |
| 12 | type ConsumerMode uint8 |
| 13 | |
| 14 | const ( |
| 15 | ConsumerModeMultiplex ConsumerMode = iota |
| 16 | ConsumerModePartitions |
| 17 | ) |
| 18 | |
| 19 | // Config extends sarama.Config with Group specific namespace |
| 20 | type Config struct { |
| 21 | sarama.Config |
| 22 | |
| 23 | // Group is the namespace for group management properties |
| 24 | Group struct { |
| 25 | |
| 26 | // The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange) |
| 27 | PartitionStrategy Strategy |
| 28 | |
| 29 | // By default, messages and errors from the subscribed topics and partitions are all multiplexed and |
| 30 | // made available through the consumer's Messages() and Errors() channels. |
| 31 | // |
| 32 | // Users who require low-level access can enable ConsumerModePartitions where individual partitions |
| 33 | // are exposed on the Partitions() channel. Messages and errors must then be consumed on the partitions |
| 34 | // themselves. |
| 35 | Mode ConsumerMode |
| 36 | |
| 37 | Offsets struct { |
| 38 | Retry struct { |
| 39 | // The numer retries when committing offsets (defaults to 3). |
| 40 | Max int |
| 41 | } |
| 42 | Synchronization struct { |
| 43 | // The duration allowed for other clients to commit their offsets before resumption in this client, e.g. during a rebalance |
| 44 | // NewConfig sets this to the Consumer.MaxProcessingTime duration of the Sarama configuration |
| 45 | DwellTime time.Duration |
| 46 | } |
| 47 | } |
| 48 | |
| 49 | Session struct { |
| 50 | // The allowed session timeout for registered consumers (defaults to 30s). |
| 51 | // Must be within the allowed server range. |
| 52 | Timeout time.Duration |
| 53 | } |
| 54 | |
| 55 | Heartbeat struct { |
| 56 | // Interval between each heartbeat (defaults to 3s). It should be no more |
| 57 | // than 1/3rd of the Group.Session.Timout setting |
| 58 | Interval time.Duration |
| 59 | } |
| 60 | |
| 61 | // Return specifies which group channels will be populated. If they are set to true, |
| 62 | // you must read from the respective channels to prevent deadlock. |
| 63 | Return struct { |
| 64 | // If enabled, rebalance notification will be returned on the |
| 65 | // Notifications channel (default disabled). |
| 66 | Notifications bool |
| 67 | } |
| 68 | |
| 69 | Topics struct { |
| 70 | // An additional whitelist of topics to subscribe to. |
| 71 | Whitelist *regexp.Regexp |
| 72 | // An additional blacklist of topics to avoid. If set, this will precede over |
| 73 | // the Whitelist setting. |
| 74 | Blacklist *regexp.Regexp |
| 75 | } |
| 76 | |
| 77 | Member struct { |
| 78 | // Custom metadata to include when joining the group. The user data for all joined members |
| 79 | // can be retrieved by sending a DescribeGroupRequest to the broker that is the |
| 80 | // coordinator for the group. |
| 81 | UserData []byte |
| 82 | } |
| 83 | } |
| 84 | } |
| 85 | |
| 86 | // NewConfig returns a new configuration instance with sane defaults. |
| 87 | func NewConfig() *Config { |
| 88 | c := &Config{ |
| 89 | Config: *sarama.NewConfig(), |
| 90 | } |
| 91 | c.Group.PartitionStrategy = StrategyRange |
| 92 | c.Group.Offsets.Retry.Max = 3 |
| 93 | c.Group.Offsets.Synchronization.DwellTime = c.Consumer.MaxProcessingTime |
| 94 | c.Group.Session.Timeout = 30 * time.Second |
| 95 | c.Group.Heartbeat.Interval = 3 * time.Second |
| 96 | c.Config.Version = minVersion |
| 97 | return c |
| 98 | } |
| 99 | |
| 100 | // Validate checks a Config instance. It will return a |
| 101 | // sarama.ConfigurationError if the specified values don't make sense. |
| 102 | func (c *Config) Validate() error { |
| 103 | if c.Group.Heartbeat.Interval%time.Millisecond != 0 { |
| 104 | sarama.Logger.Println("Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.") |
| 105 | } |
| 106 | if c.Group.Session.Timeout%time.Millisecond != 0 { |
| 107 | sarama.Logger.Println("Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.") |
| 108 | } |
| 109 | if c.Group.PartitionStrategy != StrategyRange && c.Group.PartitionStrategy != StrategyRoundRobin { |
| 110 | sarama.Logger.Println("Group.PartitionStrategy is not supported; range will be assumed.") |
| 111 | } |
| 112 | if !c.Version.IsAtLeast(minVersion) { |
| 113 | sarama.Logger.Println("Version is not supported; 0.9. will be assumed.") |
| 114 | c.Version = minVersion |
| 115 | } |
| 116 | if err := c.Config.Validate(); err != nil { |
| 117 | return err |
| 118 | } |
| 119 | |
| 120 | // validate the Group values |
| 121 | switch { |
| 122 | case c.Group.Offsets.Retry.Max < 0: |
| 123 | return sarama.ConfigurationError("Group.Offsets.Retry.Max must be >= 0") |
| 124 | case c.Group.Offsets.Synchronization.DwellTime <= 0: |
| 125 | return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be > 0") |
| 126 | case c.Group.Offsets.Synchronization.DwellTime > 10*time.Minute: |
| 127 | return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be <= 10m") |
| 128 | case c.Group.Heartbeat.Interval <= 0: |
| 129 | return sarama.ConfigurationError("Group.Heartbeat.Interval must be > 0") |
| 130 | case c.Group.Session.Timeout <= 0: |
| 131 | return sarama.ConfigurationError("Group.Session.Timeout must be > 0") |
| 132 | case !c.Metadata.Full && c.Group.Topics.Whitelist != nil: |
| 133 | return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Whitelist is used") |
| 134 | case !c.Metadata.Full && c.Group.Topics.Blacklist != nil: |
| 135 | return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Blacklist is used") |
| 136 | } |
| 137 | |
| 138 | // ensure offset is correct |
| 139 | switch c.Consumer.Offsets.Initial { |
| 140 | case sarama.OffsetOldest, sarama.OffsetNewest: |
| 141 | default: |
| 142 | return sarama.ConfigurationError("Consumer.Offsets.Initial must be either OffsetOldest or OffsetNewest") |
| 143 | } |
| 144 | |
| 145 | return nil |
| 146 | } |