blob: 5ed8ca4dd805e3b424eff75399ca81b67268d982 [file] [log] [blame]
Scott Bakere7144bc2019-10-01 14:16:47 -07001package sarama
2
3import (
4 "encoding/binary"
5 "fmt"
6 "io"
7)
8
9type protocolBody interface {
10 encoder
11 versionedDecoder
12 key() int16
13 version() int16
14 requiredVersion() KafkaVersion
15}
16
17type request struct {
18 correlationID int32
19 clientID string
20 body protocolBody
21}
22
Scott Bakerf579f132019-10-24 14:31:41 -070023func (r *request) encode(pe packetEncoder) error {
Scott Bakere7144bc2019-10-01 14:16:47 -070024 pe.push(&lengthField{})
25 pe.putInt16(r.body.key())
26 pe.putInt16(r.body.version())
27 pe.putInt32(r.correlationID)
Scott Bakerf579f132019-10-24 14:31:41 -070028
29 err := pe.putString(r.clientID)
Scott Bakere7144bc2019-10-01 14:16:47 -070030 if err != nil {
31 return err
32 }
Scott Bakerf579f132019-10-24 14:31:41 -070033
Scott Bakere7144bc2019-10-01 14:16:47 -070034 err = r.body.encode(pe)
35 if err != nil {
36 return err
37 }
Scott Bakerf579f132019-10-24 14:31:41 -070038
Scott Bakere7144bc2019-10-01 14:16:47 -070039 return pe.pop()
40}
41
42func (r *request) decode(pd packetDecoder) (err error) {
Scott Bakerf579f132019-10-24 14:31:41 -070043 key, err := pd.getInt16()
44 if err != nil {
Scott Bakere7144bc2019-10-01 14:16:47 -070045 return err
46 }
Scott Bakerf579f132019-10-24 14:31:41 -070047
48 version, err := pd.getInt16()
49 if err != nil {
Scott Bakere7144bc2019-10-01 14:16:47 -070050 return err
51 }
Scott Bakerf579f132019-10-24 14:31:41 -070052
53 r.correlationID, err = pd.getInt32()
54 if err != nil {
Scott Bakere7144bc2019-10-01 14:16:47 -070055 return err
56 }
Scott Bakerf579f132019-10-24 14:31:41 -070057
Scott Bakere7144bc2019-10-01 14:16:47 -070058 r.clientID, err = pd.getString()
Scott Bakerf579f132019-10-24 14:31:41 -070059 if err != nil {
60 return err
61 }
Scott Bakere7144bc2019-10-01 14:16:47 -070062
63 r.body = allocateBody(key, version)
64 if r.body == nil {
65 return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
66 }
Scott Bakerf579f132019-10-24 14:31:41 -070067
Scott Bakere7144bc2019-10-01 14:16:47 -070068 return r.body.decode(pd, version)
69}
70
Scott Bakerf579f132019-10-24 14:31:41 -070071func decodeRequest(r io.Reader) (*request, int, error) {
72 var (
73 bytesRead int
74 lengthBytes = make([]byte, 4)
75 )
76
Scott Bakere7144bc2019-10-01 14:16:47 -070077 if _, err := io.ReadFull(r, lengthBytes); err != nil {
78 return nil, bytesRead, err
79 }
Scott Bakere7144bc2019-10-01 14:16:47 -070080
Scott Bakerf579f132019-10-24 14:31:41 -070081 bytesRead += len(lengthBytes)
Scott Bakere7144bc2019-10-01 14:16:47 -070082 length := int32(binary.BigEndian.Uint32(lengthBytes))
Scott Bakerf579f132019-10-24 14:31:41 -070083
Scott Bakere7144bc2019-10-01 14:16:47 -070084 if length <= 4 || length > MaxRequestSize {
85 return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
86 }
87
88 encodedReq := make([]byte, length)
89 if _, err := io.ReadFull(r, encodedReq); err != nil {
90 return nil, bytesRead, err
91 }
Scott Bakerf579f132019-10-24 14:31:41 -070092
Scott Bakere7144bc2019-10-01 14:16:47 -070093 bytesRead += len(encodedReq)
94
Scott Bakerf579f132019-10-24 14:31:41 -070095 req := &request{}
Scott Bakere7144bc2019-10-01 14:16:47 -070096 if err := decode(encodedReq, req); err != nil {
97 return nil, bytesRead, err
98 }
Scott Bakerf579f132019-10-24 14:31:41 -070099
Scott Bakere7144bc2019-10-01 14:16:47 -0700100 return req, bytesRead, nil
101}
102
103func allocateBody(key, version int16) protocolBody {
104 switch key {
105 case 0:
106 return &ProduceRequest{}
107 case 1:
108 return &FetchRequest{}
109 case 2:
110 return &OffsetRequest{Version: version}
111 case 3:
112 return &MetadataRequest{}
113 case 8:
114 return &OffsetCommitRequest{Version: version}
115 case 9:
116 return &OffsetFetchRequest{}
117 case 10:
118 return &FindCoordinatorRequest{}
119 case 11:
120 return &JoinGroupRequest{}
121 case 12:
122 return &HeartbeatRequest{}
123 case 13:
124 return &LeaveGroupRequest{}
125 case 14:
126 return &SyncGroupRequest{}
127 case 15:
128 return &DescribeGroupsRequest{}
129 case 16:
130 return &ListGroupsRequest{}
131 case 17:
132 return &SaslHandshakeRequest{}
133 case 18:
134 return &ApiVersionsRequest{}
135 case 19:
136 return &CreateTopicsRequest{}
137 case 20:
138 return &DeleteTopicsRequest{}
139 case 21:
140 return &DeleteRecordsRequest{}
141 case 22:
142 return &InitProducerIDRequest{}
143 case 24:
144 return &AddPartitionsToTxnRequest{}
145 case 25:
146 return &AddOffsetsToTxnRequest{}
147 case 26:
148 return &EndTxnRequest{}
149 case 28:
150 return &TxnOffsetCommitRequest{}
151 case 29:
152 return &DescribeAclsRequest{}
153 case 30:
154 return &CreateAclsRequest{}
155 case 31:
156 return &DeleteAclsRequest{}
157 case 32:
158 return &DescribeConfigsRequest{}
159 case 33:
160 return &AlterConfigsRequest{}
161 case 36:
162 return &SaslAuthenticateRequest{}
163 case 37:
164 return &CreatePartitionsRequest{}
165 case 42:
166 return &DeleteGroupsRequest{}
167 }
168 return nil
169}