blob: d899df53463d57e9b590c325e677b33bd2100960 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import (
4 "encoding/binary"
5 "fmt"
6 "io"
7)
8
9type protocolBody interface {
10 encoder
11 versionedDecoder
12 key() int16
13 version() int16
khenaidoo106c61a2021-08-11 18:05:46 -040014 headerVersion() int16
William Kurkianea869482019-04-09 15:16:11 -040015 requiredVersion() KafkaVersion
16}
17
18type request struct {
19 correlationID int32
20 clientID string
21 body protocolBody
22}
23
Abhilash S.L3b494632019-07-16 15:51:09 +053024func (r *request) encode(pe packetEncoder) error {
William Kurkianea869482019-04-09 15:16:11 -040025 pe.push(&lengthField{})
26 pe.putInt16(r.body.key())
27 pe.putInt16(r.body.version())
28 pe.putInt32(r.correlationID)
Abhilash S.L3b494632019-07-16 15:51:09 +053029
khenaidoo106c61a2021-08-11 18:05:46 -040030 if r.body.headerVersion() >= 1 {
31 err := pe.putString(r.clientID)
32 if err != nil {
33 return err
34 }
William Kurkianea869482019-04-09 15:16:11 -040035 }
Abhilash S.L3b494632019-07-16 15:51:09 +053036
khenaidoo106c61a2021-08-11 18:05:46 -040037 if r.body.headerVersion() >= 2 {
38 // we don't use tag headers at the moment so we just put an array length of 0
39 pe.putUVarint(0)
40 }
41
42 err := r.body.encode(pe)
William Kurkianea869482019-04-09 15:16:11 -040043 if err != nil {
44 return err
45 }
Abhilash S.L3b494632019-07-16 15:51:09 +053046
William Kurkianea869482019-04-09 15:16:11 -040047 return pe.pop()
48}
49
50func (r *request) decode(pd packetDecoder) (err error) {
Abhilash S.L3b494632019-07-16 15:51:09 +053051 key, err := pd.getInt16()
52 if err != nil {
William Kurkianea869482019-04-09 15:16:11 -040053 return err
54 }
Abhilash S.L3b494632019-07-16 15:51:09 +053055
56 version, err := pd.getInt16()
57 if err != nil {
William Kurkianea869482019-04-09 15:16:11 -040058 return err
59 }
Abhilash S.L3b494632019-07-16 15:51:09 +053060
61 r.correlationID, err = pd.getInt32()
62 if err != nil {
William Kurkianea869482019-04-09 15:16:11 -040063 return err
64 }
Abhilash S.L3b494632019-07-16 15:51:09 +053065
William Kurkianea869482019-04-09 15:16:11 -040066 r.clientID, err = pd.getString()
Abhilash S.L3b494632019-07-16 15:51:09 +053067 if err != nil {
68 return err
69 }
William Kurkianea869482019-04-09 15:16:11 -040070
71 r.body = allocateBody(key, version)
72 if r.body == nil {
73 return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
74 }
Abhilash S.L3b494632019-07-16 15:51:09 +053075
khenaidoo106c61a2021-08-11 18:05:46 -040076 if r.body.headerVersion() >= 2 {
77 // tagged field
78 _, err = pd.getUVarint()
79 if err != nil {
80 return err
81 }
82 }
83
William Kurkianea869482019-04-09 15:16:11 -040084 return r.body.decode(pd, version)
85}
86
Abhilash S.L3b494632019-07-16 15:51:09 +053087func decodeRequest(r io.Reader) (*request, int, error) {
88 var (
89 bytesRead int
90 lengthBytes = make([]byte, 4)
91 )
92
William Kurkianea869482019-04-09 15:16:11 -040093 if _, err := io.ReadFull(r, lengthBytes); err != nil {
94 return nil, bytesRead, err
95 }
William Kurkianea869482019-04-09 15:16:11 -040096
Abhilash S.L3b494632019-07-16 15:51:09 +053097 bytesRead += len(lengthBytes)
William Kurkianea869482019-04-09 15:16:11 -040098 length := int32(binary.BigEndian.Uint32(lengthBytes))
Abhilash S.L3b494632019-07-16 15:51:09 +053099
William Kurkianea869482019-04-09 15:16:11 -0400100 if length <= 4 || length > MaxRequestSize {
101 return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
102 }
103
104 encodedReq := make([]byte, length)
105 if _, err := io.ReadFull(r, encodedReq); err != nil {
106 return nil, bytesRead, err
107 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530108
William Kurkianea869482019-04-09 15:16:11 -0400109 bytesRead += len(encodedReq)
110
Abhilash S.L3b494632019-07-16 15:51:09 +0530111 req := &request{}
William Kurkianea869482019-04-09 15:16:11 -0400112 if err := decode(encodedReq, req); err != nil {
113 return nil, bytesRead, err
114 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530115
William Kurkianea869482019-04-09 15:16:11 -0400116 return req, bytesRead, nil
117}
118
119func allocateBody(key, version int16) protocolBody {
120 switch key {
121 case 0:
122 return &ProduceRequest{}
123 case 1:
khenaidoo106c61a2021-08-11 18:05:46 -0400124 return &FetchRequest{Version: version}
William Kurkianea869482019-04-09 15:16:11 -0400125 case 2:
126 return &OffsetRequest{Version: version}
127 case 3:
128 return &MetadataRequest{}
129 case 8:
130 return &OffsetCommitRequest{Version: version}
131 case 9:
khenaidoo106c61a2021-08-11 18:05:46 -0400132 return &OffsetFetchRequest{Version: version}
William Kurkianea869482019-04-09 15:16:11 -0400133 case 10:
134 return &FindCoordinatorRequest{}
135 case 11:
136 return &JoinGroupRequest{}
137 case 12:
138 return &HeartbeatRequest{}
139 case 13:
140 return &LeaveGroupRequest{}
141 case 14:
142 return &SyncGroupRequest{}
143 case 15:
144 return &DescribeGroupsRequest{}
145 case 16:
146 return &ListGroupsRequest{}
147 case 17:
148 return &SaslHandshakeRequest{}
149 case 18:
150 return &ApiVersionsRequest{}
151 case 19:
152 return &CreateTopicsRequest{}
153 case 20:
154 return &DeleteTopicsRequest{}
155 case 21:
156 return &DeleteRecordsRequest{}
157 case 22:
158 return &InitProducerIDRequest{}
159 case 24:
160 return &AddPartitionsToTxnRequest{}
161 case 25:
162 return &AddOffsetsToTxnRequest{}
163 case 26:
164 return &EndTxnRequest{}
165 case 28:
166 return &TxnOffsetCommitRequest{}
167 case 29:
168 return &DescribeAclsRequest{}
169 case 30:
170 return &CreateAclsRequest{}
171 case 31:
172 return &DeleteAclsRequest{}
173 case 32:
174 return &DescribeConfigsRequest{}
175 case 33:
176 return &AlterConfigsRequest{}
khenaidoo106c61a2021-08-11 18:05:46 -0400177 case 35:
178 return &DescribeLogDirsRequest{}
William Kurkianea869482019-04-09 15:16:11 -0400179 case 36:
180 return &SaslAuthenticateRequest{}
181 case 37:
182 return &CreatePartitionsRequest{}
183 case 42:
184 return &DeleteGroupsRequest{}
khenaidoo106c61a2021-08-11 18:05:46 -0400185 case 44:
186 return &IncrementalAlterConfigsRequest{}
187 case 45:
188 return &AlterPartitionReassignmentsRequest{}
189 case 46:
190 return &ListPartitionReassignmentsRequest{}
191 case 50:
192 return &DescribeUserScramCredentialsRequest{}
193 case 51:
194 return &AlterUserScramCredentialsRequest{}
William Kurkianea869482019-04-09 15:16:11 -0400195 }
196 return nil
197}