blob: 83a648ad4aec3a1109372b65dfac20d6605c05a4 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "time"
5)
6
7var NoNode = &Broker{id: -1, addr: ":-1"}
8
9type FindCoordinatorResponse struct {
10 Version int16
11 ThrottleTime time.Duration
12 Err KError
13 ErrMsg *string
14 Coordinator *Broker
15}
16
17func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err error) {
18 if version >= 1 {
19 f.Version = version
20
21 throttleTime, err := pd.getInt32()
22 if err != nil {
23 return err
24 }
25 f.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
26 }
27
28 tmp, err := pd.getInt16()
29 if err != nil {
30 return err
31 }
32 f.Err = KError(tmp)
33
34 if version >= 1 {
35 if f.ErrMsg, err = pd.getNullableString(); err != nil {
36 return err
37 }
38 }
39
40 coordinator := new(Broker)
41 // The version is hardcoded to 0, as version 1 of the Broker-decode
42 // contains the rack-field which is not present in the FindCoordinatorResponse.
43 if err := coordinator.decode(pd, 0); err != nil {
44 return err
45 }
46 if coordinator.addr == ":0" {
47 return nil
48 }
49 f.Coordinator = coordinator
50
51 return nil
52}
53
54func (f *FindCoordinatorResponse) encode(pe packetEncoder) error {
55 if f.Version >= 1 {
56 pe.putInt32(int32(f.ThrottleTime / time.Millisecond))
57 }
58
59 pe.putInt16(int16(f.Err))
60
61 if f.Version >= 1 {
62 if err := pe.putNullableString(f.ErrMsg); err != nil {
63 return err
64 }
65 }
66
67 coordinator := f.Coordinator
68 if coordinator == nil {
69 coordinator = NoNode
70 }
71 if err := coordinator.encode(pe, 0); err != nil {
72 return err
73 }
74 return nil
75}
76
77func (f *FindCoordinatorResponse) key() int16 {
78 return 10
79}
80
81func (f *FindCoordinatorResponse) version() int16 {
82 return f.Version
83}
84
khenaidood948f772021-08-11 17:49:24 -040085func (r *FindCoordinatorResponse) headerVersion() int16 {
86 return 0
87}
88
khenaidooac637102019-01-14 15:44:34 -050089func (f *FindCoordinatorResponse) requiredVersion() KafkaVersion {
90 switch f.Version {
91 case 1:
92 return V0_11_0_0
93 default:
94 return V0_8_2_0
95 }
96}