blob: f7eea59480edb927b71d749684e3568a0b34ae05 [file] [log] [blame]
Scott Bakere7144bc2019-10-01 14:16:47 -07001package sarama
2
3import (
4 "encoding/binary"
5 "fmt"
6 "io"
7)
8
9type protocolBody interface {
10 encoder
11 versionedDecoder
12 key() int16
13 version() int16
14 requiredVersion() KafkaVersion
15}
16
17type request struct {
18 correlationID int32
19 clientID string
20 body protocolBody
21}
22
23func (r *request) encode(pe packetEncoder) (err error) {
24 pe.push(&lengthField{})
25 pe.putInt16(r.body.key())
26 pe.putInt16(r.body.version())
27 pe.putInt32(r.correlationID)
28 err = pe.putString(r.clientID)
29 if err != nil {
30 return err
31 }
32 err = r.body.encode(pe)
33 if err != nil {
34 return err
35 }
36 return pe.pop()
37}
38
39func (r *request) decode(pd packetDecoder) (err error) {
40 var key int16
41 if key, err = pd.getInt16(); err != nil {
42 return err
43 }
44 var version int16
45 if version, err = pd.getInt16(); err != nil {
46 return err
47 }
48 if r.correlationID, err = pd.getInt32(); err != nil {
49 return err
50 }
51 r.clientID, err = pd.getString()
52
53 r.body = allocateBody(key, version)
54 if r.body == nil {
55 return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
56 }
57 return r.body.decode(pd, version)
58}
59
60func decodeRequest(r io.Reader) (req *request, bytesRead int, err error) {
61 lengthBytes := make([]byte, 4)
62 if _, err := io.ReadFull(r, lengthBytes); err != nil {
63 return nil, bytesRead, err
64 }
65 bytesRead += len(lengthBytes)
66
67 length := int32(binary.BigEndian.Uint32(lengthBytes))
68 if length <= 4 || length > MaxRequestSize {
69 return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
70 }
71
72 encodedReq := make([]byte, length)
73 if _, err := io.ReadFull(r, encodedReq); err != nil {
74 return nil, bytesRead, err
75 }
76 bytesRead += len(encodedReq)
77
78 req = &request{}
79 if err := decode(encodedReq, req); err != nil {
80 return nil, bytesRead, err
81 }
82 return req, bytesRead, nil
83}
84
85func allocateBody(key, version int16) protocolBody {
86 switch key {
87 case 0:
88 return &ProduceRequest{}
89 case 1:
90 return &FetchRequest{}
91 case 2:
92 return &OffsetRequest{Version: version}
93 case 3:
94 return &MetadataRequest{}
95 case 8:
96 return &OffsetCommitRequest{Version: version}
97 case 9:
98 return &OffsetFetchRequest{}
99 case 10:
100 return &FindCoordinatorRequest{}
101 case 11:
102 return &JoinGroupRequest{}
103 case 12:
104 return &HeartbeatRequest{}
105 case 13:
106 return &LeaveGroupRequest{}
107 case 14:
108 return &SyncGroupRequest{}
109 case 15:
110 return &DescribeGroupsRequest{}
111 case 16:
112 return &ListGroupsRequest{}
113 case 17:
114 return &SaslHandshakeRequest{}
115 case 18:
116 return &ApiVersionsRequest{}
117 case 19:
118 return &CreateTopicsRequest{}
119 case 20:
120 return &DeleteTopicsRequest{}
121 case 21:
122 return &DeleteRecordsRequest{}
123 case 22:
124 return &InitProducerIDRequest{}
125 case 24:
126 return &AddPartitionsToTxnRequest{}
127 case 25:
128 return &AddOffsetsToTxnRequest{}
129 case 26:
130 return &EndTxnRequest{}
131 case 28:
132 return &TxnOffsetCommitRequest{}
133 case 29:
134 return &DescribeAclsRequest{}
135 case 30:
136 return &CreateAclsRequest{}
137 case 31:
138 return &DeleteAclsRequest{}
139 case 32:
140 return &DescribeConfigsRequest{}
141 case 33:
142 return &AlterConfigsRequest{}
143 case 36:
144 return &SaslAuthenticateRequest{}
145 case 37:
146 return &CreatePartitionsRequest{}
147 case 42:
148 return &DeleteGroupsRequest{}
149 }
150 return nil
151}