blob: c4043a33520d8a71a74b86c654a9b079a0466ef6 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3type TxnOffsetCommitRequest struct {
4 TransactionalID string
5 GroupID string
6 ProducerID int64
7 ProducerEpoch int16
8 Topics map[string][]*PartitionOffsetMetadata
9}
10
11func (t *TxnOffsetCommitRequest) encode(pe packetEncoder) error {
12 if err := pe.putString(t.TransactionalID); err != nil {
13 return err
14 }
15 if err := pe.putString(t.GroupID); err != nil {
16 return err
17 }
18 pe.putInt64(t.ProducerID)
19 pe.putInt16(t.ProducerEpoch)
20
21 if err := pe.putArrayLength(len(t.Topics)); err != nil {
22 return err
23 }
24 for topic, partitions := range t.Topics {
25 if err := pe.putString(topic); err != nil {
26 return err
27 }
28 if err := pe.putArrayLength(len(partitions)); err != nil {
29 return err
30 }
31 for _, partition := range partitions {
32 if err := partition.encode(pe); err != nil {
33 return err
34 }
35 }
36 }
37
38 return nil
39}
40
41func (t *TxnOffsetCommitRequest) decode(pd packetDecoder, version int16) (err error) {
42 if t.TransactionalID, err = pd.getString(); err != nil {
43 return err
44 }
45 if t.GroupID, err = pd.getString(); err != nil {
46 return err
47 }
48 if t.ProducerID, err = pd.getInt64(); err != nil {
49 return err
50 }
51 if t.ProducerEpoch, err = pd.getInt16(); err != nil {
52 return err
53 }
54
55 n, err := pd.getArrayLength()
56 if err != nil {
57 return err
58 }
59
60 t.Topics = make(map[string][]*PartitionOffsetMetadata)
61 for i := 0; i < n; i++ {
62 topic, err := pd.getString()
63 if err != nil {
64 return err
65 }
66
67 m, err := pd.getArrayLength()
68 if err != nil {
69 return err
70 }
71
72 t.Topics[topic] = make([]*PartitionOffsetMetadata, m)
73
74 for j := 0; j < m; j++ {
75 partitionOffsetMetadata := new(PartitionOffsetMetadata)
76 if err := partitionOffsetMetadata.decode(pd, version); err != nil {
77 return err
78 }
79 t.Topics[topic][j] = partitionOffsetMetadata
80 }
81 }
82
83 return nil
84}
85
86func (a *TxnOffsetCommitRequest) key() int16 {
87 return 28
88}
89
90func (a *TxnOffsetCommitRequest) version() int16 {
91 return 0
92}
93
khenaidood948f772021-08-11 17:49:24 -040094func (a *TxnOffsetCommitRequest) headerVersion() int16 {
95 return 1
96}
97
khenaidooac637102019-01-14 15:44:34 -050098func (a *TxnOffsetCommitRequest) requiredVersion() KafkaVersion {
99 return V0_11_0_0
100}
101
// PartitionOffsetMetadata is one per-partition entry of a
// TxnOffsetCommitRequest: a partition number, an offset, and an optional
// metadata string.
type PartitionOffsetMetadata struct {
	// Partition is written as a 32-bit integer.
	Partition int32
	// Offset is written as a 64-bit integer.
	Offset int64
	// Metadata is written as a nullable string; nil means no metadata.
	Metadata *string
}
107
108func (p *PartitionOffsetMetadata) encode(pe packetEncoder) error {
109 pe.putInt32(p.Partition)
110 pe.putInt64(p.Offset)
111 if err := pe.putNullableString(p.Metadata); err != nil {
112 return err
113 }
114
115 return nil
116}
117
118func (p *PartitionOffsetMetadata) decode(pd packetDecoder, version int16) (err error) {
119 if p.Partition, err = pd.getInt32(); err != nil {
120 return err
121 }
122 if p.Offset, err = pd.getInt64(); err != nil {
123 return err
124 }
125 if p.Metadata, err = pd.getNullableString(); err != nil {
126 return err
127 }
128
129 return nil
130}