blob: eb4f23eca35df029b86f68fa6e25ffeb7b1e674e [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "time"
5)
6
Scott Baker8461e152019-10-01 14:44:30 -07007//AddPartitionsToTxnResponse is a partition errors to transaction type
khenaidooac637102019-01-14 15:44:34 -05008type AddPartitionsToTxnResponse struct {
9 ThrottleTime time.Duration
10 Errors map[string][]*PartitionError
11}
12
13func (a *AddPartitionsToTxnResponse) encode(pe packetEncoder) error {
14 pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
15 if err := pe.putArrayLength(len(a.Errors)); err != nil {
16 return err
17 }
18
19 for topic, e := range a.Errors {
20 if err := pe.putString(topic); err != nil {
21 return err
22 }
23 if err := pe.putArrayLength(len(e)); err != nil {
24 return err
25 }
26 for _, partitionError := range e {
27 if err := partitionError.encode(pe); err != nil {
28 return err
29 }
30 }
31 }
32
33 return nil
34}
35
36func (a *AddPartitionsToTxnResponse) decode(pd packetDecoder, version int16) (err error) {
37 throttleTime, err := pd.getInt32()
38 if err != nil {
39 return err
40 }
41 a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
42
43 n, err := pd.getArrayLength()
44 if err != nil {
45 return err
46 }
47
48 a.Errors = make(map[string][]*PartitionError)
49
50 for i := 0; i < n; i++ {
51 topic, err := pd.getString()
52 if err != nil {
53 return err
54 }
55
56 m, err := pd.getArrayLength()
57 if err != nil {
58 return err
59 }
60
61 a.Errors[topic] = make([]*PartitionError, m)
62
63 for j := 0; j < m; j++ {
64 a.Errors[topic][j] = new(PartitionError)
65 if err := a.Errors[topic][j].decode(pd, version); err != nil {
66 return err
67 }
68 }
69 }
70
71 return nil
72}
73
74func (a *AddPartitionsToTxnResponse) key() int16 {
75 return 24
76}
77
78func (a *AddPartitionsToTxnResponse) version() int16 {
79 return 0
80}
81
82func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
83 return V0_11_0_0
84}
85
Scott Baker8461e152019-10-01 14:44:30 -070086//PartitionError is a partition error type
khenaidooac637102019-01-14 15:44:34 -050087type PartitionError struct {
88 Partition int32
89 Err KError
90}
91
92func (p *PartitionError) encode(pe packetEncoder) error {
93 pe.putInt32(p.Partition)
94 pe.putInt16(int16(p.Err))
95 return nil
96}
97
98func (p *PartitionError) decode(pd packetDecoder, version int16) (err error) {
99 if p.Partition, err = pd.getInt32(); err != nil {
100 return err
101 }
102
103 kerr, err := pd.getInt16()
104 if err != nil {
105 return err
106 }
107 p.Err = KError(kerr)
108
109 return nil
110}