blob: 326c3720cc7a599a3d37f153b042006a83b216ed [file] [log] [blame]
package sarama
// offsetRequestBlock holds the per-partition payload of an OffsetRequest:
// a time value and, for protocol version 0 only, a maxOffsets value.
// encode and decode process the fields in declaration order.
type offsetRequestBlock struct {
	time       int64
	maxOffsets int32 // Only used in version 0
}
// encode writes the block to pe: the time field first, followed by
// maxOffsets only when version is 0. It always returns nil.
func (b *offsetRequestBlock) encode(pe packetEncoder, version int16) error {
	pe.putInt64(b.time)

	// Versions other than 0 carry nothing beyond the time field.
	if version != 0 {
		return nil
	}

	pe.putInt32(b.maxOffsets)
	return nil
}
// decode reads the block from pd: the time field first, followed by
// maxOffsets only when version is 0. The first read error is returned
// unchanged.
func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error) {
	b.time, err = pd.getInt64()
	if err != nil {
		return err
	}

	// Versions other than 0 carry nothing beyond the time field.
	if version != 0 {
		return nil
	}

	b.maxOffsets, err = pd.getInt32()
	return err
}
// OffsetRequest is the request whose key() is 2. It carries a protocol
// version, an optional replica ID and one offsetRequestBlock per
// topic/partition pair.
type OffsetRequest struct {
	// Version is the protocol version; it is passed to each block's encode
	// and is overwritten by decode with the version being decoded.
	Version int16

	// replicaID is the replica ID written by encode; it is only used when
	// isReplicaIDSet is true.
	replicaID int32

	// isReplicaIDSet records whether SetReplicaID has been called. When it
	// is false, encode writes -1 and ReplicaID returns -1.
	isReplicaIDSet bool

	// blocks maps topic -> partition -> block. It stays nil until AddBlock
	// or decode populates it.
	blocks map[string]map[int32]*offsetRequestBlock
}
// encode writes the request to pe in wire order: the replica ID, the number
// of topics, and then for each topic its name, its partition count and one
// (partition, block) pair per partition. Map iteration order is unspecified,
// so topics and partitions are emitted in no particular order. The first
// encoder error is returned unchanged.
func (r *OffsetRequest) encode(pe packetEncoder) error {
	// default replica ID is always -1 for clients
	replicaID := int32(-1)
	if r.isReplicaIDSet {
		replicaID = r.replicaID
	}
	pe.putInt32(replicaID)

	if err := pe.putArrayLength(len(r.blocks)); err != nil {
		return err
	}

	for name, byPartition := range r.blocks {
		if err := pe.putString(name); err != nil {
			return err
		}
		if err := pe.putArrayLength(len(byPartition)); err != nil {
			return err
		}

		for id, blk := range byPartition {
			pe.putInt32(id)
			if err := blk.encode(pe, r.Version); err != nil {
				return err
			}
		}
	}

	return nil
}
// decode populates the request from pd using the layout encode writes.
// r.Version is set to version. A non-negative replica ID is stored via
// SetReplicaID; a negative one leaves the replica ID state untouched. When
// the topic count is zero, r.blocks is left as it was. On a read error the
// error is returned unchanged and any blocks decoded so far remain in
// r.blocks.
func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
	r.Version = version

	id, err := pd.getInt32()
	if err != nil {
		return err
	}
	if id >= 0 {
		r.SetReplicaID(id)
	}

	topicCount, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	if topicCount == 0 {
		return nil
	}

	r.blocks = make(map[string]map[int32]*offsetRequestBlock)
	for topicsLeft := topicCount; topicsLeft > 0; topicsLeft-- {
		name, err := pd.getString()
		if err != nil {
			return err
		}

		partitionCount, err := pd.getArrayLength()
		if err != nil {
			return err
		}

		// Register the per-topic map before filling it so that a failure
		// part-way through still leaves the entries read so far in r.blocks.
		byPartition := make(map[int32]*offsetRequestBlock)
		r.blocks[name] = byPartition

		for partitionsLeft := partitionCount; partitionsLeft > 0; partitionsLeft-- {
			partitionID, err := pd.getInt32()
			if err != nil {
				return err
			}

			blk := new(offsetRequestBlock)
			if err := blk.decode(pd, version); err != nil {
				return err
			}
			byPartition[partitionID] = blk
		}
	}

	return nil
}
// key returns the API key of this request type, which is always 2.
func (r *OffsetRequest) key() int16 {
	const apiKey = 2
	return apiKey
}
// version returns the protocol version stored in r.Version.
func (r *OffsetRequest) version() int16 {
	return r.Version
}
// requiredVersion returns the KafkaVersion associated with r.Version:
// V0_10_1_0 for version 1 and MinVersion for every other version.
func (r *OffsetRequest) requiredVersion() KafkaVersion {
	if r.Version == 1 {
		return V0_10_1_0
	}
	return MinVersion
}
// SetReplicaID stores id as the replica ID and marks it as explicitly set,
// so that encode writes id instead of the default -1.
func (r *OffsetRequest) SetReplicaID(id int32) {
	r.replicaID, r.isReplicaIDSet = id, true
}
// ReplicaID returns the replica ID stored by SetReplicaID, or -1 when no
// replica ID has been set.
func (r *OffsetRequest) ReplicaID() int32 {
	if !r.isReplicaIDSet {
		return -1
	}
	return r.replicaID
}
// AddBlock registers a block for the given topic and partitionID, replacing
// any block already stored for that pair. The block always records time;
// maxOffsets is recorded only when r.Version is 0 at the time of the call
// and is left at zero otherwise. The topic and partition maps are created
// on first use.
func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
	if r.blocks == nil {
		r.blocks = make(map[string]map[int32]*offsetRequestBlock)
	}

	byPartition := r.blocks[topic]
	if byPartition == nil {
		byPartition = make(map[int32]*offsetRequestBlock)
		r.blocks[topic] = byPartition
	}

	blk := &offsetRequestBlock{time: time}
	if r.Version == 0 {
		blk.maxOffsets = maxOffsets
	}
	byPartition[partitionID] = blk
}