blob: 97437d67bd496c70f103b0a513b181557cd446bb [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001package sarama
2
3import (
4 "encoding/binary"
5 "fmt"
6 "io"
7)
8
9type protocolBody interface {
10 encoder
11 versionedDecoder
12 key() int16
13 version() int16
14 requiredVersion() KafkaVersion
15}
16
17type request struct {
18 correlationID int32
19 clientID string
20 body protocolBody
21}
22
23func (r *request) encode(pe packetEncoder) error {
24 pe.push(&lengthField{})
25 pe.putInt16(r.body.key())
26 pe.putInt16(r.body.version())
27 pe.putInt32(r.correlationID)
28
29 err := pe.putString(r.clientID)
30 if err != nil {
31 return err
32 }
33
34 err = r.body.encode(pe)
35 if err != nil {
36 return err
37 }
38
39 return pe.pop()
40}
41
42func (r *request) decode(pd packetDecoder) (err error) {
43 key, err := pd.getInt16()
44 if err != nil {
45 return err
46 }
47
48 version, err := pd.getInt16()
49 if err != nil {
50 return err
51 }
52
53 r.correlationID, err = pd.getInt32()
54 if err != nil {
55 return err
56 }
57
58 r.clientID, err = pd.getString()
59 if err != nil {
60 return err
61 }
62
63 r.body = allocateBody(key, version)
64 if r.body == nil {
65 return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
66 }
67
68 return r.body.decode(pd, version)
69}
70
71func decodeRequest(r io.Reader) (*request, int, error) {
72 var (
73 bytesRead int
74 lengthBytes = make([]byte, 4)
75 )
76
77 if _, err := io.ReadFull(r, lengthBytes); err != nil {
78 return nil, bytesRead, err
79 }
80
81 bytesRead += len(lengthBytes)
82 length := int32(binary.BigEndian.Uint32(lengthBytes))
83
84 if length <= 4 || length > MaxRequestSize {
85 return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
86 }
87
88 encodedReq := make([]byte, length)
89 if _, err := io.ReadFull(r, encodedReq); err != nil {
90 return nil, bytesRead, err
91 }
92
93 bytesRead += len(encodedReq)
94
95 req := &request{}
96 if err := decode(encodedReq, req); err != nil {
97 return nil, bytesRead, err
98 }
99
100 return req, bytesRead, nil
101}
102
103func allocateBody(key, version int16) protocolBody {
104 switch key {
105 case 0:
106 return &ProduceRequest{}
107 case 1:
108 return &FetchRequest{}
109 case 2:
110 return &OffsetRequest{Version: version}
111 case 3:
112 return &MetadataRequest{}
113 case 8:
114 return &OffsetCommitRequest{Version: version}
115 case 9:
116 return &OffsetFetchRequest{}
117 case 10:
118 return &FindCoordinatorRequest{}
119 case 11:
120 return &JoinGroupRequest{}
121 case 12:
122 return &HeartbeatRequest{}
123 case 13:
124 return &LeaveGroupRequest{}
125 case 14:
126 return &SyncGroupRequest{}
127 case 15:
128 return &DescribeGroupsRequest{}
129 case 16:
130 return &ListGroupsRequest{}
131 case 17:
132 return &SaslHandshakeRequest{}
133 case 18:
134 return &ApiVersionsRequest{}
135 case 19:
136 return &CreateTopicsRequest{}
137 case 20:
138 return &DeleteTopicsRequest{}
139 case 21:
140 return &DeleteRecordsRequest{}
141 case 22:
142 return &InitProducerIDRequest{}
143 case 24:
144 return &AddPartitionsToTxnRequest{}
145 case 25:
146 return &AddOffsetsToTxnRequest{}
147 case 26:
148 return &EndTxnRequest{}
149 case 28:
150 return &TxnOffsetCommitRequest{}
151 case 29:
152 return &DescribeAclsRequest{}
153 case 30:
154 return &CreateAclsRequest{}
155 case 31:
156 return &DeleteAclsRequest{}
157 case 32:
158 return &DescribeConfigsRequest{}
159 case 33:
160 return &AlterConfigsRequest{}
161 case 35:
162 return &DescribeLogDirsRequest{}
163 case 36:
164 return &SaslAuthenticateRequest{}
165 case 37:
166 return &CreatePartitionsRequest{}
167 case 42:
168 return &DeleteGroupsRequest{}
169 }
170 return nil
171}