| package sarama |
| |
| type TxnOffsetCommitRequest struct { |
| TransactionalID string |
| GroupID string |
| ProducerID int64 |
| ProducerEpoch int16 |
| Topics map[string][]*PartitionOffsetMetadata |
| } |
| |
| func (t *TxnOffsetCommitRequest) encode(pe packetEncoder) error { |
| if err := pe.putString(t.TransactionalID); err != nil { |
| return err |
| } |
| if err := pe.putString(t.GroupID); err != nil { |
| return err |
| } |
| pe.putInt64(t.ProducerID) |
| pe.putInt16(t.ProducerEpoch) |
| |
| if err := pe.putArrayLength(len(t.Topics)); err != nil { |
| return err |
| } |
| for topic, partitions := range t.Topics { |
| if err := pe.putString(topic); err != nil { |
| return err |
| } |
| if err := pe.putArrayLength(len(partitions)); err != nil { |
| return err |
| } |
| for _, partition := range partitions { |
| if err := partition.encode(pe); err != nil { |
| return err |
| } |
| } |
| } |
| |
| return nil |
| } |
| |
| func (t *TxnOffsetCommitRequest) decode(pd packetDecoder, version int16) (err error) { |
| if t.TransactionalID, err = pd.getString(); err != nil { |
| return err |
| } |
| if t.GroupID, err = pd.getString(); err != nil { |
| return err |
| } |
| if t.ProducerID, err = pd.getInt64(); err != nil { |
| return err |
| } |
| if t.ProducerEpoch, err = pd.getInt16(); err != nil { |
| return err |
| } |
| |
| n, err := pd.getArrayLength() |
| if err != nil { |
| return err |
| } |
| |
| t.Topics = make(map[string][]*PartitionOffsetMetadata) |
| for i := 0; i < n; i++ { |
| topic, err := pd.getString() |
| if err != nil { |
| return err |
| } |
| |
| m, err := pd.getArrayLength() |
| if err != nil { |
| return err |
| } |
| |
| t.Topics[topic] = make([]*PartitionOffsetMetadata, m) |
| |
| for j := 0; j < m; j++ { |
| partitionOffsetMetadata := new(PartitionOffsetMetadata) |
| if err := partitionOffsetMetadata.decode(pd, version); err != nil { |
| return err |
| } |
| t.Topics[topic][j] = partitionOffsetMetadata |
| } |
| } |
| |
| return nil |
| } |
| |
| func (a *TxnOffsetCommitRequest) key() int16 { |
| return 28 |
| } |
| |
| func (a *TxnOffsetCommitRequest) version() int16 { |
| return 0 |
| } |
| |
| func (a *TxnOffsetCommitRequest) requiredVersion() KafkaVersion { |
| return V0_11_0_0 |
| } |
| |
| type PartitionOffsetMetadata struct { |
| Partition int32 |
| Offset int64 |
| Metadata *string |
| } |
| |
| func (p *PartitionOffsetMetadata) encode(pe packetEncoder) error { |
| pe.putInt32(p.Partition) |
| pe.putInt64(p.Offset) |
| if err := pe.putNullableString(p.Metadata); err != nil { |
| return err |
| } |
| |
| return nil |
| } |
| |
| func (p *PartitionOffsetMetadata) decode(pd packetDecoder, version int16) (err error) { |
| if p.Partition, err = pd.getInt32(); err != nil { |
| return err |
| } |
| if p.Offset, err = pd.getInt64(); err != nil { |
| return err |
| } |
| if p.Metadata, err = pd.getNullableString(); err != nil { |
| return err |
| } |
| |
| return nil |
| } |