blob: dd01e4ef1fb487a8eb6977ead1e49d3ec7be7d2a [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import (
4 "crypto/tls"
5 "encoding/binary"
6 "fmt"
7 "io"
8 "net"
9 "sort"
10 "strconv"
11 "strings"
12 "sync"
13 "sync/atomic"
14 "time"
15
khenaidoo106c61a2021-08-11 18:05:46 -040016 "github.com/rcrowley/go-metrics"
William Kurkianea869482019-04-09 15:16:11 -040017)
18
19// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
20type Broker struct {
Abhilash S.L3b494632019-07-16 15:51:09 +053021 conf *Config
William Kurkianea869482019-04-09 15:16:11 -040022 rack *string
23
Abhilash S.L3b494632019-07-16 15:51:09 +053024 id int32
25 addr string
William Kurkianea869482019-04-09 15:16:11 -040026 correlationID int32
27 conn net.Conn
28 connErr error
29 lock sync.Mutex
30 opened int32
Abhilash S.L3b494632019-07-16 15:51:09 +053031 responses chan responsePromise
32 done chan bool
William Kurkianea869482019-04-09 15:16:11 -040033
Abhilash S.L3b494632019-07-16 15:51:09 +053034 registeredMetrics []string
William Kurkianea869482019-04-09 15:16:11 -040035
36 incomingByteRate metrics.Meter
37 requestRate metrics.Meter
38 requestSize metrics.Histogram
39 requestLatency metrics.Histogram
40 outgoingByteRate metrics.Meter
41 responseRate metrics.Meter
42 responseSize metrics.Histogram
khenaidoo106c61a2021-08-11 18:05:46 -040043 requestsInFlight metrics.Counter
William Kurkianea869482019-04-09 15:16:11 -040044 brokerIncomingByteRate metrics.Meter
45 brokerRequestRate metrics.Meter
46 brokerRequestSize metrics.Histogram
47 brokerRequestLatency metrics.Histogram
48 brokerOutgoingByteRate metrics.Meter
49 brokerResponseRate metrics.Meter
50 brokerResponseSize metrics.Histogram
khenaidoo106c61a2021-08-11 18:05:46 -040051 brokerRequestsInFlight metrics.Counter
Abhilash S.L3b494632019-07-16 15:51:09 +053052
53 kerberosAuthenticator GSSAPIKerberosAuth
William Kurkianea869482019-04-09 15:16:11 -040054}
55
// SASLMechanism names the SASL mechanism the client uses to authenticate
// with the broker.
type SASLMechanism string
58
const (
	// SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
	SASLTypeOAuth = "OAUTHBEARER"
	// SASLTypePlaintext represents the SASL/PLAIN mechanism
	SASLTypePlaintext = "PLAIN"
	// SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
	SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
	// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
	SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
	// SASLTypeGSSAPI represents the GSSAPI mechanism.
	SASLTypeGSSAPI = "GSSAPI"

	// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL auth using opaque packets.
	SASLHandshakeV0 = int16(0)
	// SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL by wrapping tokens with Kafka protocol headers.
	SASLHandshakeV1 = int16(1)

	// SASLExtKeyAuth is the reserved extension key name sent as part of the
	// SASL/OAUTHBEARER initial client response
	SASLExtKeyAuth = "auth"
)
79
// AccessToken bundles the access token used to authenticate a
// SASL/OAUTHBEARER client with its associated metadata.
type AccessToken struct {
	// Token is the access token payload.
	Token string
	// Extensions is an optional map of arbitrary key-value pairs that can be
	// sent with the SASL/OAUTHBEARER initial client response. These values are
	// ignored by the SASL server if they are unexpected. This feature is only
	// supported by Kafka >= 2.1.0.
	Extensions map[string]string
}
91
92// AccessTokenProvider is the interface that encapsulates how implementors
93// can generate access tokens for Kafka broker authentication.
94type AccessTokenProvider interface {
95 // Token returns an access token. The implementation should ensure token
96 // reuse so that multiple calls at connect time do not create multiple
97 // tokens. The implementation should also periodically refresh the token in
98 // order to guarantee that each call returns an unexpired token. This
99 // method should not block indefinitely--a timeout error should be returned
100 // after a short period of inactivity so that the broker connection logic
101 // can log debugging information and retry.
102 Token() (*AccessToken, error)
103}
104
// SCRAMClient is an interface to a SCRAM client implementation.
type SCRAMClient interface {
	// Begin prepares the client for the SCRAM exchange with the server,
	// using the given user name and password.
	Begin(userName, password, authzID string) error
	// Step advances the client through the SCRAM exchange. It is called
	// repeatedly until it returns an error or `Done` returns true.
	Step(challenge string) (response string, err error)
	// Done should return true once the SCRAM conversation is over.
	Done() bool
}
118
// responsePromise describes one in-flight request whose response is still
// expected. The field order is significant: send builds this struct with a
// positional literal.
type responsePromise struct {
	// requestTime is when the request was sent; used to compute latency.
	requestTime time.Time
	// correlationID is the ID the response header must carry.
	correlationID int32
	// headerVersion selects how the response header is sized and decoded.
	headerVersion int16
	// packets receives the raw response body on success.
	packets chan []byte
	// errors receives the failure when no response could be read.
	errors chan error
}
126
127// NewBroker creates and returns a Broker targeting the given host:port address.
128// This does not attempt to actually connect, you have to call Open() for that.
129func NewBroker(addr string) *Broker {
130 return &Broker{id: -1, addr: addr}
131}
132
133// Open tries to connect to the Broker if it is not already connected or connecting, but does not block
134// waiting for the connection to complete. This means that any subsequent operations on the broker will
135// block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
136// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
137// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
138func (b *Broker) Open(conf *Config) error {
139 if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
140 return ErrAlreadyConnected
141 }
142
143 if conf == nil {
144 conf = NewConfig()
145 }
146
147 err := conf.Validate()
148 if err != nil {
149 return err
150 }
151
152 b.lock.Lock()
153
154 go withRecover(func() {
155 defer b.lock.Unlock()
156
khenaidoo106c61a2021-08-11 18:05:46 -0400157 dialer := conf.getDialer()
158 b.conn, b.connErr = dialer.Dial("tcp", b.addr)
William Kurkianea869482019-04-09 15:16:11 -0400159 if b.connErr != nil {
160 Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
161 b.conn = nil
162 atomic.StoreInt32(&b.opened, 0)
163 return
164 }
khenaidoo106c61a2021-08-11 18:05:46 -0400165 if conf.Net.TLS.Enable {
166 b.conn = tls.Client(b.conn, validServerNameTLS(b.addr, conf.Net.TLS.Config))
167 }
William Kurkianea869482019-04-09 15:16:11 -0400168
khenaidoo106c61a2021-08-11 18:05:46 -0400169 b.conn = newBufConn(b.conn)
William Kurkianea869482019-04-09 15:16:11 -0400170 b.conf = conf
171
172 // Create or reuse the global metrics shared between brokers
173 b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
174 b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
175 b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
176 b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
177 b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
178 b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
179 b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
khenaidoo106c61a2021-08-11 18:05:46 -0400180 b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
William Kurkianea869482019-04-09 15:16:11 -0400181 // Do not gather metrics for seeded broker (only used during bootstrap) because they share
182 // the same id (-1) and are already exposed through the global metrics above
183 if b.id >= 0 {
Abhilash S.L3b494632019-07-16 15:51:09 +0530184 b.registerMetrics()
William Kurkianea869482019-04-09 15:16:11 -0400185 }
186
187 if conf.Net.SASL.Enable {
William Kurkianea869482019-04-09 15:16:11 -0400188 b.connErr = b.authenticateViaSASL()
189
190 if b.connErr != nil {
191 err = b.conn.Close()
192 if err == nil {
193 Logger.Printf("Closed connection to broker %s\n", b.addr)
194 } else {
195 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
196 }
197 b.conn = nil
198 atomic.StoreInt32(&b.opened, 0)
199 return
200 }
201 }
202
203 b.done = make(chan bool)
204 b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
205
206 if b.id >= 0 {
207 Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
208 } else {
209 Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
210 }
211 go withRecover(b.responseReceiver)
212 })
213
214 return nil
215}
216
217// Connected returns true if the broker is connected and false otherwise. If the broker is not
218// connected but it had tried to connect, the error from that connection attempt is also returned.
219func (b *Broker) Connected() (bool, error) {
220 b.lock.Lock()
221 defer b.lock.Unlock()
222
223 return b.conn != nil, b.connErr
224}
225
khenaidoo106c61a2021-08-11 18:05:46 -0400226// Close closes the broker resources
William Kurkianea869482019-04-09 15:16:11 -0400227func (b *Broker) Close() error {
228 b.lock.Lock()
229 defer b.lock.Unlock()
230
231 if b.conn == nil {
232 return ErrNotConnected
233 }
234
235 close(b.responses)
236 <-b.done
237
238 err := b.conn.Close()
239
240 b.conn = nil
241 b.connErr = nil
242 b.done = nil
243 b.responses = nil
244
Abhilash S.L3b494632019-07-16 15:51:09 +0530245 b.unregisterMetrics()
William Kurkianea869482019-04-09 15:16:11 -0400246
247 if err == nil {
248 Logger.Printf("Closed connection to broker %s\n", b.addr)
249 } else {
250 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
251 }
252
253 atomic.StoreInt32(&b.opened, 0)
254
255 return err
256}
257
258// ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
259func (b *Broker) ID() int32 {
260 return b.id
261}
262
263// Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
264func (b *Broker) Addr() string {
265 return b.addr
266}
267
268// Rack returns the broker's rack as retrieved from Kafka's metadata or the
269// empty string if it is not known. The returned value corresponds to the
270// broker's broker.rack configuration setting. Requires protocol version to be
271// at least v0.10.0.0.
272func (b *Broker) Rack() string {
273 if b.rack == nil {
274 return ""
275 }
276 return *b.rack
277}
278
khenaidoo106c61a2021-08-11 18:05:46 -0400279// GetMetadata send a metadata request and returns a metadata response or error
William Kurkianea869482019-04-09 15:16:11 -0400280func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
281 response := new(MetadataResponse)
282
283 err := b.sendAndReceive(request, response)
William Kurkianea869482019-04-09 15:16:11 -0400284 if err != nil {
285 return nil, err
286 }
287
288 return response, nil
289}
290
khenaidoo106c61a2021-08-11 18:05:46 -0400291// GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
William Kurkianea869482019-04-09 15:16:11 -0400292func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
293 response := new(ConsumerMetadataResponse)
294
295 err := b.sendAndReceive(request, response)
William Kurkianea869482019-04-09 15:16:11 -0400296 if err != nil {
297 return nil, err
298 }
299
300 return response, nil
301}
302
khenaidoo106c61a2021-08-11 18:05:46 -0400303// FindCoordinator sends a find coordinate request and returns a response or error
William Kurkianea869482019-04-09 15:16:11 -0400304func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
305 response := new(FindCoordinatorResponse)
306
307 err := b.sendAndReceive(request, response)
William Kurkianea869482019-04-09 15:16:11 -0400308 if err != nil {
309 return nil, err
310 }
311
312 return response, nil
313}
314
khenaidoo106c61a2021-08-11 18:05:46 -0400315// GetAvailableOffsets return an offset response or error
William Kurkianea869482019-04-09 15:16:11 -0400316func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
317 response := new(OffsetResponse)
318
319 err := b.sendAndReceive(request, response)
William Kurkianea869482019-04-09 15:16:11 -0400320 if err != nil {
321 return nil, err
322 }
323
324 return response, nil
325}
326
khenaidoo106c61a2021-08-11 18:05:46 -0400327// Produce returns a produce response or error
William Kurkianea869482019-04-09 15:16:11 -0400328func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
Abhilash S.L3b494632019-07-16 15:51:09 +0530329 var (
330 response *ProduceResponse
331 err error
332 )
William Kurkianea869482019-04-09 15:16:11 -0400333
334 if request.RequiredAcks == NoResponse {
335 err = b.sendAndReceive(request, nil)
336 } else {
337 response = new(ProduceResponse)
338 err = b.sendAndReceive(request, response)
339 }
340
341 if err != nil {
342 return nil, err
343 }
344
345 return response, nil
346}
347
khenaidoo106c61a2021-08-11 18:05:46 -0400348// Fetch returns a FetchResponse or error
William Kurkianea869482019-04-09 15:16:11 -0400349func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
350 response := new(FetchResponse)
351
352 err := b.sendAndReceive(request, response)
William Kurkianea869482019-04-09 15:16:11 -0400353 if err != nil {
354 return nil, err
355 }
356
357 return response, nil
358}
359
khenaidoo106c61a2021-08-11 18:05:46 -0400360// CommitOffset return an Offset commit response or error
William Kurkianea869482019-04-09 15:16:11 -0400361func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
362 response := new(OffsetCommitResponse)
363
364 err := b.sendAndReceive(request, response)
William Kurkianea869482019-04-09 15:16:11 -0400365 if err != nil {
366 return nil, err
367 }
368
369 return response, nil
370}
371
khenaidoo106c61a2021-08-11 18:05:46 -0400372// FetchOffset returns an offset fetch response or error
William Kurkianea869482019-04-09 15:16:11 -0400373func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
374 response := new(OffsetFetchResponse)
khenaidoo106c61a2021-08-11 18:05:46 -0400375 response.Version = request.Version // needed to handle the two header versions
William Kurkianea869482019-04-09 15:16:11 -0400376
377 err := b.sendAndReceive(request, response)
William Kurkianea869482019-04-09 15:16:11 -0400378 if err != nil {
379 return nil, err
380 }
381
382 return response, nil
383}
384
khenaidoo106c61a2021-08-11 18:05:46 -0400385// JoinGroup returns a join group response or error
William Kurkianea869482019-04-09 15:16:11 -0400386func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
387 response := new(JoinGroupResponse)
388
389 err := b.sendAndReceive(request, response)
390 if err != nil {
391 return nil, err
392 }
393
394 return response, nil
395}
396
khenaidoo106c61a2021-08-11 18:05:46 -0400397// SyncGroup returns a sync group response or error
William Kurkianea869482019-04-09 15:16:11 -0400398func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
399 response := new(SyncGroupResponse)
400
401 err := b.sendAndReceive(request, response)
402 if err != nil {
403 return nil, err
404 }
405
406 return response, nil
407}
408
khenaidoo106c61a2021-08-11 18:05:46 -0400409// LeaveGroup return a leave group response or error
William Kurkianea869482019-04-09 15:16:11 -0400410func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
411 response := new(LeaveGroupResponse)
412
413 err := b.sendAndReceive(request, response)
414 if err != nil {
415 return nil, err
416 }
417
418 return response, nil
419}
420
khenaidoo106c61a2021-08-11 18:05:46 -0400421// Heartbeat returns a heartbeat response or error
William Kurkianea869482019-04-09 15:16:11 -0400422func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
423 response := new(HeartbeatResponse)
424
425 err := b.sendAndReceive(request, response)
426 if err != nil {
427 return nil, err
428 }
429
430 return response, nil
431}
432
khenaidoo106c61a2021-08-11 18:05:46 -0400433// ListGroups return a list group response or error
William Kurkianea869482019-04-09 15:16:11 -0400434func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
435 response := new(ListGroupsResponse)
436
437 err := b.sendAndReceive(request, response)
438 if err != nil {
439 return nil, err
440 }
441
442 return response, nil
443}
444
khenaidoo106c61a2021-08-11 18:05:46 -0400445// DescribeGroups return describe group response or error
William Kurkianea869482019-04-09 15:16:11 -0400446func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
447 response := new(DescribeGroupsResponse)
448
449 err := b.sendAndReceive(request, response)
450 if err != nil {
451 return nil, err
452 }
453
454 return response, nil
455}
456
khenaidoo106c61a2021-08-11 18:05:46 -0400457// ApiVersions return api version response or error
William Kurkianea869482019-04-09 15:16:11 -0400458func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
459 response := new(ApiVersionsResponse)
460
461 err := b.sendAndReceive(request, response)
462 if err != nil {
463 return nil, err
464 }
465
466 return response, nil
467}
468
khenaidoo106c61a2021-08-11 18:05:46 -0400469// CreateTopics send a create topic request and returns create topic response
William Kurkianea869482019-04-09 15:16:11 -0400470func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
471 response := new(CreateTopicsResponse)
472
473 err := b.sendAndReceive(request, response)
474 if err != nil {
475 return nil, err
476 }
477
478 return response, nil
479}
480
khenaidoo106c61a2021-08-11 18:05:46 -0400481// DeleteTopics sends a delete topic request and returns delete topic response
William Kurkianea869482019-04-09 15:16:11 -0400482func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
483 response := new(DeleteTopicsResponse)
484
485 err := b.sendAndReceive(request, response)
486 if err != nil {
487 return nil, err
488 }
489
490 return response, nil
491}
492
khenaidoo106c61a2021-08-11 18:05:46 -0400493// CreatePartitions sends a create partition request and returns create
494// partitions response or error
William Kurkianea869482019-04-09 15:16:11 -0400495func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
496 response := new(CreatePartitionsResponse)
497
498 err := b.sendAndReceive(request, response)
499 if err != nil {
500 return nil, err
501 }
502
503 return response, nil
504}
505
khenaidoo106c61a2021-08-11 18:05:46 -0400506// AlterPartitionReassignments sends a alter partition reassignments request and
507// returns alter partition reassignments response
508func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error) {
509 response := new(AlterPartitionReassignmentsResponse)
510
511 err := b.sendAndReceive(request, response)
512 if err != nil {
513 return nil, err
514 }
515
516 return response, nil
517}
518
519// ListPartitionReassignments sends a list partition reassignments request and
520// returns list partition reassignments response
521func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error) {
522 response := new(ListPartitionReassignmentsResponse)
523
524 err := b.sendAndReceive(request, response)
525 if err != nil {
526 return nil, err
527 }
528
529 return response, nil
530}
531
532// DeleteRecords send a request to delete records and return delete record
533// response or error
William Kurkianea869482019-04-09 15:16:11 -0400534func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
535 response := new(DeleteRecordsResponse)
536
537 err := b.sendAndReceive(request, response)
538 if err != nil {
539 return nil, err
540 }
541
542 return response, nil
543}
544
khenaidoo106c61a2021-08-11 18:05:46 -0400545// DescribeAcls sends a describe acl request and returns a response or error
William Kurkianea869482019-04-09 15:16:11 -0400546func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
547 response := new(DescribeAclsResponse)
548
549 err := b.sendAndReceive(request, response)
550 if err != nil {
551 return nil, err
552 }
553
554 return response, nil
555}
556
khenaidoo106c61a2021-08-11 18:05:46 -0400557// CreateAcls sends a create acl request and returns a response or error
William Kurkianea869482019-04-09 15:16:11 -0400558func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
559 response := new(CreateAclsResponse)
560
561 err := b.sendAndReceive(request, response)
562 if err != nil {
563 return nil, err
564 }
565
566 return response, nil
567}
568
khenaidoo106c61a2021-08-11 18:05:46 -0400569// DeleteAcls sends a delete acl request and returns a response or error
William Kurkianea869482019-04-09 15:16:11 -0400570func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
571 response := new(DeleteAclsResponse)
572
573 err := b.sendAndReceive(request, response)
574 if err != nil {
575 return nil, err
576 }
577
578 return response, nil
579}
580
khenaidoo106c61a2021-08-11 18:05:46 -0400581// InitProducerID sends an init producer request and returns a response or error
William Kurkianea869482019-04-09 15:16:11 -0400582func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
583 response := new(InitProducerIDResponse)
584
585 err := b.sendAndReceive(request, response)
586 if err != nil {
587 return nil, err
588 }
589
590 return response, nil
591}
592
khenaidoo106c61a2021-08-11 18:05:46 -0400593// AddPartitionsToTxn send a request to add partition to txn and returns
594// a response or error
William Kurkianea869482019-04-09 15:16:11 -0400595func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
596 response := new(AddPartitionsToTxnResponse)
597
598 err := b.sendAndReceive(request, response)
599 if err != nil {
600 return nil, err
601 }
602
603 return response, nil
604}
605
khenaidoo106c61a2021-08-11 18:05:46 -0400606// AddOffsetsToTxn sends a request to add offsets to txn and returns a response
607// or error
William Kurkianea869482019-04-09 15:16:11 -0400608func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
609 response := new(AddOffsetsToTxnResponse)
610
611 err := b.sendAndReceive(request, response)
612 if err != nil {
613 return nil, err
614 }
615
616 return response, nil
617}
618
khenaidoo106c61a2021-08-11 18:05:46 -0400619// EndTxn sends a request to end txn and returns a response or error
William Kurkianea869482019-04-09 15:16:11 -0400620func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
621 response := new(EndTxnResponse)
622
623 err := b.sendAndReceive(request, response)
624 if err != nil {
625 return nil, err
626 }
627
628 return response, nil
629}
630
khenaidoo106c61a2021-08-11 18:05:46 -0400631// TxnOffsetCommit sends a request to commit transaction offsets and returns
632// a response or error
William Kurkianea869482019-04-09 15:16:11 -0400633func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
634 response := new(TxnOffsetCommitResponse)
635
636 err := b.sendAndReceive(request, response)
637 if err != nil {
638 return nil, err
639 }
640
641 return response, nil
642}
643
khenaidoo106c61a2021-08-11 18:05:46 -0400644// DescribeConfigs sends a request to describe config and returns a response or
645// error
William Kurkianea869482019-04-09 15:16:11 -0400646func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
647 response := new(DescribeConfigsResponse)
648
649 err := b.sendAndReceive(request, response)
650 if err != nil {
651 return nil, err
652 }
653
654 return response, nil
655}
656
khenaidoo106c61a2021-08-11 18:05:46 -0400657// AlterConfigs sends a request to alter config and return a response or error
William Kurkianea869482019-04-09 15:16:11 -0400658func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
659 response := new(AlterConfigsResponse)
660
661 err := b.sendAndReceive(request, response)
662 if err != nil {
663 return nil, err
664 }
665
666 return response, nil
667}
668
khenaidoo106c61a2021-08-11 18:05:46 -0400669// IncrementalAlterConfigs sends a request to incremental alter config and return a response or error
670func (b *Broker) IncrementalAlterConfigs(request *IncrementalAlterConfigsRequest) (*IncrementalAlterConfigsResponse, error) {
671 response := new(IncrementalAlterConfigsResponse)
672
673 err := b.sendAndReceive(request, response)
674 if err != nil {
675 return nil, err
676 }
677
678 return response, nil
679}
680
681// DeleteGroups sends a request to delete groups and returns a response or error
William Kurkianea869482019-04-09 15:16:11 -0400682func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
683 response := new(DeleteGroupsResponse)
684
685 if err := b.sendAndReceive(request, response); err != nil {
686 return nil, err
687 }
688
689 return response, nil
690}
691
khenaidoo106c61a2021-08-11 18:05:46 -0400692// DescribeLogDirs sends a request to get the broker's log dir paths and sizes
693func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) {
694 response := new(DescribeLogDirsResponse)
695
696 err := b.sendAndReceive(request, response)
697 if err != nil {
698 return nil, err
699 }
700
701 return response, nil
702}
703
704// DescribeUserScramCredentials sends a request to get SCRAM users
705func (b *Broker) DescribeUserScramCredentials(req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error) {
706 res := new(DescribeUserScramCredentialsResponse)
707
708 err := b.sendAndReceive(req, res)
709 if err != nil {
710 return nil, err
711 }
712
713 return res, err
714}
715
716func (b *Broker) AlterUserScramCredentials(req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error) {
717 res := new(AlterUserScramCredentialsResponse)
718
719 err := b.sendAndReceive(req, res)
720 if err != nil {
721 return nil, err
722 }
723
724 return res, nil
725}
726
727// readFull ensures the conn ReadDeadline has been setup before making a
728// call to io.ReadFull
729func (b *Broker) readFull(buf []byte) (n int, err error) {
730 if err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)); err != nil {
731 return 0, err
732 }
733
734 return io.ReadFull(b.conn, buf)
735}
736
737// write ensures the conn WriteDeadline has been setup before making a
738// call to conn.Write
739func (b *Broker) write(buf []byte) (n int, err error) {
740 if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
741 return 0, err
742 }
743
744 return b.conn.Write(buf)
745}
746
747func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
William Kurkianea869482019-04-09 15:16:11 -0400748 b.lock.Lock()
749 defer b.lock.Unlock()
750
751 if b.conn == nil {
752 if b.connErr != nil {
753 return nil, b.connErr
754 }
755 return nil, ErrNotConnected
756 }
757
758 if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
759 return nil, ErrUnsupportedVersion
760 }
761
762 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
763 buf, err := encode(req, b.conf.MetricRegistry)
764 if err != nil {
765 return nil, err
766 }
767
William Kurkianea869482019-04-09 15:16:11 -0400768 requestTime := time.Now()
khenaidoo106c61a2021-08-11 18:05:46 -0400769 // Will be decremented in responseReceiver (except error or request with NoResponse)
770 b.addRequestInFlightMetrics(1)
771 bytes, err := b.write(buf)
772 b.updateOutgoingCommunicationMetrics(bytes)
William Kurkianea869482019-04-09 15:16:11 -0400773 if err != nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400774 b.addRequestInFlightMetrics(-1)
William Kurkianea869482019-04-09 15:16:11 -0400775 return nil, err
776 }
777 b.correlationID++
778
779 if !promiseResponse {
780 // Record request latency without the response
khenaidoo106c61a2021-08-11 18:05:46 -0400781 b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
William Kurkianea869482019-04-09 15:16:11 -0400782 return nil, nil
783 }
784
khenaidoo106c61a2021-08-11 18:05:46 -0400785 promise := responsePromise{requestTime, req.correlationID, responseHeaderVersion, make(chan []byte), make(chan error)}
William Kurkianea869482019-04-09 15:16:11 -0400786 b.responses <- promise
787
788 return &promise, nil
789}
790
khenaidoo106c61a2021-08-11 18:05:46 -0400791func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
792 responseHeaderVersion := int16(-1)
793 if res != nil {
794 responseHeaderVersion = res.headerVersion()
795 }
796
797 promise, err := b.send(req, res != nil, responseHeaderVersion)
William Kurkianea869482019-04-09 15:16:11 -0400798 if err != nil {
799 return err
800 }
801
802 if promise == nil {
803 return nil
804 }
805
806 select {
807 case buf := <-promise.packets:
808 return versionedDecode(buf, res, req.version())
809 case err = <-promise.errors:
810 return err
811 }
812}
813
814func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
815 b.id, err = pd.getInt32()
816 if err != nil {
817 return err
818 }
819
820 host, err := pd.getString()
821 if err != nil {
822 return err
823 }
824
825 port, err := pd.getInt32()
826 if err != nil {
827 return err
828 }
829
830 if version >= 1 {
831 b.rack, err = pd.getNullableString()
832 if err != nil {
833 return err
834 }
835 }
836
837 b.addr = net.JoinHostPort(host, fmt.Sprint(port))
838 if _, _, err := net.SplitHostPort(b.addr); err != nil {
839 return err
840 }
841
842 return nil
843}
844
845func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
William Kurkianea869482019-04-09 15:16:11 -0400846 host, portstr, err := net.SplitHostPort(b.addr)
847 if err != nil {
848 return err
849 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530850
khenaidoo106c61a2021-08-11 18:05:46 -0400851 port, err := strconv.ParseInt(portstr, 10, 32)
William Kurkianea869482019-04-09 15:16:11 -0400852 if err != nil {
853 return err
854 }
855
856 pe.putInt32(b.id)
857
858 err = pe.putString(host)
859 if err != nil {
860 return err
861 }
862
863 pe.putInt32(int32(port))
864
865 if version >= 1 {
866 err = pe.putNullableString(b.rack)
867 if err != nil {
868 return err
869 }
870 }
871
872 return nil
873}
874
875func (b *Broker) responseReceiver() {
876 var dead error
Abhilash S.L3b494632019-07-16 15:51:09 +0530877
William Kurkianea869482019-04-09 15:16:11 -0400878 for response := range b.responses {
879 if dead != nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400880 // This was previously incremented in send() and
881 // we are not calling updateIncomingCommunicationMetrics()
882 b.addRequestInFlightMetrics(-1)
William Kurkianea869482019-04-09 15:16:11 -0400883 response.errors <- dead
884 continue
885 }
886
khenaidoo106c61a2021-08-11 18:05:46 -0400887 headerLength := getHeaderLength(response.headerVersion)
888 header := make([]byte, headerLength)
William Kurkianea869482019-04-09 15:16:11 -0400889
khenaidoo106c61a2021-08-11 18:05:46 -0400890 bytesReadHeader, err := b.readFull(header)
William Kurkianea869482019-04-09 15:16:11 -0400891 requestLatency := time.Since(response.requestTime)
892 if err != nil {
893 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
894 dead = err
895 response.errors <- err
896 continue
897 }
898
899 decodedHeader := responseHeader{}
khenaidoo106c61a2021-08-11 18:05:46 -0400900 err = versionedDecode(header, &decodedHeader, response.headerVersion)
William Kurkianea869482019-04-09 15:16:11 -0400901 if err != nil {
902 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
903 dead = err
904 response.errors <- err
905 continue
906 }
907 if decodedHeader.correlationID != response.correlationID {
908 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
909 // TODO if decoded ID < cur ID, discard until we catch up
910 // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
911 dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
912 response.errors <- dead
913 continue
914 }
915
khenaidoo106c61a2021-08-11 18:05:46 -0400916 buf := make([]byte, decodedHeader.length-int32(headerLength)+4)
917 bytesReadBody, err := b.readFull(buf)
William Kurkianea869482019-04-09 15:16:11 -0400918 b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
919 if err != nil {
920 dead = err
921 response.errors <- err
922 continue
923 }
924
925 response.packets <- buf
926 }
927 close(b.done)
928}
929
// getHeaderLength returns the number of response header bytes to read for the
// given header version: 8 for versions below 1, and 9 otherwise.
func getHeaderLength(headerVersion int16) int8 {
	if headerVersion >= 1 {
		// header contains additional tagged field length (0), we don't support actual tags yet.
		return 9
	}
	return 8
}
938
William Kurkianea869482019-04-09 15:16:11 -0400939func (b *Broker) authenticateViaSASL() error {
Abhilash S.L3b494632019-07-16 15:51:09 +0530940 switch b.conf.Net.SASL.Mechanism {
941 case SASLTypeOAuth:
William Kurkianea869482019-04-09 15:16:11 -0400942 return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
Abhilash S.L3b494632019-07-16 15:51:09 +0530943 case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
944 return b.sendAndReceiveSASLSCRAMv1()
945 case SASLTypeGSSAPI:
946 return b.sendAndReceiveKerberos()
947 default:
948 return b.sendAndReceiveSASLPlainAuth()
William Kurkianea869482019-04-09 15:16:11 -0400949 }
William Kurkianea869482019-04-09 15:16:11 -0400950}
951
Abhilash S.L3b494632019-07-16 15:51:09 +0530952func (b *Broker) sendAndReceiveKerberos() error {
953 b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
954 if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
955 b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
956 }
957 return b.kerberosAuthenticator.Authorize(b)
958}
959
960func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
961 rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
William Kurkianea869482019-04-09 15:16:11 -0400962
963 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
964 buf, err := encode(req, b.conf.MetricRegistry)
965 if err != nil {
966 return err
967 }
968
William Kurkianea869482019-04-09 15:16:11 -0400969 requestTime := time.Now()
khenaidoo106c61a2021-08-11 18:05:46 -0400970 // Will be decremented in updateIncomingCommunicationMetrics (except error)
971 b.addRequestInFlightMetrics(1)
972 bytes, err := b.write(buf)
William Kurkianea869482019-04-09 15:16:11 -0400973 b.updateOutgoingCommunicationMetrics(bytes)
974 if err != nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400975 b.addRequestInFlightMetrics(-1)
William Kurkianea869482019-04-09 15:16:11 -0400976 Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
977 return err
978 }
979 b.correlationID++
khenaidoo106c61a2021-08-11 18:05:46 -0400980
William Kurkianea869482019-04-09 15:16:11 -0400981 header := make([]byte, 8) // response header
khenaidoo106c61a2021-08-11 18:05:46 -0400982 _, err = b.readFull(header)
William Kurkianea869482019-04-09 15:16:11 -0400983 if err != nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400984 b.addRequestInFlightMetrics(-1)
William Kurkianea869482019-04-09 15:16:11 -0400985 Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
986 return err
987 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530988
William Kurkianea869482019-04-09 15:16:11 -0400989 length := binary.BigEndian.Uint32(header[:4])
990 payload := make([]byte, length-4)
khenaidoo106c61a2021-08-11 18:05:46 -0400991 n, err := b.readFull(payload)
William Kurkianea869482019-04-09 15:16:11 -0400992 if err != nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400993 b.addRequestInFlightMetrics(-1)
William Kurkianea869482019-04-09 15:16:11 -0400994 Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
995 return err
996 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530997
William Kurkianea869482019-04-09 15:16:11 -0400998 b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
999 res := &SaslHandshakeResponse{}
Abhilash S.L3b494632019-07-16 15:51:09 +05301000
William Kurkianea869482019-04-09 15:16:11 -04001001 err = versionedDecode(payload, res, 0)
1002 if err != nil {
1003 Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
1004 return err
1005 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301006
William Kurkianea869482019-04-09 15:16:11 -04001007 if res.Err != ErrNoError {
1008 Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
1009 return res.Err
1010 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301011
1012 Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
William Kurkianea869482019-04-09 15:16:11 -04001013 return nil
1014}
1015
Abhilash S.L3b494632019-07-16 15:51:09 +05301016// Kafka 0.10.x supported SASL PLAIN/Kerberos via KAFKA-3149 (KIP-43).
1017// Kafka 1.x.x onward added a SaslAuthenticate request/response message which
1018// wraps the SASL flow in the Kafka protocol, which allows for returning
1019// meaningful errors on authentication failure.
William Kurkianea869482019-04-09 15:16:11 -04001020//
1021// In SASL Plain, Kafka expects the auth header to be in the following format
1022// Message format (from https://tools.ietf.org/html/rfc4616):
1023//
1024// message = [authzid] UTF8NUL authcid UTF8NUL passwd
1025// authcid = 1*SAFE ; MUST accept up to 255 octets
1026// authzid = 1*SAFE ; MUST accept up to 255 octets
1027// passwd = 1*SAFE ; MUST accept up to 255 octets
1028// UTF8NUL = %x00 ; UTF-8 encoded NUL character
1029//
1030// SAFE = UTF1 / UTF2 / UTF3 / UTF4
1031// ;; any UTF-8 encoded Unicode character except NUL
1032//
Abhilash S.L3b494632019-07-16 15:51:09 +05301033// With SASL v0 handshake and auth then:
William Kurkianea869482019-04-09 15:16:11 -04001034// When credentials are valid, Kafka returns a 4 byte array of null characters.
Abhilash S.L3b494632019-07-16 15:51:09 +05301035// When credentials are invalid, Kafka closes the connection.
1036//
1037// With SASL v1 handshake and auth then:
1038// When credentials are invalid, Kafka replies with a SaslAuthenticate response
1039// containing an error code and message detailing the authentication failure.
William Kurkianea869482019-04-09 15:16:11 -04001040func (b *Broker) sendAndReceiveSASLPlainAuth() error {
khenaidoo106c61a2021-08-11 18:05:46 -04001041 // default to V0 to allow for backward compatibility when SASL is enabled
Abhilash S.L3b494632019-07-16 15:51:09 +05301042 // but not the handshake
William Kurkianea869482019-04-09 15:16:11 -04001043 if b.conf.Net.SASL.Handshake {
Abhilash S.L3b494632019-07-16 15:51:09 +05301044 handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
William Kurkianea869482019-04-09 15:16:11 -04001045 if handshakeErr != nil {
1046 Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
1047 return handshakeErr
1048 }
1049 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301050
1051 if b.conf.Net.SASL.Version == SASLHandshakeV1 {
1052 return b.sendAndReceiveV1SASLPlainAuth()
1053 }
1054 return b.sendAndReceiveV0SASLPlainAuth()
1055}
1056
1057// sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
1058func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
khenaidoo106c61a2021-08-11 18:05:46 -04001059 length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
1060 authBytes := make([]byte, length+4) // 4 byte length header + auth data
William Kurkianea869482019-04-09 15:16:11 -04001061 binary.BigEndian.PutUint32(authBytes, uint32(length))
khenaidoo106c61a2021-08-11 18:05:46 -04001062 copy(authBytes[4:], b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password)
William Kurkianea869482019-04-09 15:16:11 -04001063
1064 requestTime := time.Now()
khenaidoo106c61a2021-08-11 18:05:46 -04001065 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1066 b.addRequestInFlightMetrics(1)
1067 bytesWritten, err := b.write(authBytes)
William Kurkianea869482019-04-09 15:16:11 -04001068 b.updateOutgoingCommunicationMetrics(bytesWritten)
1069 if err != nil {
khenaidoo106c61a2021-08-11 18:05:46 -04001070 b.addRequestInFlightMetrics(-1)
William Kurkianea869482019-04-09 15:16:11 -04001071 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1072 return err
1073 }
1074
1075 header := make([]byte, 4)
khenaidoo106c61a2021-08-11 18:05:46 -04001076 n, err := b.readFull(header)
William Kurkianea869482019-04-09 15:16:11 -04001077 b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
1078 // If the credentials are valid, we would get a 4 byte response filled with null characters.
1079 // Otherwise, the broker closes the connection and we get an EOF
1080 if err != nil {
1081 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1082 return err
1083 }
1084
1085 Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
1086 return nil
1087}
1088
Abhilash S.L3b494632019-07-16 15:51:09 +05301089// sendAndReceiveV1SASLPlainAuth flows the v1 sasl authentication using the kafka protocol
1090func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {
1091 correlationID := b.correlationID
1092
1093 requestTime := time.Now()
1094
khenaidoo106c61a2021-08-11 18:05:46 -04001095 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1096 b.addRequestInFlightMetrics(1)
Abhilash S.L3b494632019-07-16 15:51:09 +05301097 bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
Abhilash S.L3b494632019-07-16 15:51:09 +05301098 b.updateOutgoingCommunicationMetrics(bytesWritten)
1099
1100 if err != nil {
khenaidoo106c61a2021-08-11 18:05:46 -04001101 b.addRequestInFlightMetrics(-1)
Abhilash S.L3b494632019-07-16 15:51:09 +05301102 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1103 return err
1104 }
1105
1106 b.correlationID++
1107
David Bainbridge788e5202019-10-21 18:49:40 +00001108 bytesRead, err := b.receiveSASLServerResponse(&SaslAuthenticateResponse{}, correlationID)
Abhilash S.L3b494632019-07-16 15:51:09 +05301109 b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
1110
1111 // With v1 sasl we get an error message set in the response we can return
1112 if err != nil {
1113 Logger.Printf("Error returned from broker during SASL flow %s: %s\n", b.addr, err.Error())
1114 return err
1115 }
1116
1117 return nil
1118}
1119
William Kurkianea869482019-04-09 15:16:11 -04001120// sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
1121// https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
1122func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
William Kurkianea869482019-04-09 15:16:11 -04001123 if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
1124 return err
1125 }
1126
1127 token, err := provider.Token()
William Kurkianea869482019-04-09 15:16:11 -04001128 if err != nil {
1129 return err
1130 }
1131
David Bainbridge788e5202019-10-21 18:49:40 +00001132 message, err := buildClientFirstMessage(token)
1133 if err != nil {
1134 return err
1135 }
1136
1137 challenged, err := b.sendClientMessage(message)
1138 if err != nil {
1139 return err
1140 }
1141
1142 if challenged {
1143 // Abort the token exchange. The broker returns the failure code.
1144 _, err = b.sendClientMessage([]byte(`\x01`))
1145 }
1146
1147 return err
1148}
1149
1150// sendClientMessage sends a SASL/OAUTHBEARER client message and returns true
1151// if the broker responds with a challenge, in which case the token is
1152// rejected.
1153func (b *Broker) sendClientMessage(message []byte) (bool, error) {
William Kurkianea869482019-04-09 15:16:11 -04001154 requestTime := time.Now()
khenaidoo106c61a2021-08-11 18:05:46 -04001155 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1156 b.addRequestInFlightMetrics(1)
William Kurkianea869482019-04-09 15:16:11 -04001157 correlationID := b.correlationID
1158
David Bainbridge788e5202019-10-21 18:49:40 +00001159 bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
khenaidoo106c61a2021-08-11 18:05:46 -04001160 b.updateOutgoingCommunicationMetrics(bytesWritten)
William Kurkianea869482019-04-09 15:16:11 -04001161 if err != nil {
khenaidoo106c61a2021-08-11 18:05:46 -04001162 b.addRequestInFlightMetrics(-1)
David Bainbridge788e5202019-10-21 18:49:40 +00001163 return false, err
William Kurkianea869482019-04-09 15:16:11 -04001164 }
1165
William Kurkianea869482019-04-09 15:16:11 -04001166 b.correlationID++
1167
David Bainbridge788e5202019-10-21 18:49:40 +00001168 res := &SaslAuthenticateResponse{}
1169 bytesRead, err := b.receiveSASLServerResponse(res, correlationID)
William Kurkianea869482019-04-09 15:16:11 -04001170
1171 requestLatency := time.Since(requestTime)
1172 b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
1173
David Bainbridge788e5202019-10-21 18:49:40 +00001174 isChallenge := len(res.SaslAuthBytes) > 0
1175
1176 if isChallenge && err != nil {
1177 Logger.Printf("Broker rejected authentication token: %s", res.SaslAuthBytes)
1178 }
1179
1180 return isChallenge, err
William Kurkianea869482019-04-09 15:16:11 -04001181}
1182
Abhilash S.L3b494632019-07-16 15:51:09 +05301183func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
1184 if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV1); err != nil {
1185 return err
1186 }
1187
1188 scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
1189 if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
1190 return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
1191 }
1192
1193 msg, err := scramClient.Step("")
1194 if err != nil {
1195 return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
Abhilash S.L3b494632019-07-16 15:51:09 +05301196 }
1197
1198 for !scramClient.Done() {
1199 requestTime := time.Now()
khenaidoo106c61a2021-08-11 18:05:46 -04001200 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1201 b.addRequestInFlightMetrics(1)
Abhilash S.L3b494632019-07-16 15:51:09 +05301202 correlationID := b.correlationID
1203 bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
khenaidoo106c61a2021-08-11 18:05:46 -04001204 b.updateOutgoingCommunicationMetrics(bytesWritten)
Abhilash S.L3b494632019-07-16 15:51:09 +05301205 if err != nil {
khenaidoo106c61a2021-08-11 18:05:46 -04001206 b.addRequestInFlightMetrics(-1)
Abhilash S.L3b494632019-07-16 15:51:09 +05301207 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1208 return err
1209 }
1210
Abhilash S.L3b494632019-07-16 15:51:09 +05301211 b.correlationID++
1212 challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
1213 if err != nil {
khenaidoo106c61a2021-08-11 18:05:46 -04001214 b.addRequestInFlightMetrics(-1)
Abhilash S.L3b494632019-07-16 15:51:09 +05301215 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1216 return err
1217 }
1218
1219 b.updateIncomingCommunicationMetrics(len(challenge), time.Since(requestTime))
1220 msg, err = scramClient.Step(string(challenge))
1221 if err != nil {
1222 Logger.Println("SASL authentication failed", err)
1223 return err
1224 }
1225 }
1226
1227 Logger.Println("SASL authentication succeeded")
1228 return nil
1229}
1230
1231func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (int, error) {
1232 rb := &SaslAuthenticateRequest{msg}
1233 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1234 buf, err := encode(req, b.conf.MetricRegistry)
1235 if err != nil {
1236 return 0, err
1237 }
1238
khenaidoo106c61a2021-08-11 18:05:46 -04001239 return b.write(buf)
Abhilash S.L3b494632019-07-16 15:51:09 +05301240}
1241
1242func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
1243 buf := make([]byte, responseLengthSize+correlationIDSize)
khenaidoo106c61a2021-08-11 18:05:46 -04001244 _, err := b.readFull(buf)
Abhilash S.L3b494632019-07-16 15:51:09 +05301245 if err != nil {
1246 return nil, err
1247 }
1248
1249 header := responseHeader{}
khenaidoo106c61a2021-08-11 18:05:46 -04001250 err = versionedDecode(buf, &header, 0)
Abhilash S.L3b494632019-07-16 15:51:09 +05301251 if err != nil {
1252 return nil, err
1253 }
1254
1255 if header.correlationID != correlationID {
1256 return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
1257 }
1258
1259 buf = make([]byte, header.length-correlationIDSize)
khenaidoo106c61a2021-08-11 18:05:46 -04001260 _, err = b.readFull(buf)
Abhilash S.L3b494632019-07-16 15:51:09 +05301261 if err != nil {
1262 return nil, err
1263 }
1264
1265 res := &SaslAuthenticateResponse{}
1266 if err := versionedDecode(buf, res, 0); err != nil {
1267 return nil, err
1268 }
1269 if res.Err != ErrNoError {
1270 return nil, res.Err
1271 }
1272 return res.SaslAuthBytes, nil
1273}
1274
William Kurkianea869482019-04-09 15:16:11 -04001275// Build SASL/OAUTHBEARER initial client response as described by RFC-7628
1276// https://tools.ietf.org/html/rfc7628
David Bainbridge788e5202019-10-21 18:49:40 +00001277func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
William Kurkianea869482019-04-09 15:16:11 -04001278 var ext string
1279
1280 if token.Extensions != nil && len(token.Extensions) > 0 {
1281 if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
Abhilash S.L3b494632019-07-16 15:51:09 +05301282 return []byte{}, fmt.Errorf("the extension `%s` is invalid", SASLExtKeyAuth)
William Kurkianea869482019-04-09 15:16:11 -04001283 }
1284 ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
1285 }
1286
1287 resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
1288
1289 return resp, nil
1290}
1291
// mapToString renders extensions as a single string of key-value pairs.
// Each pair is written as key + keyValSep + value; the rendered pairs are
// sorted lexicographically and joined with elemSep, so the output is
// deterministic regardless of map iteration order.
func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
	pairs := make([]string, len(extensions))

	next := 0
	for key, value := range extensions {
		pairs[next] = key + keyValSep + value
		next++
	}

	sort.Strings(pairs)
	return strings.Join(pairs, elemSep)
}
1305
Abhilash S.L3b494632019-07-16 15:51:09 +05301306func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
khenaidoo106c61a2021-08-11 18:05:46 -04001307 authBytes := []byte(b.conf.Net.SASL.AuthIdentity + "\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
Abhilash S.L3b494632019-07-16 15:51:09 +05301308 rb := &SaslAuthenticateRequest{authBytes}
1309 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1310 buf, err := encode(req, b.conf.MetricRegistry)
1311 if err != nil {
1312 return 0, err
1313 }
1314
khenaidoo106c61a2021-08-11 18:05:46 -04001315 return b.write(buf)
Abhilash S.L3b494632019-07-16 15:51:09 +05301316}
1317
David Bainbridge788e5202019-10-21 18:49:40 +00001318func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
William Kurkianea869482019-04-09 15:16:11 -04001319 rb := &SaslAuthenticateRequest{initialResp}
1320
1321 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1322
1323 buf, err := encode(req, b.conf.MetricRegistry)
William Kurkianea869482019-04-09 15:16:11 -04001324 if err != nil {
1325 return 0, err
1326 }
1327
khenaidoo106c61a2021-08-11 18:05:46 -04001328 return b.write(buf)
William Kurkianea869482019-04-09 15:16:11 -04001329}
1330
David Bainbridge788e5202019-10-21 18:49:40 +00001331func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
Abhilash S.L3b494632019-07-16 15:51:09 +05301332 buf := make([]byte, responseLengthSize+correlationIDSize)
khenaidoo106c61a2021-08-11 18:05:46 -04001333 bytesRead, err := b.readFull(buf)
William Kurkianea869482019-04-09 15:16:11 -04001334 if err != nil {
1335 return bytesRead, err
1336 }
1337
1338 header := responseHeader{}
khenaidoo106c61a2021-08-11 18:05:46 -04001339 err = versionedDecode(buf, &header, 0)
William Kurkianea869482019-04-09 15:16:11 -04001340 if err != nil {
1341 return bytesRead, err
1342 }
1343
1344 if header.correlationID != correlationID {
1345 return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
1346 }
1347
Abhilash S.L3b494632019-07-16 15:51:09 +05301348 buf = make([]byte, header.length-correlationIDSize)
khenaidoo106c61a2021-08-11 18:05:46 -04001349 c, err := b.readFull(buf)
William Kurkianea869482019-04-09 15:16:11 -04001350 bytesRead += c
William Kurkianea869482019-04-09 15:16:11 -04001351 if err != nil {
1352 return bytesRead, err
1353 }
1354
William Kurkianea869482019-04-09 15:16:11 -04001355 if err := versionedDecode(buf, res, 0); err != nil {
1356 return bytesRead, err
1357 }
1358
William Kurkianea869482019-04-09 15:16:11 -04001359 if res.Err != ErrNoError {
1360 return bytesRead, res.Err
1361 }
1362
William Kurkianea869482019-04-09 15:16:11 -04001363 return bytesRead, nil
1364}
1365
1366func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
khenaidoo106c61a2021-08-11 18:05:46 -04001367 b.updateRequestLatencyAndInFlightMetrics(requestLatency)
William Kurkianea869482019-04-09 15:16:11 -04001368 b.responseRate.Mark(1)
Abhilash S.L3b494632019-07-16 15:51:09 +05301369
William Kurkianea869482019-04-09 15:16:11 -04001370 if b.brokerResponseRate != nil {
1371 b.brokerResponseRate.Mark(1)
1372 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301373
William Kurkianea869482019-04-09 15:16:11 -04001374 responseSize := int64(bytes)
1375 b.incomingByteRate.Mark(responseSize)
1376 if b.brokerIncomingByteRate != nil {
1377 b.brokerIncomingByteRate.Mark(responseSize)
1378 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301379
William Kurkianea869482019-04-09 15:16:11 -04001380 b.responseSize.Update(responseSize)
1381 if b.brokerResponseSize != nil {
1382 b.brokerResponseSize.Update(responseSize)
1383 }
1384}
1385
khenaidoo106c61a2021-08-11 18:05:46 -04001386func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
William Kurkianea869482019-04-09 15:16:11 -04001387 requestLatencyInMs := int64(requestLatency / time.Millisecond)
1388 b.requestLatency.Update(requestLatencyInMs)
Abhilash S.L3b494632019-07-16 15:51:09 +05301389
William Kurkianea869482019-04-09 15:16:11 -04001390 if b.brokerRequestLatency != nil {
1391 b.brokerRequestLatency.Update(requestLatencyInMs)
1392 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301393
khenaidoo106c61a2021-08-11 18:05:46 -04001394 b.addRequestInFlightMetrics(-1)
1395}
1396
1397func (b *Broker) addRequestInFlightMetrics(i int64) {
1398 b.requestsInFlight.Inc(i)
1399 if b.brokerRequestsInFlight != nil {
1400 b.brokerRequestsInFlight.Inc(i)
1401 }
William Kurkianea869482019-04-09 15:16:11 -04001402}
1403
1404func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
1405 b.requestRate.Mark(1)
1406 if b.brokerRequestRate != nil {
1407 b.brokerRequestRate.Mark(1)
1408 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301409
William Kurkianea869482019-04-09 15:16:11 -04001410 requestSize := int64(bytes)
1411 b.outgoingByteRate.Mark(requestSize)
1412 if b.brokerOutgoingByteRate != nil {
1413 b.brokerOutgoingByteRate.Mark(requestSize)
1414 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301415
William Kurkianea869482019-04-09 15:16:11 -04001416 b.requestSize.Update(requestSize)
1417 if b.brokerRequestSize != nil {
1418 b.brokerRequestSize.Update(requestSize)
1419 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301420}
1421
1422func (b *Broker) registerMetrics() {
1423 b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
1424 b.brokerRequestRate = b.registerMeter("request-rate")
1425 b.brokerRequestSize = b.registerHistogram("request-size")
1426 b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
1427 b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
1428 b.brokerResponseRate = b.registerMeter("response-rate")
1429 b.brokerResponseSize = b.registerHistogram("response-size")
khenaidoo106c61a2021-08-11 18:05:46 -04001430 b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
Abhilash S.L3b494632019-07-16 15:51:09 +05301431}
1432
1433func (b *Broker) unregisterMetrics() {
1434 for _, name := range b.registeredMetrics {
1435 b.conf.MetricRegistry.Unregister(name)
1436 }
khenaidoo106c61a2021-08-11 18:05:46 -04001437 b.registeredMetrics = nil
Abhilash S.L3b494632019-07-16 15:51:09 +05301438}
1439
1440func (b *Broker) registerMeter(name string) metrics.Meter {
1441 nameForBroker := getMetricNameForBroker(name, b)
1442 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1443 return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
1444}
1445
1446func (b *Broker) registerHistogram(name string) metrics.Histogram {
1447 nameForBroker := getMetricNameForBroker(name, b)
1448 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1449 return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
William Kurkianea869482019-04-09 15:16:11 -04001450}
khenaidoo106c61a2021-08-11 18:05:46 -04001451
1452func (b *Broker) registerCounter(name string) metrics.Counter {
1453 nameForBroker := getMetricNameForBroker(name, b)
1454 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1455 return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
1456}
1457
1458func validServerNameTLS(addr string, cfg *tls.Config) *tls.Config {
1459 if cfg == nil {
1460 cfg = &tls.Config{
1461 MinVersion: tls.VersionTLS12,
1462 }
1463 }
1464 if cfg.ServerName != "" {
1465 return cfg
1466 }
1467
1468 c := cfg.Clone()
1469 sn, _, err := net.SplitHostPort(addr)
1470 if err != nil {
1471 Logger.Println(fmt.Errorf("failed to get ServerName from addr %w", err))
1472 }
1473 c.ServerName = sn
1474 return c
1475}