blob: 97437d67bd496c70f103b0a513b181557cd446bb [file] [log] [blame]
package sarama
import (
"encoding/binary"
"fmt"
"io"
)
type protocolBody interface {
encoder
versionedDecoder
key() int16
version() int16
requiredVersion() KafkaVersion
}
type request struct {
correlationID int32
clientID string
body protocolBody
}
func (r *request) encode(pe packetEncoder) error {
pe.push(&lengthField{})
pe.putInt16(r.body.key())
pe.putInt16(r.body.version())
pe.putInt32(r.correlationID)
err := pe.putString(r.clientID)
if err != nil {
return err
}
err = r.body.encode(pe)
if err != nil {
return err
}
return pe.pop()
}
func (r *request) decode(pd packetDecoder) (err error) {
key, err := pd.getInt16()
if err != nil {
return err
}
version, err := pd.getInt16()
if err != nil {
return err
}
r.correlationID, err = pd.getInt32()
if err != nil {
return err
}
r.clientID, err = pd.getString()
if err != nil {
return err
}
r.body = allocateBody(key, version)
if r.body == nil {
return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
}
return r.body.decode(pd, version)
}
func decodeRequest(r io.Reader) (*request, int, error) {
var (
bytesRead int
lengthBytes = make([]byte, 4)
)
if _, err := io.ReadFull(r, lengthBytes); err != nil {
return nil, bytesRead, err
}
bytesRead += len(lengthBytes)
length := int32(binary.BigEndian.Uint32(lengthBytes))
if length <= 4 || length > MaxRequestSize {
return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
}
encodedReq := make([]byte, length)
if _, err := io.ReadFull(r, encodedReq); err != nil {
return nil, bytesRead, err
}
bytesRead += len(encodedReq)
req := &request{}
if err := decode(encodedReq, req); err != nil {
return nil, bytesRead, err
}
return req, bytesRead, nil
}
func allocateBody(key, version int16) protocolBody {
switch key {
case 0:
return &ProduceRequest{}
case 1:
return &FetchRequest{}
case 2:
return &OffsetRequest{Version: version}
case 3:
return &MetadataRequest{}
case 8:
return &OffsetCommitRequest{Version: version}
case 9:
return &OffsetFetchRequest{}
case 10:
return &FindCoordinatorRequest{}
case 11:
return &JoinGroupRequest{}
case 12:
return &HeartbeatRequest{}
case 13:
return &LeaveGroupRequest{}
case 14:
return &SyncGroupRequest{}
case 15:
return &DescribeGroupsRequest{}
case 16:
return &ListGroupsRequest{}
case 17:
return &SaslHandshakeRequest{}
case 18:
return &ApiVersionsRequest{}
case 19:
return &CreateTopicsRequest{}
case 20:
return &DeleteTopicsRequest{}
case 21:
return &DeleteRecordsRequest{}
case 22:
return &InitProducerIDRequest{}
case 24:
return &AddPartitionsToTxnRequest{}
case 25:
return &AddOffsetsToTxnRequest{}
case 26:
return &EndTxnRequest{}
case 28:
return &TxnOffsetCommitRequest{}
case 29:
return &DescribeAclsRequest{}
case 30:
return &CreateAclsRequest{}
case 31:
return &DeleteAclsRequest{}
case 32:
return &DescribeConfigsRequest{}
case 33:
return &AlterConfigsRequest{}
case 35:
return &DescribeLogDirsRequest{}
case 36:
return &SaslAuthenticateRequest{}
case 37:
return &CreatePartitionsRequest{}
case 42:
return &DeleteGroupsRequest{}
}
return nil
}