blob: 0989565076764ca05f8701eb3334b63b32db3f48 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "time"
5)
6
khenaidood948f772021-08-11 17:49:24 -04007// AddPartitionsToTxnResponse is a partition errors to transaction type
khenaidooac637102019-01-14 15:44:34 -05008type AddPartitionsToTxnResponse struct {
9 ThrottleTime time.Duration
10 Errors map[string][]*PartitionError
11}
12
13func (a *AddPartitionsToTxnResponse) encode(pe packetEncoder) error {
14 pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
15 if err := pe.putArrayLength(len(a.Errors)); err != nil {
16 return err
17 }
18
19 for topic, e := range a.Errors {
20 if err := pe.putString(topic); err != nil {
21 return err
22 }
23 if err := pe.putArrayLength(len(e)); err != nil {
24 return err
25 }
26 for _, partitionError := range e {
27 if err := partitionError.encode(pe); err != nil {
28 return err
29 }
30 }
31 }
32
33 return nil
34}
35
36func (a *AddPartitionsToTxnResponse) decode(pd packetDecoder, version int16) (err error) {
37 throttleTime, err := pd.getInt32()
38 if err != nil {
39 return err
40 }
41 a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
42
43 n, err := pd.getArrayLength()
44 if err != nil {
45 return err
46 }
47
48 a.Errors = make(map[string][]*PartitionError)
49
50 for i := 0; i < n; i++ {
51 topic, err := pd.getString()
52 if err != nil {
53 return err
54 }
55
56 m, err := pd.getArrayLength()
57 if err != nil {
58 return err
59 }
60
61 a.Errors[topic] = make([]*PartitionError, m)
62
63 for j := 0; j < m; j++ {
64 a.Errors[topic][j] = new(PartitionError)
65 if err := a.Errors[topic][j].decode(pd, version); err != nil {
66 return err
67 }
68 }
69 }
70
71 return nil
72}
73
74func (a *AddPartitionsToTxnResponse) key() int16 {
75 return 24
76}
77
78func (a *AddPartitionsToTxnResponse) version() int16 {
79 return 0
80}
81
khenaidood948f772021-08-11 17:49:24 -040082func (a *AddPartitionsToTxnResponse) headerVersion() int16 {
83 return 0
84}
85
khenaidooac637102019-01-14 15:44:34 -050086func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
87 return V0_11_0_0
88}
89
khenaidood948f772021-08-11 17:49:24 -040090// PartitionError is a partition error type
khenaidooac637102019-01-14 15:44:34 -050091type PartitionError struct {
92 Partition int32
93 Err KError
94}
95
96func (p *PartitionError) encode(pe packetEncoder) error {
97 pe.putInt32(p.Partition)
98 pe.putInt16(int16(p.Err))
99 return nil
100}
101
102func (p *PartitionError) decode(pd packetDecoder, version int16) (err error) {
103 if p.Partition, err = pd.getInt32(); err != nil {
104 return err
105 }
106
107 kerr, err := pd.getInt16()
108 if err != nil {
109 return err
110 }
111 p.Err = KError(kerr)
112
113 return nil
114}