blob: 928f5a52ab2d7ec5d8c66506fc5ebe998ef16630 [file] [log] [blame]
package sarama
import (
"fmt"
"time"
)
// ConfigSource identifies the origin of a configuration value. It is an int8
// because that is how the entry and synonym codecs in this file put it on the
// wire.
type ConfigSource int8

// The ConfigSource values known to this package, numbered consecutively from
// zero in declaration order.
const (
	SourceUnknown ConfigSource = iota
	SourceTopic
	SourceDynamicBroker
	SourceDynamicDefaultBroker
	SourceStaticBroker
	SourceDefault
)

// String returns the constant's name without its "Source" prefix. Any value
// outside the declared constants is rendered as "Source Invalid: N", where N
// is the numeric value.
func (s ConfigSource) String() string {
	// Indexed by the constants themselves so each name stays attached to its
	// value even if the declaration order changes.
	names := [...]string{
		SourceUnknown:              "Unknown",
		SourceTopic:                "Topic",
		SourceDynamicBroker:        "DynamicBroker",
		SourceDynamicDefaultBroker: "DynamicDefaultBroker",
		SourceStaticBroker:         "StaticBroker",
		SourceDefault:              "Default",
	}
	if s < 0 || int(s) >= len(names) {
		return fmt.Sprintf("Source Invalid: %d", int(s))
	}
	return names[s]
}
// DescribeConfigsResponse is the body of a DescribeConfigs response: a
// throttle time followed by one ResourceResponse per resource.
type DescribeConfigsResponse struct {
// Version is the protocol version. decode stores the version it was called
// with here, and encode passes it on to every resource.
Version int16
// ThrottleTime is written to and read from the wire as an int32 number of
// milliseconds.
ThrottleTime time.Duration
// Resources holds one entry per resource carried in the response.
Resources []*ResourceResponse
}
// ResourceResponse is the per-resource element of
// DescribeConfigsResponse.Resources.
type ResourceResponse struct {
// ErrorCode is the raw int16 error code carried for this resource.
ErrorCode int16
// ErrorMsg is the error message string carried next to ErrorCode.
ErrorMsg string
// Type is encoded and decoded as a single int8.
Type ConfigResourceType
// Name is the resource name string.
Name string
// Configs lists the configuration entries carried for this resource.
Configs []*ConfigEntry
}
// ConfigEntry is a single configuration entry of a ResourceResponse. Name,
// Value, ReadOnly and Sensitive are carried in every version; version 0
// additionally carries Default, while later versions carry Source and
// Synonyms instead.
type ConfigEntry struct {
Name string
Value string
ReadOnly bool
// Default is read from the wire in version 0. For other versions decode
// derives it as Source == SourceDefault.
Default bool
// Source is read from the wire (as an int8) for versions other than 0. When
// decoding version 0 it is set to SourceDefault if Default is true and to
// SourceUnknown otherwise.
Source ConfigSource
Sensitive bool
// Synonyms is only encoded and decoded for versions greater than 0.
Synonyms []*ConfigSynonym
}
// ConfigSynonym is one element of ConfigEntry.Synonyms: a configuration name
// and value together with the source they are attributed to.
type ConfigSynonym struct {
ConfigName string
ConfigValue string
// Source is encoded and decoded as a single int8.
Source ConfigSource
}
// encode writes the response to pe: the throttle time as an int32 number of
// milliseconds, then the number of resources, then each resource encoded for
// r.Version. It stops at, and returns, the first error reported by pe or by a
// resource.
func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
	throttleMs := int32(r.ThrottleTime / time.Millisecond)
	pe.putInt32(throttleMs)

	err = pe.putArrayLength(len(r.Resources))
	if err != nil {
		return err
	}

	for i := range r.Resources {
		if err = r.Resources[i].encode(pe, r.Version); err != nil {
			return err
		}
	}
	return nil
}
// decode fills r from pd for the given protocol version. It records the
// version, reads the throttle time as an int32 number of milliseconds, then
// reads the resource count and decodes that many ResourceResponse values.
// The first error from pd or from a resource is returned unchanged; fields
// assigned before the failure keep their new values and resources not yet
// decoded are left nil.
func (r *DescribeConfigsResponse) decode(pd packetDecoder, version int16) (err error) {
	r.Version = version

	throttleMs, err := pd.getInt32()
	if err != nil {
		return err
	}
	r.ThrottleTime = time.Duration(throttleMs) * time.Millisecond

	count, err := pd.getArrayLength()
	if err != nil {
		return err
	}

	r.Resources = make([]*ResourceResponse, count)
	for i := range r.Resources {
		res := new(ResourceResponse)
		if err = res.decode(pd, version); err != nil {
			return err
		}
		r.Resources[i] = res
	}
	return nil
}
// key returns 32 as the API key of this message.
// NOTE(review): presumably the Kafka protocol API key assigned to
// DescribeConfigs — confirm against the protocol guide.
func (r *DescribeConfigsResponse) key() int16 {
return 32
}
// version returns the protocol version stored in r.Version.
func (r *DescribeConfigsResponse) version() int16 {
return r.Version
}
// headerVersion always returns 0, regardless of r.Version.
// NOTE(review): presumably the version of the response header that precedes
// this body — confirm against the callers.
func (r *DescribeConfigsResponse) headerVersion() int16 {
return 0
}
// requiredVersion maps r.Version to a KafkaVersion: V1_0_0_0 for version 1,
// V2_0_0_0 for version 2, and V0_11_0_0 for every other value.
func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {
	if r.Version == 1 {
		return V1_0_0_0
	}
	if r.Version == 2 {
		return V2_0_0_0
	}
	return V0_11_0_0
}
// encode writes the resource to pe in wire order: error code, error message,
// resource type (as an int8), resource name, the number of config entries and
// finally each entry encoded for the given version. It stops at, and returns,
// the first error reported by pe or by an entry.
func (r *ResourceResponse) encode(pe packetEncoder, version int16) (err error) {
	pe.putInt16(r.ErrorCode)
	err = pe.putString(r.ErrorMsg)
	if err != nil {
		return err
	}

	pe.putInt8(int8(r.Type))
	err = pe.putString(r.Name)
	if err != nil {
		return err
	}

	err = pe.putArrayLength(len(r.Configs))
	if err != nil {
		return err
	}
	for i := range r.Configs {
		if err = r.Configs[i].encode(pe, version); err != nil {
			return err
		}
	}
	return nil
}
// decode fills r from pd in wire order: error code, error message, resource
// type (an int8), resource name, then the number of config entries followed by
// the entries themselves, each decoded for the given version. A field is only
// assigned once its read succeeded. The first error from pd or from an entry
// is returned unchanged, leaving entries not yet decoded as nil.
func (r *ResourceResponse) decode(pd packetDecoder, version int16) (err error) {
	code, err := pd.getInt16()
	if err != nil {
		return err
	}
	r.ErrorCode = code

	msg, err := pd.getString()
	if err != nil {
		return err
	}
	r.ErrorMsg = msg

	rawType, err := pd.getInt8()
	if err != nil {
		return err
	}
	r.Type = ConfigResourceType(rawType)

	resourceName, err := pd.getString()
	if err != nil {
		return err
	}
	r.Name = resourceName

	count, err := pd.getArrayLength()
	if err != nil {
		return err
	}

	r.Configs = make([]*ConfigEntry, count)
	for i := range r.Configs {
		entry := new(ConfigEntry)
		if err = entry.decode(pd, version); err != nil {
			return err
		}
		r.Configs[i] = entry
	}
	return nil
}
// encode writes the entry to pe. Name, Value and ReadOnly come first in every
// version. For version 0 (or lower) they are followed by Default and
// Sensitive. For later versions they are followed by Source (as an int8),
// Sensitive, the number of synonyms and each synonym. It stops at, and
// returns, the first error reported by pe or by a synonym.
func (r *ConfigEntry) encode(pe packetEncoder, version int16) (err error) {
	err = pe.putString(r.Name)
	if err != nil {
		return err
	}
	err = pe.putString(r.Value)
	if err != nil {
		return err
	}
	pe.putBool(r.ReadOnly)

	// The oldest layout has no source or synonyms, only the default flag.
	if version <= 0 {
		pe.putBool(r.Default)
		pe.putBool(r.Sensitive)
		return nil
	}

	pe.putInt8(int8(r.Source))
	pe.putBool(r.Sensitive)

	err = pe.putArrayLength(len(r.Synonyms))
	if err != nil {
		return err
	}
	for i := range r.Synonyms {
		if err = r.Synonyms[i].encode(pe, version); err != nil {
			return err
		}
	}
	return nil
}
// decode fills the entry from pd for the given protocol version.
//
// Name, Value and ReadOnly are read first in every version. Version 0 then
// carries a default flag: Default is set from it and Source becomes
// SourceDefault when the flag is true, SourceUnknown otherwise. Every other
// version carries the source as an int8 instead, and Default is derived as
// Source == SourceDefault. Sensitive follows, and versions above 0 end with
// the synonym count and the synonyms themselves.
//
// A field is only assigned once its read succeeded, and the first error from
// pd or from a synonym is returned unchanged.
//
// See https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
	isV0 := version == 0
	if isV0 {
		// Reset up front so a reused entry never keeps a stale source.
		r.Source = SourceUnknown
	}

	entryName, err := pd.getString()
	if err != nil {
		return err
	}
	r.Name = entryName

	entryValue, err := pd.getString()
	if err != nil {
		return err
	}
	r.Value = entryValue

	readOnly, err := pd.getBool()
	if err != nil {
		return err
	}
	r.ReadOnly = readOnly

	if isV0 {
		isDefault, err := pd.getBool()
		if err != nil {
			return err
		}
		r.Default = isDefault
		if isDefault {
			r.Source = SourceDefault
		}
	} else {
		rawSource, err := pd.getInt8()
		if err != nil {
			return err
		}
		r.Source = ConfigSource(rawSource)
		r.Default = r.Source == SourceDefault
	}

	isSensitive, err := pd.getBool()
	if err != nil {
		return err
	}
	r.Sensitive = isSensitive

	// Synonyms only exist on the wire for versions above 0.
	if version <= 0 {
		return nil
	}

	count, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	r.Synonyms = make([]*ConfigSynonym, count)
	for i := range r.Synonyms {
		syn := new(ConfigSynonym)
		if err = syn.decode(pd, version); err != nil {
			return err
		}
		r.Synonyms[i] = syn
	}
	return nil
}
// encode writes the synonym to pe as its name, its value and its source (as an
// int8), returning the first error reported by pe. The layout does not depend
// on version, which is accepted but not used.
func (c *ConfigSynonym) encode(pe packetEncoder, version int16) (err error) {
	if err = pe.putString(c.ConfigName); err != nil {
		return err
	}
	if err = pe.putString(c.ConfigValue); err != nil {
		return err
	}
	pe.putInt8(int8(c.Source))
	return nil
}
// decode fills the synonym from pd by reading its name, its value and its
// source (an int8), in that order. The layout does not depend on version,
// which is accepted but not used.
//
// Any read error is returned to the caller. Returning nil here would make a
// truncated or malformed synonym look like a successful decode and leave the
// enclosing entry holding a partially filled value.
func (c *ConfigSynonym) decode(pd packetDecoder, version int16) error {
	name, err := pd.getString()
	if err != nil {
		return err
	}
	c.ConfigName = name

	value, err := pd.getString()
	if err != nil {
		return err
	}
	c.ConfigValue = value

	source, err := pd.getInt8()
	if err != nil {
		return err
	}
	c.Source = ConfigSource(source)
	return nil
}