blob: 581c556c5ce6cd596e697e74c9b83d4ddb39fc8f [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "time"
5)
6
7type AddPartitionsToTxnResponse struct {
8 ThrottleTime time.Duration
9 Errors map[string][]*PartitionError
10}
11
12func (a *AddPartitionsToTxnResponse) encode(pe packetEncoder) error {
13 pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
14 if err := pe.putArrayLength(len(a.Errors)); err != nil {
15 return err
16 }
17
18 for topic, e := range a.Errors {
19 if err := pe.putString(topic); err != nil {
20 return err
21 }
22 if err := pe.putArrayLength(len(e)); err != nil {
23 return err
24 }
25 for _, partitionError := range e {
26 if err := partitionError.encode(pe); err != nil {
27 return err
28 }
29 }
30 }
31
32 return nil
33}
34
35func (a *AddPartitionsToTxnResponse) decode(pd packetDecoder, version int16) (err error) {
36 throttleTime, err := pd.getInt32()
37 if err != nil {
38 return err
39 }
40 a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
41
42 n, err := pd.getArrayLength()
43 if err != nil {
44 return err
45 }
46
47 a.Errors = make(map[string][]*PartitionError)
48
49 for i := 0; i < n; i++ {
50 topic, err := pd.getString()
51 if err != nil {
52 return err
53 }
54
55 m, err := pd.getArrayLength()
56 if err != nil {
57 return err
58 }
59
60 a.Errors[topic] = make([]*PartitionError, m)
61
62 for j := 0; j < m; j++ {
63 a.Errors[topic][j] = new(PartitionError)
64 if err := a.Errors[topic][j].decode(pd, version); err != nil {
65 return err
66 }
67 }
68 }
69
70 return nil
71}
72
73func (a *AddPartitionsToTxnResponse) key() int16 {
74 return 24
75}
76
77func (a *AddPartitionsToTxnResponse) version() int16 {
78 return 0
79}
80
81func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
82 return V0_11_0_0
83}
84
85type PartitionError struct {
86 Partition int32
87 Err KError
88}
89
90func (p *PartitionError) encode(pe packetEncoder) error {
91 pe.putInt32(p.Partition)
92 pe.putInt16(int16(p.Err))
93 return nil
94}
95
96func (p *PartitionError) decode(pd packetDecoder, version int16) (err error) {
97 if p.Partition, err = pd.getInt32(); err != nil {
98 return err
99 }
100
101 kerr, err := pd.getInt16()
102 if err != nil {
103 return err
104 }
105 p.Err = KError(kerr)
106
107 return nil
108}