| package sarama |
| |
| import ( |
| "net" |
| "strconv" |
| ) |
| |
// ConsumerMetadataResponse holds the response to a consumer group metadata
// request.
type ConsumerMetadataResponse struct {
	// Err is the error code carried by the response.
	Err KError
	// Coordinator is the coordinating broker carried by the response. decode
	// leaves it nil when the decoded FindCoordinatorResponse has none; encode
	// fills it in from the deprecated fields below when it is nil.
	Coordinator *Broker
	// CoordinatorID mirrors Coordinator.ID().
	//
	// Deprecated: use Coordinator.ID() instead.
	CoordinatorID int32
	// CoordinatorHost is the host part of Coordinator.Addr().
	//
	// Deprecated: use Coordinator.Addr() instead.
	CoordinatorHost string
	// CoordinatorPort is the port part of Coordinator.Addr().
	//
	// Deprecated: use Coordinator.Addr() instead.
	CoordinatorPort int32
}
| |
| func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) { |
| tmp := new(FindCoordinatorResponse) |
| |
| if err := tmp.decode(pd, version); err != nil { |
| return err |
| } |
| |
| r.Err = tmp.Err |
| |
| r.Coordinator = tmp.Coordinator |
| if tmp.Coordinator == nil { |
| return nil |
| } |
| |
| // this can all go away in 2.0, but we have to fill in deprecated fields to maintain |
| // backwards compatibility |
| host, portstr, err := net.SplitHostPort(r.Coordinator.Addr()) |
| if err != nil { |
| return err |
| } |
| port, err := strconv.ParseInt(portstr, 10, 32) |
| if err != nil { |
| return err |
| } |
| r.CoordinatorID = r.Coordinator.ID() |
| r.CoordinatorHost = host |
| r.CoordinatorPort = int32(port) |
| |
| return nil |
| } |
| |
| func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error { |
| if r.Coordinator == nil { |
| r.Coordinator = new(Broker) |
| r.Coordinator.id = r.CoordinatorID |
| r.Coordinator.addr = net.JoinHostPort(r.CoordinatorHost, strconv.Itoa(int(r.CoordinatorPort))) |
| } |
| |
| tmp := &FindCoordinatorResponse{ |
| Version: 0, |
| Err: r.Err, |
| Coordinator: r.Coordinator, |
| } |
| |
| if err := tmp.encode(pe); err != nil { |
| return err |
| } |
| |
| return nil |
| } |
| |
| func (r *ConsumerMetadataResponse) key() int16 { |
| return 10 |
| } |
| |
| func (r *ConsumerMetadataResponse) version() int16 { |
| return 0 |
| } |
| |
| func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion { |
| return V0_8_2_0 |
| } |