blob: 244a821368d7537090efcd4edd38a24bec2d877c [file] [log] [blame]
Scott Baker8461e152019-10-01 14:44:30 -07001package sarama
2
// ControlRecordType identifies the kind of a control record.
type ControlRecordType int

// The values below are assigned with iota, so ControlRecordAbort is 0,
// ControlRecordCommit is 1 and ControlRecordUnknown is 2.
const (
	// ControlRecordAbort is a control record for abort
	ControlRecordAbort ControlRecordType = iota
	// ControlRecordCommit is a control record for commit
	ControlRecordCommit
	// ControlRecordUnknown is a control record of unknown type
	ControlRecordUnknown
)
14
15// Control records are returned as a record by fetchRequest
16// However unlike "normal" records, they mean nothing application wise.
17// They only serve internal logic for supporting transactions.
18type ControlRecord struct {
19 Version int16
20 CoordinatorEpoch int32
21 Type ControlRecordType
22}
23
24func (cr *ControlRecord) decode(key, value packetDecoder) error {
25 var err error
Scott Baker8461e152019-10-01 14:44:30 -070026 // There a version for the value part AND the key part. And I have no idea if they are supposed to match or not
27 // Either way, all these version can only be 0 for now
28 cr.Version, err = key.getInt16()
29 if err != nil {
30 return err
31 }
32
33 recordType, err := key.getInt16()
34 if err != nil {
35 return err
36 }
37
38 switch recordType {
39 case 0:
40 cr.Type = ControlRecordAbort
41 case 1:
42 cr.Type = ControlRecordCommit
43 default:
44 // from JAVA implementation:
45 // UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
46 cr.Type = ControlRecordUnknown
47 }
khenaidood948f772021-08-11 17:49:24 -040048 // we want to parse value only if we are decoding control record of known type
49 if cr.Type != ControlRecordUnknown {
50 cr.Version, err = value.getInt16()
51 if err != nil {
52 return err
53 }
54
55 cr.CoordinatorEpoch, err = value.getInt32()
56 if err != nil {
57 return err
58 }
59 }
Scott Baker8461e152019-10-01 14:44:30 -070060 return nil
61}
62
63func (cr *ControlRecord) encode(key, value packetEncoder) {
64 value.putInt16(cr.Version)
65 value.putInt32(cr.CoordinatorEpoch)
66 key.putInt16(cr.Version)
67
68 switch cr.Type {
69 case ControlRecordAbort:
70 key.putInt16(0)
71 case ControlRecordCommit:
72 key.putInt16(1)
73 }
74}