blob: d899df53463d57e9b590c325e677b33bd2100960 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "encoding/binary"
5 "fmt"
6 "io"
7)
8
9type protocolBody interface {
10 encoder
11 versionedDecoder
12 key() int16
13 version() int16
khenaidood948f772021-08-11 17:49:24 -040014 headerVersion() int16
khenaidooac637102019-01-14 15:44:34 -050015 requiredVersion() KafkaVersion
16}
17
18type request struct {
19 correlationID int32
20 clientID string
21 body protocolBody
22}
23
Scott Baker8461e152019-10-01 14:44:30 -070024func (r *request) encode(pe packetEncoder) error {
khenaidooac637102019-01-14 15:44:34 -050025 pe.push(&lengthField{})
26 pe.putInt16(r.body.key())
27 pe.putInt16(r.body.version())
28 pe.putInt32(r.correlationID)
Scott Baker8461e152019-10-01 14:44:30 -070029
khenaidood948f772021-08-11 17:49:24 -040030 if r.body.headerVersion() >= 1 {
31 err := pe.putString(r.clientID)
32 if err != nil {
33 return err
34 }
khenaidooac637102019-01-14 15:44:34 -050035 }
Scott Baker8461e152019-10-01 14:44:30 -070036
khenaidood948f772021-08-11 17:49:24 -040037 if r.body.headerVersion() >= 2 {
38 // we don't use tag headers at the moment so we just put an array length of 0
39 pe.putUVarint(0)
40 }
41
42 err := r.body.encode(pe)
khenaidooac637102019-01-14 15:44:34 -050043 if err != nil {
44 return err
45 }
Scott Baker8461e152019-10-01 14:44:30 -070046
khenaidooac637102019-01-14 15:44:34 -050047 return pe.pop()
48}
49
50func (r *request) decode(pd packetDecoder) (err error) {
Scott Baker8461e152019-10-01 14:44:30 -070051 key, err := pd.getInt16()
52 if err != nil {
khenaidooac637102019-01-14 15:44:34 -050053 return err
54 }
Scott Baker8461e152019-10-01 14:44:30 -070055
56 version, err := pd.getInt16()
57 if err != nil {
khenaidooac637102019-01-14 15:44:34 -050058 return err
59 }
Scott Baker8461e152019-10-01 14:44:30 -070060
61 r.correlationID, err = pd.getInt32()
62 if err != nil {
khenaidooac637102019-01-14 15:44:34 -050063 return err
64 }
Scott Baker8461e152019-10-01 14:44:30 -070065
khenaidooac637102019-01-14 15:44:34 -050066 r.clientID, err = pd.getString()
Scott Baker8461e152019-10-01 14:44:30 -070067 if err != nil {
68 return err
69 }
khenaidooac637102019-01-14 15:44:34 -050070
71 r.body = allocateBody(key, version)
72 if r.body == nil {
73 return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
74 }
Scott Baker8461e152019-10-01 14:44:30 -070075
khenaidood948f772021-08-11 17:49:24 -040076 if r.body.headerVersion() >= 2 {
77 // tagged field
78 _, err = pd.getUVarint()
79 if err != nil {
80 return err
81 }
82 }
83
khenaidooac637102019-01-14 15:44:34 -050084 return r.body.decode(pd, version)
85}
86
Scott Baker8461e152019-10-01 14:44:30 -070087func decodeRequest(r io.Reader) (*request, int, error) {
88 var (
89 bytesRead int
90 lengthBytes = make([]byte, 4)
91 )
92
khenaidooac637102019-01-14 15:44:34 -050093 if _, err := io.ReadFull(r, lengthBytes); err != nil {
94 return nil, bytesRead, err
95 }
khenaidooac637102019-01-14 15:44:34 -050096
Scott Baker8461e152019-10-01 14:44:30 -070097 bytesRead += len(lengthBytes)
khenaidooac637102019-01-14 15:44:34 -050098 length := int32(binary.BigEndian.Uint32(lengthBytes))
Scott Baker8461e152019-10-01 14:44:30 -070099
khenaidooac637102019-01-14 15:44:34 -0500100 if length <= 4 || length > MaxRequestSize {
101 return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
102 }
103
104 encodedReq := make([]byte, length)
105 if _, err := io.ReadFull(r, encodedReq); err != nil {
106 return nil, bytesRead, err
107 }
Scott Baker8461e152019-10-01 14:44:30 -0700108
khenaidooac637102019-01-14 15:44:34 -0500109 bytesRead += len(encodedReq)
110
Scott Baker8461e152019-10-01 14:44:30 -0700111 req := &request{}
khenaidooac637102019-01-14 15:44:34 -0500112 if err := decode(encodedReq, req); err != nil {
113 return nil, bytesRead, err
114 }
Scott Baker8461e152019-10-01 14:44:30 -0700115
khenaidooac637102019-01-14 15:44:34 -0500116 return req, bytesRead, nil
117}
118
119func allocateBody(key, version int16) protocolBody {
120 switch key {
121 case 0:
122 return &ProduceRequest{}
123 case 1:
khenaidood948f772021-08-11 17:49:24 -0400124 return &FetchRequest{Version: version}
khenaidooac637102019-01-14 15:44:34 -0500125 case 2:
126 return &OffsetRequest{Version: version}
127 case 3:
128 return &MetadataRequest{}
129 case 8:
130 return &OffsetCommitRequest{Version: version}
131 case 9:
khenaidood948f772021-08-11 17:49:24 -0400132 return &OffsetFetchRequest{Version: version}
khenaidooac637102019-01-14 15:44:34 -0500133 case 10:
134 return &FindCoordinatorRequest{}
135 case 11:
136 return &JoinGroupRequest{}
137 case 12:
138 return &HeartbeatRequest{}
139 case 13:
140 return &LeaveGroupRequest{}
141 case 14:
142 return &SyncGroupRequest{}
143 case 15:
144 return &DescribeGroupsRequest{}
145 case 16:
146 return &ListGroupsRequest{}
147 case 17:
148 return &SaslHandshakeRequest{}
149 case 18:
150 return &ApiVersionsRequest{}
151 case 19:
152 return &CreateTopicsRequest{}
153 case 20:
154 return &DeleteTopicsRequest{}
155 case 21:
156 return &DeleteRecordsRequest{}
157 case 22:
158 return &InitProducerIDRequest{}
159 case 24:
160 return &AddPartitionsToTxnRequest{}
161 case 25:
162 return &AddOffsetsToTxnRequest{}
163 case 26:
164 return &EndTxnRequest{}
165 case 28:
166 return &TxnOffsetCommitRequest{}
167 case 29:
168 return &DescribeAclsRequest{}
169 case 30:
170 return &CreateAclsRequest{}
171 case 31:
172 return &DeleteAclsRequest{}
173 case 32:
174 return &DescribeConfigsRequest{}
175 case 33:
176 return &AlterConfigsRequest{}
khenaidood948f772021-08-11 17:49:24 -0400177 case 35:
178 return &DescribeLogDirsRequest{}
William Kurkiandaa6bb22019-03-07 12:26:28 -0500179 case 36:
180 return &SaslAuthenticateRequest{}
khenaidooac637102019-01-14 15:44:34 -0500181 case 37:
182 return &CreatePartitionsRequest{}
183 case 42:
184 return &DeleteGroupsRequest{}
khenaidood948f772021-08-11 17:49:24 -0400185 case 44:
186 return &IncrementalAlterConfigsRequest{}
187 case 45:
188 return &AlterPartitionReassignmentsRequest{}
189 case 46:
190 return &ListPartitionReassignmentsRequest{}
191 case 50:
192 return &DescribeUserScramCredentialsRequest{}
193 case 51:
194 return &AlterUserScramCredentialsRequest{}
khenaidooac637102019-01-14 15:44:34 -0500195 }
196 return nil
197}