// Source blob: 411da38ad204570a1a65f883477849af08466c39

package sarama
import "time"
// DescribeLogDirsResponse is the response message for the DescribeLogDirs API
// (API key 35). It carries a throttle time and one metadata entry per log
// directory.
type DescribeLogDirsResponse struct {
	// ThrottleTime is carried on the wire as an int32 number of milliseconds.
	ThrottleTime time.Duration
	// Version is the protocol version of this response. Versions 0 and 1 share
	// the same wire format; the number was bumped only to indicate that, on
	// quota violation, brokers send out the response before throttling.
	Version int16
	// LogDirs holds the log directory entries in wire order.
	LogDirs []DescribeLogDirsResponseDirMetadata
}
// encode serializes the response into pe: first the throttle time as a whole
// number of milliseconds (int32), then the length-prefixed list of log
// directory entries. The first error reported while writing the array length
// or an entry stops the encoding and is returned unchanged.
func (r *DescribeLogDirsResponse) encode(pe packetEncoder) error {
	throttleMs := int32(r.ThrottleTime / time.Millisecond)
	pe.putInt32(throttleMs)

	err := pe.putArrayLength(len(r.LogDirs))
	if err != nil {
		return err
	}

	for i := range r.LogDirs {
		if err = r.LogDirs[i].encode(pe); err != nil {
			return err
		}
	}
	return nil
}
// decode populates the response from pd. It reads the throttle time (an int32
// number of milliseconds) followed by the array of log directory entries, each
// decoded with the same version. The version is stored in r.Version so that
// version() reports the version that was decoded. The first decoding error is
// returned unchanged.
func (r *DescribeLogDirsResponse) decode(pd packetDecoder, version int16) error {
	// Record the wire version; otherwise r.Version keeps whatever value it had
	// before decoding.
	r.Version = version

	throttleTime, err := pd.getInt32()
	if err != nil {
		return err
	}
	r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond

	// Decode array of DescribeLogDirsResponseDirMetadata
	n, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	// Kafka encodes a null array as length -1 and make panics on a negative
	// size, so a negative length is treated as "no entries" instead of
	// crashing the decoder.
	if n < 0 {
		r.LogDirs = nil
		return nil
	}

	r.LogDirs = make([]DescribeLogDirsResponseDirMetadata, n)
	for i := 0; i < n; i++ {
		// Decode into a local value so a failed entry is never stored.
		dir := DescribeLogDirsResponseDirMetadata{}
		if err := dir.decode(pd, version); err != nil {
			return err
		}
		r.LogDirs[i] = dir
	}
	return nil
}
// key returns the Kafka API key that identifies DescribeLogDirs messages.
func (r *DescribeLogDirsResponse) key() int16 {
	const describeLogDirsAPIKey int16 = 35
	return describeLogDirsAPIKey
}
// version returns the value of the Version field.
func (r *DescribeLogDirsResponse) version() int16 {
	return r.Version
}
// headerVersion always returns 0, regardless of the Version field.
// NOTE(review): presumably this selects the response header format used with
// this message — confirm against the code that calls headerVersion.
func (r *DescribeLogDirsResponse) headerVersion() int16 {
	return 0
}
// requiredVersion always returns V1_0_0_0, regardless of the Version field.
// NOTE(review): presumably the minimum Kafka release that supports this API —
// confirm against the KafkaVersion definitions.
func (r *DescribeLogDirsResponse) requiredVersion() KafkaVersion {
	return V1_0_0_0
}
// DescribeLogDirsResponseDirMetadata is the entry for a single log directory
// in a DescribeLogDirsResponse.
type DescribeLogDirsResponseDirMetadata struct {
	// ErrorCode is carried on the wire as an int16.
	ErrorCode KError
	// The absolute log directory path
	Path string
	// Topics holds the topic entries reported for this directory, in wire order.
	Topics []DescribeLogDirsResponseTopic
}
// encode writes the directory entry to pe in wire order: the error code as an
// int16, the directory path as a string, and the length-prefixed list of
// topic entries. The first error reported while writing the path, the array
// length or a topic stops the encoding and is returned unchanged.
func (r *DescribeLogDirsResponseDirMetadata) encode(pe packetEncoder) error {
	pe.putInt16(int16(r.ErrorCode))

	err := pe.putString(r.Path)
	if err != nil {
		return err
	}
	if err = pe.putArrayLength(len(r.Topics)); err != nil {
		return err
	}

	for i := range r.Topics {
		if err = r.Topics[i].encode(pe); err != nil {
			return err
		}
	}
	return nil
}
// decode populates the directory entry from pd. It reads the error code
// (int16), the directory path (string) and the array of topic entries, each
// decoded with the same version. The first decoding error is returned
// unchanged.
func (r *DescribeLogDirsResponseDirMetadata) decode(pd packetDecoder, version int16) error {
	errCode, err := pd.getInt16()
	if err != nil {
		return err
	}
	r.ErrorCode = KError(errCode)

	path, err := pd.getString()
	if err != nil {
		return err
	}
	r.Path = path

	// Decode array of DescribeLogDirsResponseTopic
	n, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	// Kafka encodes a null array as length -1 and make panics on a negative
	// size, so a negative length is treated as "no topics" instead of
	// crashing the decoder.
	if n < 0 {
		r.Topics = nil
		return nil
	}

	r.Topics = make([]DescribeLogDirsResponseTopic, n)
	for i := 0; i < n; i++ {
		// Decode into a local value so a failed entry is never stored.
		t := DescribeLogDirsResponseTopic{}
		if err := t.decode(pd, version); err != nil {
			return err
		}
		r.Topics[i] = t
	}
	return nil
}
// DescribeLogDirsResponseTopic describes the partitions of a single topic
// within a log directory entry.
type DescribeLogDirsResponseTopic struct {
	// Topic is carried on the wire as a string.
	Topic string
	// Partitions holds the partition descriptions for the topic, in wire order.
	Partitions []DescribeLogDirsResponsePartition
}
// encode writes the topic entry to pe in wire order: the topic as a string,
// then the length-prefixed list of partition descriptions. The first error
// reported while writing the topic, the array length or a partition stops the
// encoding and is returned unchanged.
func (r *DescribeLogDirsResponseTopic) encode(pe packetEncoder) error {
	err := pe.putString(r.Topic)
	if err != nil {
		return err
	}
	if err = pe.putArrayLength(len(r.Partitions)); err != nil {
		return err
	}

	for i := range r.Partitions {
		if err = r.Partitions[i].encode(pe); err != nil {
			return err
		}
	}
	return nil
}
// decode populates the topic entry from pd. It reads the topic (string) and
// the array of partition descriptions, each decoded with the same version.
// The first decoding error is returned unchanged.
func (r *DescribeLogDirsResponseTopic) decode(pd packetDecoder, version int16) error {
	t, err := pd.getString()
	if err != nil {
		return err
	}
	r.Topic = t

	n, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	// Kafka encodes a null array as length -1 and make panics on a negative
	// size, so a negative length is treated as "no partitions" instead of
	// crashing the decoder.
	if n < 0 {
		r.Partitions = nil
		return nil
	}

	r.Partitions = make([]DescribeLogDirsResponsePartition, n)
	for i := 0; i < n; i++ {
		// Decode into a local value so a failed entry is never stored.
		p := DescribeLogDirsResponsePartition{}
		if err := p.decode(pd, version); err != nil {
			return err
		}
		r.Partitions[i] = p
	}
	return nil
}
// DescribeLogDirsResponsePartition describes a partition's log directory
type DescribeLogDirsResponsePartition struct {
	// PartitionID is carried on the wire as an int32.
	PartitionID int32
	// The size of the log segments of the partition in bytes.
	Size int64
	// The lag of the log's LEO w.r.t. the partition's HW (if it is the current log for the partition) or
	// the current replica's LEO (if it is the future log for the partition).
	OffsetLag int64
	// True if this log is created by AlterReplicaLogDirsRequest and will replace the current log of
	// the replica in the future.
	IsTemporary bool
}
// encode writes the partition description to pe in wire order: partition ID
// (int32), size (int64), offset lag (int64) and the temporary flag (bool).
// encode itself always returns nil.
func (r *DescribeLogDirsResponsePartition) encode(pe packetEncoder) error {
	// The order of these writes defines the wire layout and mirrors decode.
	pe.putInt32(r.PartitionID)
	pe.putInt64(r.Size)
	pe.putInt64(r.OffsetLag)
	pe.putBool(r.IsTemporary)
	return nil
}
// decode populates the partition description from pd, reading in wire order
// the partition ID (int32), the size (int64), the offset lag (int64) and the
// temporary flag (bool). Each field is stored as soon as it has been read
// successfully; on the first read error decoding stops, the remaining fields
// are left untouched and the error is returned unchanged. The version
// parameter is not consulted.
func (r *DescribeLogDirsResponsePartition) decode(pd packetDecoder, version int16) error {
	var (
		partitionID int32
		logSize     int64
		offsetLag   int64
		temporary   bool
		err         error
	)

	if partitionID, err = pd.getInt32(); err != nil {
		return err
	}
	r.PartitionID = partitionID

	if logSize, err = pd.getInt64(); err != nil {
		return err
	}
	r.Size = logSize

	if offsetLag, err = pd.getInt64(); err != nil {
		return err
	}
	r.OffsetLag = offsetLag

	if temporary, err = pd.getBool(); err != nil {
		return err
	}
	r.IsTemporary = temporary

	return nil
}