blob: 42ffb30c01a99b15fb8073a2b04ffcb588a2151b [file] [log] [blame]
Scott Bakereee8dd82019-09-24 12:52:34 -07001package cluster
2
3import (
4 "errors"
5 "sync/atomic"
6
7 "github.com/Shopify/sarama"
8)
9
// errClientInUse is the sentinel error describing a client that is already
// used by another consumer.
var (
	errClientInUse = errors.New("cluster: client is already used by another consumer")
)
11
12// Client is a group client
13type Client struct {
14 sarama.Client
15 config Config
16
17 inUse uint32
18}
19
20// NewClient creates a new client instance
21func NewClient(addrs []string, config *Config) (*Client, error) {
22 if config == nil {
23 config = NewConfig()
24 }
25
26 if err := config.Validate(); err != nil {
27 return nil, err
28 }
29
30 client, err := sarama.NewClient(addrs, &config.Config)
31 if err != nil {
32 return nil, err
33 }
34
35 return &Client{Client: client, config: *config}, nil
36}
37
38// ClusterConfig returns the cluster configuration.
39func (c *Client) ClusterConfig() *Config {
40 cfg := c.config
41 return &cfg
42}
43
44func (c *Client) claim() bool {
45 return atomic.CompareAndSwapUint32(&c.inUse, 0, 1)
46}
47
48func (c *Client) release() {
49 atomic.CompareAndSwapUint32(&c.inUse, 1, 0)
50}