blob: f39a8711cbb998d8b754cdabecba814d709cc2dd [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import (
4 "net"
5 "strconv"
6)
7
8//ConsumerMetadataResponse holds the response for a consumer group meta data requests
9type ConsumerMetadataResponse struct {
10 Err KError
11 Coordinator *Broker
12 CoordinatorID int32 // deprecated: use Coordinator.ID()
13 CoordinatorHost string // deprecated: use Coordinator.Addr()
14 CoordinatorPort int32 // deprecated: use Coordinator.Addr()
15}
16
17func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) {
18 tmp := new(FindCoordinatorResponse)
19
20 if err := tmp.decode(pd, version); err != nil {
21 return err
22 }
23
24 r.Err = tmp.Err
25
26 r.Coordinator = tmp.Coordinator
27 if tmp.Coordinator == nil {
28 return nil
29 }
30
31 // this can all go away in 2.0, but we have to fill in deprecated fields to maintain
32 // backwards compatibility
33 host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
34 if err != nil {
35 return err
36 }
37 port, err := strconv.ParseInt(portstr, 10, 32)
38 if err != nil {
39 return err
40 }
41 r.CoordinatorID = r.Coordinator.ID()
42 r.CoordinatorHost = host
43 r.CoordinatorPort = int32(port)
44
45 return nil
46}
47
48func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
49 if r.Coordinator == nil {
50 r.Coordinator = new(Broker)
51 r.Coordinator.id = r.CoordinatorID
52 r.Coordinator.addr = net.JoinHostPort(r.CoordinatorHost, strconv.Itoa(int(r.CoordinatorPort)))
53 }
54
55 tmp := &FindCoordinatorResponse{
56 Version: 0,
57 Err: r.Err,
58 Coordinator: r.Coordinator,
59 }
60
61 if err := tmp.encode(pe); err != nil {
62 return err
63 }
64
65 return nil
66}
67
68func (r *ConsumerMetadataResponse) key() int16 {
69 return 10
70}
71
72func (r *ConsumerMetadataResponse) version() int16 {
73 return 0
74}
75
76func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion {
77 return V0_8_2_0
78}