blob: d899df53463d57e9b590c325e677b33bd2100960 [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001package sarama
2
3import (
4 "encoding/binary"
5 "fmt"
6 "io"
7)
8
9type protocolBody interface {
10 encoder
11 versionedDecoder
12 key() int16
13 version() int16
khenaidoo7d3c5582021-08-11 18:09:44 -040014 headerVersion() int16
Holger Hildebrandtfa074992020-03-27 15:42:06 +000015 requiredVersion() KafkaVersion
16}
17
18type request struct {
19 correlationID int32
20 clientID string
21 body protocolBody
22}
23
24func (r *request) encode(pe packetEncoder) error {
25 pe.push(&lengthField{})
26 pe.putInt16(r.body.key())
27 pe.putInt16(r.body.version())
28 pe.putInt32(r.correlationID)
29
khenaidoo7d3c5582021-08-11 18:09:44 -040030 if r.body.headerVersion() >= 1 {
31 err := pe.putString(r.clientID)
32 if err != nil {
33 return err
34 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +000035 }
36
khenaidoo7d3c5582021-08-11 18:09:44 -040037 if r.body.headerVersion() >= 2 {
38 // we don't use tag headers at the moment so we just put an array length of 0
39 pe.putUVarint(0)
40 }
41
42 err := r.body.encode(pe)
Holger Hildebrandtfa074992020-03-27 15:42:06 +000043 if err != nil {
44 return err
45 }
46
47 return pe.pop()
48}
49
50func (r *request) decode(pd packetDecoder) (err error) {
51 key, err := pd.getInt16()
52 if err != nil {
53 return err
54 }
55
56 version, err := pd.getInt16()
57 if err != nil {
58 return err
59 }
60
61 r.correlationID, err = pd.getInt32()
62 if err != nil {
63 return err
64 }
65
66 r.clientID, err = pd.getString()
67 if err != nil {
68 return err
69 }
70
71 r.body = allocateBody(key, version)
72 if r.body == nil {
73 return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
74 }
75
khenaidoo7d3c5582021-08-11 18:09:44 -040076 if r.body.headerVersion() >= 2 {
77 // tagged field
78 _, err = pd.getUVarint()
79 if err != nil {
80 return err
81 }
82 }
83
Holger Hildebrandtfa074992020-03-27 15:42:06 +000084 return r.body.decode(pd, version)
85}
86
87func decodeRequest(r io.Reader) (*request, int, error) {
88 var (
89 bytesRead int
90 lengthBytes = make([]byte, 4)
91 )
92
93 if _, err := io.ReadFull(r, lengthBytes); err != nil {
94 return nil, bytesRead, err
95 }
96
97 bytesRead += len(lengthBytes)
98 length := int32(binary.BigEndian.Uint32(lengthBytes))
99
100 if length <= 4 || length > MaxRequestSize {
101 return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
102 }
103
104 encodedReq := make([]byte, length)
105 if _, err := io.ReadFull(r, encodedReq); err != nil {
106 return nil, bytesRead, err
107 }
108
109 bytesRead += len(encodedReq)
110
111 req := &request{}
112 if err := decode(encodedReq, req); err != nil {
113 return nil, bytesRead, err
114 }
115
116 return req, bytesRead, nil
117}
118
119func allocateBody(key, version int16) protocolBody {
120 switch key {
121 case 0:
122 return &ProduceRequest{}
123 case 1:
khenaidoo7d3c5582021-08-11 18:09:44 -0400124 return &FetchRequest{Version: version}
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000125 case 2:
126 return &OffsetRequest{Version: version}
127 case 3:
128 return &MetadataRequest{}
129 case 8:
130 return &OffsetCommitRequest{Version: version}
131 case 9:
khenaidoo7d3c5582021-08-11 18:09:44 -0400132 return &OffsetFetchRequest{Version: version}
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000133 case 10:
134 return &FindCoordinatorRequest{}
135 case 11:
136 return &JoinGroupRequest{}
137 case 12:
138 return &HeartbeatRequest{}
139 case 13:
140 return &LeaveGroupRequest{}
141 case 14:
142 return &SyncGroupRequest{}
143 case 15:
144 return &DescribeGroupsRequest{}
145 case 16:
146 return &ListGroupsRequest{}
147 case 17:
148 return &SaslHandshakeRequest{}
149 case 18:
150 return &ApiVersionsRequest{}
151 case 19:
152 return &CreateTopicsRequest{}
153 case 20:
154 return &DeleteTopicsRequest{}
155 case 21:
156 return &DeleteRecordsRequest{}
157 case 22:
158 return &InitProducerIDRequest{}
159 case 24:
160 return &AddPartitionsToTxnRequest{}
161 case 25:
162 return &AddOffsetsToTxnRequest{}
163 case 26:
164 return &EndTxnRequest{}
165 case 28:
166 return &TxnOffsetCommitRequest{}
167 case 29:
168 return &DescribeAclsRequest{}
169 case 30:
170 return &CreateAclsRequest{}
171 case 31:
172 return &DeleteAclsRequest{}
173 case 32:
174 return &DescribeConfigsRequest{}
175 case 33:
176 return &AlterConfigsRequest{}
khenaidoo7d3c5582021-08-11 18:09:44 -0400177 case 35:
178 return &DescribeLogDirsRequest{}
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000179 case 36:
180 return &SaslAuthenticateRequest{}
181 case 37:
182 return &CreatePartitionsRequest{}
183 case 42:
184 return &DeleteGroupsRequest{}
khenaidoo7d3c5582021-08-11 18:09:44 -0400185 case 44:
186 return &IncrementalAlterConfigsRequest{}
187 case 45:
188 return &AlterPartitionReassignmentsRequest{}
189 case 46:
190 return &ListPartitionReassignmentsRequest{}
191 case 50:
192 return &DescribeUserScramCredentialsRequest{}
193 case 51:
194 return &AlterUserScramCredentialsRequest{}
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000195 }
196 return nil
197}