William Kurkian | ea86948 | 2019-04-09 15:16:11 -0400 | [diff] [blame^] | 1 | package sarama |
| 2 | |
| 3 | import ( |
| 4 | "net" |
| 5 | "strconv" |
| 6 | ) |
| 7 | |
| 8 | //ConsumerMetadataResponse holds the response for a consumer group meta data requests |
| 9 | type ConsumerMetadataResponse struct { |
| 10 | Err KError |
| 11 | Coordinator *Broker |
| 12 | CoordinatorID int32 // deprecated: use Coordinator.ID() |
| 13 | CoordinatorHost string // deprecated: use Coordinator.Addr() |
| 14 | CoordinatorPort int32 // deprecated: use Coordinator.Addr() |
| 15 | } |
| 16 | |
| 17 | func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) { |
| 18 | tmp := new(FindCoordinatorResponse) |
| 19 | |
| 20 | if err := tmp.decode(pd, version); err != nil { |
| 21 | return err |
| 22 | } |
| 23 | |
| 24 | r.Err = tmp.Err |
| 25 | |
| 26 | r.Coordinator = tmp.Coordinator |
| 27 | if tmp.Coordinator == nil { |
| 28 | return nil |
| 29 | } |
| 30 | |
| 31 | // this can all go away in 2.0, but we have to fill in deprecated fields to maintain |
| 32 | // backwards compatibility |
| 33 | host, portstr, err := net.SplitHostPort(r.Coordinator.Addr()) |
| 34 | if err != nil { |
| 35 | return err |
| 36 | } |
| 37 | port, err := strconv.ParseInt(portstr, 10, 32) |
| 38 | if err != nil { |
| 39 | return err |
| 40 | } |
| 41 | r.CoordinatorID = r.Coordinator.ID() |
| 42 | r.CoordinatorHost = host |
| 43 | r.CoordinatorPort = int32(port) |
| 44 | |
| 45 | return nil |
| 46 | } |
| 47 | |
| 48 | func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error { |
| 49 | if r.Coordinator == nil { |
| 50 | r.Coordinator = new(Broker) |
| 51 | r.Coordinator.id = r.CoordinatorID |
| 52 | r.Coordinator.addr = net.JoinHostPort(r.CoordinatorHost, strconv.Itoa(int(r.CoordinatorPort))) |
| 53 | } |
| 54 | |
| 55 | tmp := &FindCoordinatorResponse{ |
| 56 | Version: 0, |
| 57 | Err: r.Err, |
| 58 | Coordinator: r.Coordinator, |
| 59 | } |
| 60 | |
| 61 | if err := tmp.encode(pe); err != nil { |
| 62 | return err |
| 63 | } |
| 64 | |
| 65 | return nil |
| 66 | } |
| 67 | |
| 68 | func (r *ConsumerMetadataResponse) key() int16 { |
| 69 | return 10 |
| 70 | } |
| 71 | |
| 72 | func (r *ConsumerMetadataResponse) version() int16 { |
| 73 | return 0 |
| 74 | } |
| 75 | |
| 76 | func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion { |
| 77 | return V0_8_2_0 |
| 78 | } |