blob: 9b75ab53b3c74cbfb1a9388f455daa17d7c9d894 [file] [log] [blame]
Scott Baker8461e152019-10-01 14:44:30 -07001package sarama
2
// ControlRecordType identifies the kind of a control record: abort, commit,
// or a type this client does not know about.
type ControlRecordType int
5
6const (
7 //ControlRecordAbort is a control record for abort
8 ControlRecordAbort ControlRecordType = iota
9 //ControlRecordCommit is a control record for commit
10 ControlRecordCommit
11 //ControlRecordUnknown is a control record of unknown type
12 ControlRecordUnknown
13)
14
15// Control records are returned as a record by fetchRequest
16// However unlike "normal" records, they mean nothing application wise.
17// They only serve internal logic for supporting transactions.
18type ControlRecord struct {
19 Version int16
20 CoordinatorEpoch int32
21 Type ControlRecordType
22}
23
24func (cr *ControlRecord) decode(key, value packetDecoder) error {
25 var err error
26 cr.Version, err = value.getInt16()
27 if err != nil {
28 return err
29 }
30
31 cr.CoordinatorEpoch, err = value.getInt32()
32 if err != nil {
33 return err
34 }
35
36 // There a version for the value part AND the key part. And I have no idea if they are supposed to match or not
37 // Either way, all these version can only be 0 for now
38 cr.Version, err = key.getInt16()
39 if err != nil {
40 return err
41 }
42
43 recordType, err := key.getInt16()
44 if err != nil {
45 return err
46 }
47
48 switch recordType {
49 case 0:
50 cr.Type = ControlRecordAbort
51 case 1:
52 cr.Type = ControlRecordCommit
53 default:
54 // from JAVA implementation:
55 // UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
56 cr.Type = ControlRecordUnknown
57 }
58 return nil
59}
60
61func (cr *ControlRecord) encode(key, value packetEncoder) {
62 value.putInt16(cr.Version)
63 value.putInt32(cr.CoordinatorEpoch)
64 key.putInt16(cr.Version)
65
66 switch cr.Type {
67 case ControlRecordAbort:
68 key.putInt16(0)
69 case ControlRecordCommit:
70 key.putInt16(1)
71 }
72}