package sarama

// ControlRecordType identifies the kind of control record held by a
// ControlRecord.
type ControlRecordType int

const (
	// ControlRecordAbort is the control record type for an abort. It is
	// carried as type code 0 in the control record key.
	ControlRecordAbort ControlRecordType = iota
	// ControlRecordCommit is the control record type for a commit. It is
	// carried as type code 1 in the control record key.
	ControlRecordCommit
	// ControlRecordUnknown is used for a control record whose type code is
	// neither abort nor commit, i.e. a type this client is not aware of.
	ControlRecordUnknown
)

15// Control records are returned as a record by fetchRequest
16// However unlike "normal" records, they mean nothing application wise.
17// They only serve internal logic for supporting transactions.
18type ControlRecord struct {
19 Version int16
20 CoordinatorEpoch int32
21 Type ControlRecordType
22}
23
24func (cr *ControlRecord) decode(key, value packetDecoder) error {
25 var err error
kesavand2cde6582020-06-22 04:56:23 -040026 // There a version for the value part AND the key part. And I have no idea if they are supposed to match or not
27 // Either way, all these version can only be 0 for now
28 cr.Version, err = key.getInt16()
29 if err != nil {
30 return err
31 }
32
33 recordType, err := key.getInt16()
34 if err != nil {
35 return err
36 }
37
38 switch recordType {
39 case 0:
40 cr.Type = ControlRecordAbort
41 case 1:
42 cr.Type = ControlRecordCommit
43 default:
44 // from JAVA implementation:
45 // UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
46 cr.Type = ControlRecordUnknown
47 }
kesavandc71914f2022-03-25 11:19:03 +053048 // we want to parse value only if we are decoding control record of known type
49 if cr.Type != ControlRecordUnknown {
50 cr.Version, err = value.getInt16()
51 if err != nil {
52 return err
53 }
54
55 cr.CoordinatorEpoch, err = value.getInt32()
56 if err != nil {
57 return err
58 }
59 }
kesavand2cde6582020-06-22 04:56:23 -040060 return nil
61}
62
63func (cr *ControlRecord) encode(key, value packetEncoder) {
64 value.putInt16(cr.Version)
65 value.putInt32(cr.CoordinatorEpoch)
66 key.putInt16(cr.Version)
67
68 switch cr.Type {
69 case ControlRecordAbort:
70 key.putInt16(0)
71 case ControlRecordCommit:
72 key.putInt16(1)
73 }
74}