blob: 7fe0cf9716de015b120a465119915e577293c44d [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "net"
5 "strconv"
6)
7
khenaidood948f772021-08-11 17:49:24 -04008// ConsumerMetadataResponse holds the response for a consumer group meta data requests
khenaidooac637102019-01-14 15:44:34 -05009type ConsumerMetadataResponse struct {
10 Err KError
11 Coordinator *Broker
12 CoordinatorID int32 // deprecated: use Coordinator.ID()
13 CoordinatorHost string // deprecated: use Coordinator.Addr()
14 CoordinatorPort int32 // deprecated: use Coordinator.Addr()
15}
16
17func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) {
18 tmp := new(FindCoordinatorResponse)
19
20 if err := tmp.decode(pd, version); err != nil {
21 return err
22 }
23
24 r.Err = tmp.Err
25
26 r.Coordinator = tmp.Coordinator
27 if tmp.Coordinator == nil {
28 return nil
29 }
30
31 // this can all go away in 2.0, but we have to fill in deprecated fields to maintain
32 // backwards compatibility
33 host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
34 if err != nil {
35 return err
36 }
37 port, err := strconv.ParseInt(portstr, 10, 32)
38 if err != nil {
39 return err
40 }
41 r.CoordinatorID = r.Coordinator.ID()
42 r.CoordinatorHost = host
43 r.CoordinatorPort = int32(port)
44
45 return nil
46}
47
48func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
49 if r.Coordinator == nil {
50 r.Coordinator = new(Broker)
51 r.Coordinator.id = r.CoordinatorID
52 r.Coordinator.addr = net.JoinHostPort(r.CoordinatorHost, strconv.Itoa(int(r.CoordinatorPort)))
53 }
54
55 tmp := &FindCoordinatorResponse{
56 Version: 0,
57 Err: r.Err,
58 Coordinator: r.Coordinator,
59 }
60
61 if err := tmp.encode(pe); err != nil {
62 return err
63 }
64
65 return nil
66}
67
68func (r *ConsumerMetadataResponse) key() int16 {
69 return 10
70}
71
72func (r *ConsumerMetadataResponse) version() int16 {
73 return 0
74}
75
khenaidood948f772021-08-11 17:49:24 -040076func (r *ConsumerMetadataResponse) headerVersion() int16 {
77 return 0
78}
79
khenaidooac637102019-01-14 15:44:34 -050080func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion {
81 return V0_8_2_0
82}