blob: dab54f88cc78642ba484434e30f522c42bc6074d [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "fmt"
5
6 "github.com/rcrowley/go-metrics"
7)
8
9// Encoder is the interface that wraps the basic Encode method.
10// Anything implementing Encoder can be turned into bytes using Kafka's encoding rules.
11type encoder interface {
12 encode(pe packetEncoder) error
13}
14
khenaidood948f772021-08-11 17:49:24 -040015type encoderWithHeader interface {
16 encoder
17 headerVersion() int16
18}
19
khenaidooac637102019-01-14 15:44:34 -050020// Encode takes an Encoder and turns it into bytes while potentially recording metrics.
21func encode(e encoder, metricRegistry metrics.Registry) ([]byte, error) {
22 if e == nil {
23 return nil, nil
24 }
25
26 var prepEnc prepEncoder
27 var realEnc realEncoder
28
29 err := e.encode(&prepEnc)
30 if err != nil {
31 return nil, err
32 }
33
34 if prepEnc.length < 0 || prepEnc.length > int(MaxRequestSize) {
35 return nil, PacketEncodingError{fmt.Sprintf("invalid request size (%d)", prepEnc.length)}
36 }
37
38 realEnc.raw = make([]byte, prepEnc.length)
39 realEnc.registry = metricRegistry
40 err = e.encode(&realEnc)
41 if err != nil {
42 return nil, err
43 }
44
45 return realEnc.raw, nil
46}
47
khenaidood948f772021-08-11 17:49:24 -040048// decoder is the interface that wraps the basic Decode method.
khenaidooac637102019-01-14 15:44:34 -050049// Anything implementing Decoder can be extracted from bytes using Kafka's encoding rules.
50type decoder interface {
51 decode(pd packetDecoder) error
52}
53
54type versionedDecoder interface {
55 decode(pd packetDecoder, version int16) error
56}
57
khenaidood948f772021-08-11 17:49:24 -040058// decode takes bytes and a decoder and fills the fields of the decoder from the bytes,
khenaidooac637102019-01-14 15:44:34 -050059// interpreted using Kafka's encoding rules.
60func decode(buf []byte, in decoder) error {
61 if buf == nil {
62 return nil
63 }
64
65 helper := realDecoder{raw: buf}
66 err := in.decode(&helper)
67 if err != nil {
68 return err
69 }
70
71 if helper.off != len(buf) {
72 return PacketDecodingError{"invalid length"}
73 }
74
75 return nil
76}
77
78func versionedDecode(buf []byte, in versionedDecoder, version int16) error {
79 if buf == nil {
80 return nil
81 }
82
83 helper := realDecoder{raw: buf}
84 err := in.decode(&helper, version)
85 if err != nil {
86 return err
87 }
88
89 if helper.off != len(buf) {
90 return PacketDecodingError{"invalid length"}
91 }
92
93 return nil
94}