blob: 8b2193f9a0bf2714ff7d059e4f4e12f131659c0d [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3type OffsetResponseBlock struct {
4 Err KError
5 Offsets []int64 // Version 0
6 Offset int64 // Version 1
7 Timestamp int64 // Version 1
8}
9
10func (b *OffsetResponseBlock) decode(pd packetDecoder, version int16) (err error) {
11 tmp, err := pd.getInt16()
12 if err != nil {
13 return err
14 }
15 b.Err = KError(tmp)
16
17 if version == 0 {
18 b.Offsets, err = pd.getInt64Array()
19
20 return err
21 }
22
23 b.Timestamp, err = pd.getInt64()
24 if err != nil {
25 return err
26 }
27
28 b.Offset, err = pd.getInt64()
29 if err != nil {
30 return err
31 }
32
33 // For backwards compatibility put the offset in the offsets array too
34 b.Offsets = []int64{b.Offset}
35
36 return nil
37}
38
39func (b *OffsetResponseBlock) encode(pe packetEncoder, version int16) (err error) {
40 pe.putInt16(int16(b.Err))
41
42 if version == 0 {
43 return pe.putInt64Array(b.Offsets)
44 }
45
46 pe.putInt64(b.Timestamp)
47 pe.putInt64(b.Offset)
48
49 return nil
50}
51
52type OffsetResponse struct {
53 Version int16
54 Blocks map[string]map[int32]*OffsetResponseBlock
55}
56
57func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) {
58 numTopics, err := pd.getArrayLength()
59 if err != nil {
60 return err
61 }
62
63 r.Blocks = make(map[string]map[int32]*OffsetResponseBlock, numTopics)
64 for i := 0; i < numTopics; i++ {
65 name, err := pd.getString()
66 if err != nil {
67 return err
68 }
69
70 numBlocks, err := pd.getArrayLength()
71 if err != nil {
72 return err
73 }
74
75 r.Blocks[name] = make(map[int32]*OffsetResponseBlock, numBlocks)
76
77 for j := 0; j < numBlocks; j++ {
78 id, err := pd.getInt32()
79 if err != nil {
80 return err
81 }
82
83 block := new(OffsetResponseBlock)
84 err = block.decode(pd, version)
85 if err != nil {
86 return err
87 }
88 r.Blocks[name][id] = block
89 }
90 }
91
92 return nil
93}
94
95func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock {
96 if r.Blocks == nil {
97 return nil
98 }
99
100 if r.Blocks[topic] == nil {
101 return nil
102 }
103
104 return r.Blocks[topic][partition]
105}
106
/*
// [0 0 0 1 ntopics
0 8 109 121 95 116 111 112 105 99 topic
0 0 0 1 npartitions
0 0 0 0 id
0 0

0 0 0 1 0 0 0 0
0 1 1 1 0 0 0 1
0 8 109 121 95 116 111 112
105 99 0 0 0 1 0 0
0 0 0 0 0 0 0 1
0 0 0 0 0 1 1 1] <nil>

*/
122func (r *OffsetResponse) encode(pe packetEncoder) (err error) {
123 if err = pe.putArrayLength(len(r.Blocks)); err != nil {
124 return err
125 }
126
127 for topic, partitions := range r.Blocks {
128 if err = pe.putString(topic); err != nil {
129 return err
130 }
131 if err = pe.putArrayLength(len(partitions)); err != nil {
132 return err
133 }
134 for partition, block := range partitions {
135 pe.putInt32(partition)
136 if err = block.encode(pe, r.version()); err != nil {
137 return err
138 }
139 }
140 }
141
142 return nil
143}
144
145func (r *OffsetResponse) key() int16 {
146 return 2
147}
148
149func (r *OffsetResponse) version() int16 {
150 return r.Version
151}
152
153func (r *OffsetResponse) requiredVersion() KafkaVersion {
154 switch r.Version {
155 case 1:
156 return V0_10_1_0
157 default:
158 return MinVersion
159 }
160}
161
162// testing API
163
164func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) {
165 if r.Blocks == nil {
166 r.Blocks = make(map[string]map[int32]*OffsetResponseBlock)
167 }
168 byTopic, ok := r.Blocks[topic]
169 if !ok {
170 byTopic = make(map[int32]*OffsetResponseBlock)
171 r.Blocks[topic] = byTopic
172 }
173 byTopic[partition] = &OffsetResponseBlock{Offsets: []int64{offset}, Offset: offset}
174}