package sarama
2
// DeleteOffsetsRequest holds a group name together with the per-topic
// partition IDs that have been registered through AddPartition.
type DeleteOffsetsRequest struct {
	// Group is the group string; encode writes it first and decode reads it first.
	Group string
	// partitions maps a topic name to the partition IDs recorded for that topic.
	partitions map[string][]int32
}
7
8func (r *DeleteOffsetsRequest) encode(pe packetEncoder) (err error) {
9 err = pe.putString(r.Group)
10 if err != nil {
11 return err
12 }
13
14 if r.partitions == nil {
15 pe.putInt32(0)
16 } else {
17 if err = pe.putArrayLength(len(r.partitions)); err != nil {
18 return err
19 }
20 }
21 for topic, partitions := range r.partitions {
22 err = pe.putString(topic)
23 if err != nil {
24 return err
25 }
26 err = pe.putInt32Array(partitions)
27 if err != nil {
28 return err
29 }
30 }
31 return
32}
33
34func (r *DeleteOffsetsRequest) decode(pd packetDecoder, version int16) (err error) {
35 r.Group, err = pd.getString()
36 if err != nil {
37 return err
38 }
39 var partitionCount int
40
41 partitionCount, err = pd.getArrayLength()
42 if err != nil {
43 return err
44 }
45
46 if (partitionCount == 0 && version < 2) || partitionCount < 0 {
47 return nil
48 }
49
50 r.partitions = make(map[string][]int32, partitionCount)
51 for i := 0; i < partitionCount; i++ {
52 var topic string
53 topic, err = pd.getString()
54 if err != nil {
55 return err
56 }
57
58 var partitions []int32
59 partitions, err = pd.getInt32Array()
60 if err != nil {
61 return err
62 }
63
64 r.partitions[topic] = partitions
65 }
66
67 return nil
68}
69
70func (r *DeleteOffsetsRequest) key() int16 {
71 return 47
72}
73
74func (r *DeleteOffsetsRequest) version() int16 {
75 return 0
76}
77
78func (r *DeleteOffsetsRequest) headerVersion() int16 {
79 return 1
80}
81
82func (r *DeleteOffsetsRequest) requiredVersion() KafkaVersion {
83 return V2_4_0_0
84}
85
86func (r *DeleteOffsetsRequest) AddPartition(topic string, partitionID int32) {
87 if r.partitions == nil {
88 r.partitions = make(map[string][]int32)
89 }
90
91 r.partitions[topic] = append(r.partitions[topic], partitionID)
92}