blob: dd01e4ef1fb487a8eb6977ead1e49d3ec7be7d2a [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001package sarama
2
3import (
4 "crypto/tls"
5 "encoding/binary"
6 "fmt"
7 "io"
8 "net"
9 "sort"
10 "strconv"
11 "strings"
12 "sync"
13 "sync/atomic"
14 "time"
15
khenaidoo7d3c5582021-08-11 18:09:44 -040016 "github.com/rcrowley/go-metrics"
Holger Hildebrandtfa074992020-03-27 15:42:06 +000017)
18
19// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
20type Broker struct {
21 conf *Config
22 rack *string
23
24 id int32
25 addr string
26 correlationID int32
27 conn net.Conn
28 connErr error
29 lock sync.Mutex
30 opened int32
31 responses chan responsePromise
32 done chan bool
33
34 registeredMetrics []string
35
36 incomingByteRate metrics.Meter
37 requestRate metrics.Meter
38 requestSize metrics.Histogram
39 requestLatency metrics.Histogram
40 outgoingByteRate metrics.Meter
41 responseRate metrics.Meter
42 responseSize metrics.Histogram
khenaidoo7d3c5582021-08-11 18:09:44 -040043 requestsInFlight metrics.Counter
Holger Hildebrandtfa074992020-03-27 15:42:06 +000044 brokerIncomingByteRate metrics.Meter
45 brokerRequestRate metrics.Meter
46 brokerRequestSize metrics.Histogram
47 brokerRequestLatency metrics.Histogram
48 brokerOutgoingByteRate metrics.Meter
49 brokerResponseRate metrics.Meter
50 brokerResponseSize metrics.Histogram
khenaidoo7d3c5582021-08-11 18:09:44 -040051 brokerRequestsInFlight metrics.Counter
Holger Hildebrandtfa074992020-03-27 15:42:06 +000052
53 kerberosAuthenticator GSSAPIKerberosAuth
54}
55
// SASLMechanism names the SASL mechanism the client uses when it
// authenticates with the broker.
type SASLMechanism string
58
// Names of the SASL mechanisms understood by the broker authentication code.
const (
	// SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
	SASLTypeOAuth = "OAUTHBEARER"
	// SASLTypePlaintext represents the SASL/PLAIN mechanism
	SASLTypePlaintext = "PLAIN"
	// SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
	SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
	// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
	SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
	// SASLTypeGSSAPI represents the GSSAPI mechanism, which
	// authenticateViaSASL hands to the Kerberos authenticator.
	SASLTypeGSSAPI = "GSSAPI"
)

// Versions of the Kafka SASL handshake protocol.
const (
	// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL auth using opaque packets.
	SASLHandshakeV0 = int16(0)
	// SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL by wrapping tokens with Kafka protocol headers.
	SASLHandshakeV1 = int16(1)
)

// SASLExtKeyAuth is the reserved extension key name sent as part of the
// SASL/OAUTHBEARER initial client response
const SASLExtKeyAuth = "auth"
79
// AccessToken bundles the access token used to authenticate a
// SASL/OAUTHBEARER client with its associated metadata.
type AccessToken struct {
	// Token is the access token payload.
	Token string

	// Extensions is an optional map of arbitrary key-value pairs that can be
	// sent with the SASL/OAUTHBEARER initial client response. The SASL server
	// ignores values it does not expect. This feature is only supported by
	// Kafka >= 2.1.0.
	Extensions map[string]string
}
91
92// AccessTokenProvider is the interface that encapsulates how implementors
93// can generate access tokens for Kafka broker authentication.
94type AccessTokenProvider interface {
95 // Token returns an access token. The implementation should ensure token
96 // reuse so that multiple calls at connect time do not create multiple
97 // tokens. The implementation should also periodically refresh the token in
98 // order to guarantee that each call returns an unexpired token. This
99 // method should not block indefinitely--a timeout error should be returned
100 // after a short period of inactivity so that the broker connection logic
101 // can log debugging information and retry.
102 Token() (*AccessToken, error)
103}
104
// SCRAMClient is the interface a SCRAM client implementation must satisfy.
type SCRAMClient interface {
	// Begin prepares the client for the SCRAM exchange with the server,
	// using the given user name and password.
	Begin(userName, password, authzID string) error

	// Step advances the client through the SCRAM exchange. It is called
	// repeatedly until it returns an error or `Done` reports true.
	Step(challenge string) (response string, err error)

	// Done should return true once the SCRAM conversation is over.
	Done() bool
}
118
// responsePromise describes one request that is waiting for its response.
// send creates it and queues it on Broker.responses; responseReceiver
// fulfils it through exactly one of the two channels.
//
// The field order is significant: send builds the value positionally.
type responsePromise struct {
	// requestTime is taken just before the request is written.
	requestTime time.Time
	// correlationID is the ID the matching response header must carry.
	correlationID int32
	// headerVersion selects the response header layout to read and decode.
	headerVersion int16
	// packets receives the raw response body on success.
	packets chan []byte
	// errors receives the failure when the response could not be read.
	errors chan error
}
126
127// NewBroker creates and returns a Broker targeting the given host:port address.
128// This does not attempt to actually connect, you have to call Open() for that.
129func NewBroker(addr string) *Broker {
130 return &Broker{id: -1, addr: addr}
131}
132
133// Open tries to connect to the Broker if it is not already connected or connecting, but does not block
134// waiting for the connection to complete. This means that any subsequent operations on the broker will
135// block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
136// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
137// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
138func (b *Broker) Open(conf *Config) error {
139 if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
140 return ErrAlreadyConnected
141 }
142
143 if conf == nil {
144 conf = NewConfig()
145 }
146
147 err := conf.Validate()
148 if err != nil {
149 return err
150 }
151
152 b.lock.Lock()
153
154 go withRecover(func() {
155 defer b.lock.Unlock()
156
khenaidoo7d3c5582021-08-11 18:09:44 -0400157 dialer := conf.getDialer()
158 b.conn, b.connErr = dialer.Dial("tcp", b.addr)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000159 if b.connErr != nil {
160 Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
161 b.conn = nil
162 atomic.StoreInt32(&b.opened, 0)
163 return
164 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400165 if conf.Net.TLS.Enable {
166 b.conn = tls.Client(b.conn, validServerNameTLS(b.addr, conf.Net.TLS.Config))
167 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000168
khenaidoo7d3c5582021-08-11 18:09:44 -0400169 b.conn = newBufConn(b.conn)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000170 b.conf = conf
171
172 // Create or reuse the global metrics shared between brokers
173 b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
174 b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
175 b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
176 b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
177 b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
178 b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
179 b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
khenaidoo7d3c5582021-08-11 18:09:44 -0400180 b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000181 // Do not gather metrics for seeded broker (only used during bootstrap) because they share
182 // the same id (-1) and are already exposed through the global metrics above
183 if b.id >= 0 {
184 b.registerMetrics()
185 }
186
187 if conf.Net.SASL.Enable {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000188 b.connErr = b.authenticateViaSASL()
189
190 if b.connErr != nil {
191 err = b.conn.Close()
192 if err == nil {
193 Logger.Printf("Closed connection to broker %s\n", b.addr)
194 } else {
195 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
196 }
197 b.conn = nil
198 atomic.StoreInt32(&b.opened, 0)
199 return
200 }
201 }
202
203 b.done = make(chan bool)
204 b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
205
206 if b.id >= 0 {
207 Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
208 } else {
209 Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
210 }
211 go withRecover(b.responseReceiver)
212 })
213
214 return nil
215}
216
217// Connected returns true if the broker is connected and false otherwise. If the broker is not
218// connected but it had tried to connect, the error from that connection attempt is also returned.
219func (b *Broker) Connected() (bool, error) {
220 b.lock.Lock()
221 defer b.lock.Unlock()
222
223 return b.conn != nil, b.connErr
224}
225
khenaidoo7d3c5582021-08-11 18:09:44 -0400226// Close closes the broker resources
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000227func (b *Broker) Close() error {
228 b.lock.Lock()
229 defer b.lock.Unlock()
230
231 if b.conn == nil {
232 return ErrNotConnected
233 }
234
235 close(b.responses)
236 <-b.done
237
238 err := b.conn.Close()
239
240 b.conn = nil
241 b.connErr = nil
242 b.done = nil
243 b.responses = nil
244
245 b.unregisterMetrics()
246
247 if err == nil {
248 Logger.Printf("Closed connection to broker %s\n", b.addr)
249 } else {
250 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
251 }
252
253 atomic.StoreInt32(&b.opened, 0)
254
255 return err
256}
257
258// ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
259func (b *Broker) ID() int32 {
260 return b.id
261}
262
263// Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
264func (b *Broker) Addr() string {
265 return b.addr
266}
267
268// Rack returns the broker's rack as retrieved from Kafka's metadata or the
269// empty string if it is not known. The returned value corresponds to the
270// broker's broker.rack configuration setting. Requires protocol version to be
271// at least v0.10.0.0.
272func (b *Broker) Rack() string {
273 if b.rack == nil {
274 return ""
275 }
276 return *b.rack
277}
278
khenaidoo7d3c5582021-08-11 18:09:44 -0400279// GetMetadata send a metadata request and returns a metadata response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000280func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
281 response := new(MetadataResponse)
282
283 err := b.sendAndReceive(request, response)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000284 if err != nil {
285 return nil, err
286 }
287
288 return response, nil
289}
290
khenaidoo7d3c5582021-08-11 18:09:44 -0400291// GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000292func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
293 response := new(ConsumerMetadataResponse)
294
295 err := b.sendAndReceive(request, response)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000296 if err != nil {
297 return nil, err
298 }
299
300 return response, nil
301}
302
khenaidoo7d3c5582021-08-11 18:09:44 -0400303// FindCoordinator sends a find coordinate request and returns a response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000304func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
305 response := new(FindCoordinatorResponse)
306
307 err := b.sendAndReceive(request, response)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000308 if err != nil {
309 return nil, err
310 }
311
312 return response, nil
313}
314
khenaidoo7d3c5582021-08-11 18:09:44 -0400315// GetAvailableOffsets return an offset response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000316func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
317 response := new(OffsetResponse)
318
319 err := b.sendAndReceive(request, response)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000320 if err != nil {
321 return nil, err
322 }
323
324 return response, nil
325}
326
khenaidoo7d3c5582021-08-11 18:09:44 -0400327// Produce returns a produce response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000328func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
329 var (
330 response *ProduceResponse
331 err error
332 )
333
334 if request.RequiredAcks == NoResponse {
335 err = b.sendAndReceive(request, nil)
336 } else {
337 response = new(ProduceResponse)
338 err = b.sendAndReceive(request, response)
339 }
340
341 if err != nil {
342 return nil, err
343 }
344
345 return response, nil
346}
347
khenaidoo7d3c5582021-08-11 18:09:44 -0400348// Fetch returns a FetchResponse or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000349func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
350 response := new(FetchResponse)
351
352 err := b.sendAndReceive(request, response)
353 if err != nil {
354 return nil, err
355 }
356
357 return response, nil
358}
359
khenaidoo7d3c5582021-08-11 18:09:44 -0400360// CommitOffset return an Offset commit response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000361func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
362 response := new(OffsetCommitResponse)
363
364 err := b.sendAndReceive(request, response)
365 if err != nil {
366 return nil, err
367 }
368
369 return response, nil
370}
371
khenaidoo7d3c5582021-08-11 18:09:44 -0400372// FetchOffset returns an offset fetch response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000373func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
374 response := new(OffsetFetchResponse)
khenaidoo7d3c5582021-08-11 18:09:44 -0400375 response.Version = request.Version // needed to handle the two header versions
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000376
377 err := b.sendAndReceive(request, response)
378 if err != nil {
379 return nil, err
380 }
381
382 return response, nil
383}
384
khenaidoo7d3c5582021-08-11 18:09:44 -0400385// JoinGroup returns a join group response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000386func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
387 response := new(JoinGroupResponse)
388
389 err := b.sendAndReceive(request, response)
390 if err != nil {
391 return nil, err
392 }
393
394 return response, nil
395}
396
khenaidoo7d3c5582021-08-11 18:09:44 -0400397// SyncGroup returns a sync group response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000398func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
399 response := new(SyncGroupResponse)
400
401 err := b.sendAndReceive(request, response)
402 if err != nil {
403 return nil, err
404 }
405
406 return response, nil
407}
408
khenaidoo7d3c5582021-08-11 18:09:44 -0400409// LeaveGroup return a leave group response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000410func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
411 response := new(LeaveGroupResponse)
412
413 err := b.sendAndReceive(request, response)
414 if err != nil {
415 return nil, err
416 }
417
418 return response, nil
419}
420
khenaidoo7d3c5582021-08-11 18:09:44 -0400421// Heartbeat returns a heartbeat response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000422func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
423 response := new(HeartbeatResponse)
424
425 err := b.sendAndReceive(request, response)
426 if err != nil {
427 return nil, err
428 }
429
430 return response, nil
431}
432
khenaidoo7d3c5582021-08-11 18:09:44 -0400433// ListGroups return a list group response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000434func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
435 response := new(ListGroupsResponse)
436
437 err := b.sendAndReceive(request, response)
438 if err != nil {
439 return nil, err
440 }
441
442 return response, nil
443}
444
khenaidoo7d3c5582021-08-11 18:09:44 -0400445// DescribeGroups return describe group response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000446func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
447 response := new(DescribeGroupsResponse)
448
449 err := b.sendAndReceive(request, response)
450 if err != nil {
451 return nil, err
452 }
453
454 return response, nil
455}
456
khenaidoo7d3c5582021-08-11 18:09:44 -0400457// ApiVersions return api version response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000458func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
459 response := new(ApiVersionsResponse)
460
461 err := b.sendAndReceive(request, response)
462 if err != nil {
463 return nil, err
464 }
465
466 return response, nil
467}
468
khenaidoo7d3c5582021-08-11 18:09:44 -0400469// CreateTopics send a create topic request and returns create topic response
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000470func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
471 response := new(CreateTopicsResponse)
472
473 err := b.sendAndReceive(request, response)
474 if err != nil {
475 return nil, err
476 }
477
478 return response, nil
479}
480
khenaidoo7d3c5582021-08-11 18:09:44 -0400481// DeleteTopics sends a delete topic request and returns delete topic response
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000482func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
483 response := new(DeleteTopicsResponse)
484
485 err := b.sendAndReceive(request, response)
486 if err != nil {
487 return nil, err
488 }
489
490 return response, nil
491}
492
khenaidoo7d3c5582021-08-11 18:09:44 -0400493// CreatePartitions sends a create partition request and returns create
494// partitions response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000495func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
496 response := new(CreatePartitionsResponse)
497
498 err := b.sendAndReceive(request, response)
499 if err != nil {
500 return nil, err
501 }
502
503 return response, nil
504}
505
khenaidoo7d3c5582021-08-11 18:09:44 -0400506// AlterPartitionReassignments sends a alter partition reassignments request and
507// returns alter partition reassignments response
508func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error) {
509 response := new(AlterPartitionReassignmentsResponse)
510
511 err := b.sendAndReceive(request, response)
512 if err != nil {
513 return nil, err
514 }
515
516 return response, nil
517}
518
519// ListPartitionReassignments sends a list partition reassignments request and
520// returns list partition reassignments response
521func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error) {
522 response := new(ListPartitionReassignmentsResponse)
523
524 err := b.sendAndReceive(request, response)
525 if err != nil {
526 return nil, err
527 }
528
529 return response, nil
530}
531
532// DeleteRecords send a request to delete records and return delete record
533// response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000534func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
535 response := new(DeleteRecordsResponse)
536
537 err := b.sendAndReceive(request, response)
538 if err != nil {
539 return nil, err
540 }
541
542 return response, nil
543}
544
khenaidoo7d3c5582021-08-11 18:09:44 -0400545// DescribeAcls sends a describe acl request and returns a response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000546func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
547 response := new(DescribeAclsResponse)
548
549 err := b.sendAndReceive(request, response)
550 if err != nil {
551 return nil, err
552 }
553
554 return response, nil
555}
556
khenaidoo7d3c5582021-08-11 18:09:44 -0400557// CreateAcls sends a create acl request and returns a response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000558func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
559 response := new(CreateAclsResponse)
560
561 err := b.sendAndReceive(request, response)
562 if err != nil {
563 return nil, err
564 }
565
566 return response, nil
567}
568
khenaidoo7d3c5582021-08-11 18:09:44 -0400569// DeleteAcls sends a delete acl request and returns a response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000570func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
571 response := new(DeleteAclsResponse)
572
573 err := b.sendAndReceive(request, response)
574 if err != nil {
575 return nil, err
576 }
577
578 return response, nil
579}
580
khenaidoo7d3c5582021-08-11 18:09:44 -0400581// InitProducerID sends an init producer request and returns a response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000582func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
583 response := new(InitProducerIDResponse)
584
585 err := b.sendAndReceive(request, response)
586 if err != nil {
587 return nil, err
588 }
589
590 return response, nil
591}
592
khenaidoo7d3c5582021-08-11 18:09:44 -0400593// AddPartitionsToTxn send a request to add partition to txn and returns
594// a response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000595func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
596 response := new(AddPartitionsToTxnResponse)
597
598 err := b.sendAndReceive(request, response)
599 if err != nil {
600 return nil, err
601 }
602
603 return response, nil
604}
605
khenaidoo7d3c5582021-08-11 18:09:44 -0400606// AddOffsetsToTxn sends a request to add offsets to txn and returns a response
607// or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000608func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
609 response := new(AddOffsetsToTxnResponse)
610
611 err := b.sendAndReceive(request, response)
612 if err != nil {
613 return nil, err
614 }
615
616 return response, nil
617}
618
khenaidoo7d3c5582021-08-11 18:09:44 -0400619// EndTxn sends a request to end txn and returns a response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000620func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
621 response := new(EndTxnResponse)
622
623 err := b.sendAndReceive(request, response)
624 if err != nil {
625 return nil, err
626 }
627
628 return response, nil
629}
630
khenaidoo7d3c5582021-08-11 18:09:44 -0400631// TxnOffsetCommit sends a request to commit transaction offsets and returns
632// a response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000633func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
634 response := new(TxnOffsetCommitResponse)
635
636 err := b.sendAndReceive(request, response)
637 if err != nil {
638 return nil, err
639 }
640
641 return response, nil
642}
643
khenaidoo7d3c5582021-08-11 18:09:44 -0400644// DescribeConfigs sends a request to describe config and returns a response or
645// error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000646func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
647 response := new(DescribeConfigsResponse)
648
649 err := b.sendAndReceive(request, response)
650 if err != nil {
651 return nil, err
652 }
653
654 return response, nil
655}
656
khenaidoo7d3c5582021-08-11 18:09:44 -0400657// AlterConfigs sends a request to alter config and return a response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000658func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
659 response := new(AlterConfigsResponse)
660
661 err := b.sendAndReceive(request, response)
662 if err != nil {
663 return nil, err
664 }
665
666 return response, nil
667}
668
khenaidoo7d3c5582021-08-11 18:09:44 -0400669// IncrementalAlterConfigs sends a request to incremental alter config and return a response or error
670func (b *Broker) IncrementalAlterConfigs(request *IncrementalAlterConfigsRequest) (*IncrementalAlterConfigsResponse, error) {
671 response := new(IncrementalAlterConfigsResponse)
672
673 err := b.sendAndReceive(request, response)
674 if err != nil {
675 return nil, err
676 }
677
678 return response, nil
679}
680
681// DeleteGroups sends a request to delete groups and returns a response or error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000682func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
683 response := new(DeleteGroupsResponse)
684
685 if err := b.sendAndReceive(request, response); err != nil {
686 return nil, err
687 }
688
689 return response, nil
690}
691
khenaidoo7d3c5582021-08-11 18:09:44 -0400692// DescribeLogDirs sends a request to get the broker's log dir paths and sizes
693func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) {
694 response := new(DescribeLogDirsResponse)
695
696 err := b.sendAndReceive(request, response)
697 if err != nil {
698 return nil, err
699 }
700
701 return response, nil
702}
703
704// DescribeUserScramCredentials sends a request to get SCRAM users
705func (b *Broker) DescribeUserScramCredentials(req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error) {
706 res := new(DescribeUserScramCredentialsResponse)
707
708 err := b.sendAndReceive(req, res)
709 if err != nil {
710 return nil, err
711 }
712
713 return res, err
714}
715
716func (b *Broker) AlterUserScramCredentials(req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error) {
717 res := new(AlterUserScramCredentialsResponse)
718
719 err := b.sendAndReceive(req, res)
720 if err != nil {
721 return nil, err
722 }
723
724 return res, nil
725}
726
727// readFull ensures the conn ReadDeadline has been setup before making a
728// call to io.ReadFull
729func (b *Broker) readFull(buf []byte) (n int, err error) {
730 if err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)); err != nil {
731 return 0, err
732 }
733
734 return io.ReadFull(b.conn, buf)
735}
736
737// write ensures the conn WriteDeadline has been setup before making a
738// call to conn.Write
739func (b *Broker) write(buf []byte) (n int, err error) {
740 if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
741 return 0, err
742 }
743
744 return b.conn.Write(buf)
745}
746
747func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000748 b.lock.Lock()
749 defer b.lock.Unlock()
750
751 if b.conn == nil {
752 if b.connErr != nil {
753 return nil, b.connErr
754 }
755 return nil, ErrNotConnected
756 }
757
758 if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
759 return nil, ErrUnsupportedVersion
760 }
761
762 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
763 buf, err := encode(req, b.conf.MetricRegistry)
764 if err != nil {
765 return nil, err
766 }
767
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000768 requestTime := time.Now()
khenaidoo7d3c5582021-08-11 18:09:44 -0400769 // Will be decremented in responseReceiver (except error or request with NoResponse)
770 b.addRequestInFlightMetrics(1)
771 bytes, err := b.write(buf)
772 b.updateOutgoingCommunicationMetrics(bytes)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000773 if err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400774 b.addRequestInFlightMetrics(-1)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000775 return nil, err
776 }
777 b.correlationID++
778
779 if !promiseResponse {
780 // Record request latency without the response
khenaidoo7d3c5582021-08-11 18:09:44 -0400781 b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000782 return nil, nil
783 }
784
khenaidoo7d3c5582021-08-11 18:09:44 -0400785 promise := responsePromise{requestTime, req.correlationID, responseHeaderVersion, make(chan []byte), make(chan error)}
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000786 b.responses <- promise
787
788 return &promise, nil
789}
790
khenaidoo7d3c5582021-08-11 18:09:44 -0400791func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
792 responseHeaderVersion := int16(-1)
793 if res != nil {
794 responseHeaderVersion = res.headerVersion()
795 }
796
797 promise, err := b.send(req, res != nil, responseHeaderVersion)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000798 if err != nil {
799 return err
800 }
801
802 if promise == nil {
803 return nil
804 }
805
806 select {
807 case buf := <-promise.packets:
808 return versionedDecode(buf, res, req.version())
809 case err = <-promise.errors:
810 return err
811 }
812}
813
814func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
815 b.id, err = pd.getInt32()
816 if err != nil {
817 return err
818 }
819
820 host, err := pd.getString()
821 if err != nil {
822 return err
823 }
824
825 port, err := pd.getInt32()
826 if err != nil {
827 return err
828 }
829
830 if version >= 1 {
831 b.rack, err = pd.getNullableString()
832 if err != nil {
833 return err
834 }
835 }
836
837 b.addr = net.JoinHostPort(host, fmt.Sprint(port))
838 if _, _, err := net.SplitHostPort(b.addr); err != nil {
839 return err
840 }
841
842 return nil
843}
844
845func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
846 host, portstr, err := net.SplitHostPort(b.addr)
847 if err != nil {
848 return err
849 }
850
khenaidoo7d3c5582021-08-11 18:09:44 -0400851 port, err := strconv.ParseInt(portstr, 10, 32)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000852 if err != nil {
853 return err
854 }
855
856 pe.putInt32(b.id)
857
858 err = pe.putString(host)
859 if err != nil {
860 return err
861 }
862
863 pe.putInt32(int32(port))
864
865 if version >= 1 {
866 err = pe.putNullableString(b.rack)
867 if err != nil {
868 return err
869 }
870 }
871
872 return nil
873}
874
875func (b *Broker) responseReceiver() {
876 var dead error
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000877
878 for response := range b.responses {
879 if dead != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400880 // This was previously incremented in send() and
881 // we are not calling updateIncomingCommunicationMetrics()
882 b.addRequestInFlightMetrics(-1)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000883 response.errors <- dead
884 continue
885 }
886
khenaidoo7d3c5582021-08-11 18:09:44 -0400887 headerLength := getHeaderLength(response.headerVersion)
888 header := make([]byte, headerLength)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000889
khenaidoo7d3c5582021-08-11 18:09:44 -0400890 bytesReadHeader, err := b.readFull(header)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000891 requestLatency := time.Since(response.requestTime)
892 if err != nil {
893 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
894 dead = err
895 response.errors <- err
896 continue
897 }
898
899 decodedHeader := responseHeader{}
khenaidoo7d3c5582021-08-11 18:09:44 -0400900 err = versionedDecode(header, &decodedHeader, response.headerVersion)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000901 if err != nil {
902 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
903 dead = err
904 response.errors <- err
905 continue
906 }
907 if decodedHeader.correlationID != response.correlationID {
908 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
909 // TODO if decoded ID < cur ID, discard until we catch up
910 // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
911 dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
912 response.errors <- dead
913 continue
914 }
915
khenaidoo7d3c5582021-08-11 18:09:44 -0400916 buf := make([]byte, decodedHeader.length-int32(headerLength)+4)
917 bytesReadBody, err := b.readFull(buf)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000918 b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
919 if err != nil {
920 dead = err
921 response.errors <- err
922 continue
923 }
924
925 response.packets <- buf
926 }
927 close(b.done)
928}
929
// getHeaderLength returns the number of bytes occupied by a response header
// of the given version: 8 for versions below 1, and 9 otherwise.
func getHeaderLength(headerVersion int16) int8 {
	if headerVersion < 1 {
		return 8
	}
	// header contains additional tagged field length (0), we don't support actual tags yet.
	return 9
}
938
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000939func (b *Broker) authenticateViaSASL() error {
940 switch b.conf.Net.SASL.Mechanism {
941 case SASLTypeOAuth:
942 return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
943 case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
944 return b.sendAndReceiveSASLSCRAMv1()
945 case SASLTypeGSSAPI:
946 return b.sendAndReceiveKerberos()
947 default:
948 return b.sendAndReceiveSASLPlainAuth()
949 }
950}
951
952func (b *Broker) sendAndReceiveKerberos() error {
953 b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
954 if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
955 b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
956 }
957 return b.kerberosAuthenticator.Authorize(b)
958}
959
960func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
961 rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
962
963 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
964 buf, err := encode(req, b.conf.MetricRegistry)
965 if err != nil {
966 return err
967 }
968
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000969 requestTime := time.Now()
khenaidoo7d3c5582021-08-11 18:09:44 -0400970 // Will be decremented in updateIncomingCommunicationMetrics (except error)
971 b.addRequestInFlightMetrics(1)
972 bytes, err := b.write(buf)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000973 b.updateOutgoingCommunicationMetrics(bytes)
974 if err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400975 b.addRequestInFlightMetrics(-1)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000976 Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
977 return err
978 }
979 b.correlationID++
khenaidoo7d3c5582021-08-11 18:09:44 -0400980
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000981 header := make([]byte, 8) // response header
khenaidoo7d3c5582021-08-11 18:09:44 -0400982 _, err = b.readFull(header)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000983 if err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400984 b.addRequestInFlightMetrics(-1)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000985 Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
986 return err
987 }
988
989 length := binary.BigEndian.Uint32(header[:4])
990 payload := make([]byte, length-4)
khenaidoo7d3c5582021-08-11 18:09:44 -0400991 n, err := b.readFull(payload)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000992 if err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -0400993 b.addRequestInFlightMetrics(-1)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000994 Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
995 return err
996 }
997
998 b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
999 res := &SaslHandshakeResponse{}
1000
1001 err = versionedDecode(payload, res, 0)
1002 if err != nil {
1003 Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
1004 return err
1005 }
1006
1007 if res.Err != ErrNoError {
1008 Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
1009 return res.Err
1010 }
1011
1012 Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
1013 return nil
1014}
1015
1016// Kafka 0.10.x supported SASL PLAIN/Kerberos via KAFKA-3149 (KIP-43).
1017// Kafka 1.x.x onward added a SaslAuthenticate request/response message which
1018// wraps the SASL flow in the Kafka protocol, which allows for returning
1019// meaningful errors on authentication failure.
1020//
1021// In SASL Plain, Kafka expects the auth header to be in the following format
1022// Message format (from https://tools.ietf.org/html/rfc4616):
1023//
1024// message = [authzid] UTF8NUL authcid UTF8NUL passwd
1025// authcid = 1*SAFE ; MUST accept up to 255 octets
1026// authzid = 1*SAFE ; MUST accept up to 255 octets
1027// passwd = 1*SAFE ; MUST accept up to 255 octets
1028// UTF8NUL = %x00 ; UTF-8 encoded NUL character
1029//
1030// SAFE = UTF1 / UTF2 / UTF3 / UTF4
1031// ;; any UTF-8 encoded Unicode character except NUL
1032//
1033// With SASL v0 handshake and auth then:
1034// When credentials are valid, Kafka returns a 4 byte array of null characters.
1035// When credentials are invalid, Kafka closes the connection.
1036//
1037// With SASL v1 handshake and auth then:
1038// When credentials are invalid, Kafka replies with a SaslAuthenticate response
1039// containing an error code and message detailing the authentication failure.
1040func (b *Broker) sendAndReceiveSASLPlainAuth() error {
khenaidoo7d3c5582021-08-11 18:09:44 -04001041 // default to V0 to allow for backward compatibility when SASL is enabled
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001042 // but not the handshake
1043 if b.conf.Net.SASL.Handshake {
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001044 handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
1045 if handshakeErr != nil {
1046 Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
1047 return handshakeErr
1048 }
1049 }
1050
1051 if b.conf.Net.SASL.Version == SASLHandshakeV1 {
1052 return b.sendAndReceiveV1SASLPlainAuth()
1053 }
1054 return b.sendAndReceiveV0SASLPlainAuth()
1055}
1056
1057// sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
1058func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
khenaidoo7d3c5582021-08-11 18:09:44 -04001059 length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
1060 authBytes := make([]byte, length+4) // 4 byte length header + auth data
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001061 binary.BigEndian.PutUint32(authBytes, uint32(length))
khenaidoo7d3c5582021-08-11 18:09:44 -04001062 copy(authBytes[4:], b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001063
1064 requestTime := time.Now()
khenaidoo7d3c5582021-08-11 18:09:44 -04001065 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1066 b.addRequestInFlightMetrics(1)
1067 bytesWritten, err := b.write(authBytes)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001068 b.updateOutgoingCommunicationMetrics(bytesWritten)
1069 if err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -04001070 b.addRequestInFlightMetrics(-1)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001071 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1072 return err
1073 }
1074
1075 header := make([]byte, 4)
khenaidoo7d3c5582021-08-11 18:09:44 -04001076 n, err := b.readFull(header)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001077 b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
1078 // If the credentials are valid, we would get a 4 byte response filled with null characters.
1079 // Otherwise, the broker closes the connection and we get an EOF
1080 if err != nil {
1081 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1082 return err
1083 }
1084
1085 Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
1086 return nil
1087}
1088
1089// sendAndReceiveV1SASLPlainAuth flows the v1 sasl authentication using the kafka protocol
1090func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {
1091 correlationID := b.correlationID
1092
1093 requestTime := time.Now()
1094
khenaidoo7d3c5582021-08-11 18:09:44 -04001095 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1096 b.addRequestInFlightMetrics(1)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001097 bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001098 b.updateOutgoingCommunicationMetrics(bytesWritten)
1099
1100 if err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -04001101 b.addRequestInFlightMetrics(-1)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001102 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1103 return err
1104 }
1105
1106 b.correlationID++
1107
1108 bytesRead, err := b.receiveSASLServerResponse(&SaslAuthenticateResponse{}, correlationID)
1109 b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
1110
1111 // With v1 sasl we get an error message set in the response we can return
1112 if err != nil {
1113 Logger.Printf("Error returned from broker during SASL flow %s: %s\n", b.addr, err.Error())
1114 return err
1115 }
1116
1117 return nil
1118}
1119
1120// sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
1121// https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
1122func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
1123 if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
1124 return err
1125 }
1126
1127 token, err := provider.Token()
1128 if err != nil {
1129 return err
1130 }
1131
1132 message, err := buildClientFirstMessage(token)
1133 if err != nil {
1134 return err
1135 }
1136
1137 challenged, err := b.sendClientMessage(message)
1138 if err != nil {
1139 return err
1140 }
1141
1142 if challenged {
1143 // Abort the token exchange. The broker returns the failure code.
1144 _, err = b.sendClientMessage([]byte(`\x01`))
1145 }
1146
1147 return err
1148}
1149
1150// sendClientMessage sends a SASL/OAUTHBEARER client message and returns true
1151// if the broker responds with a challenge, in which case the token is
1152// rejected.
1153func (b *Broker) sendClientMessage(message []byte) (bool, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001154 requestTime := time.Now()
khenaidoo7d3c5582021-08-11 18:09:44 -04001155 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1156 b.addRequestInFlightMetrics(1)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001157 correlationID := b.correlationID
1158
1159 bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
khenaidoo7d3c5582021-08-11 18:09:44 -04001160 b.updateOutgoingCommunicationMetrics(bytesWritten)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001161 if err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -04001162 b.addRequestInFlightMetrics(-1)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001163 return false, err
1164 }
1165
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001166 b.correlationID++
1167
1168 res := &SaslAuthenticateResponse{}
1169 bytesRead, err := b.receiveSASLServerResponse(res, correlationID)
1170
1171 requestLatency := time.Since(requestTime)
1172 b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
1173
1174 isChallenge := len(res.SaslAuthBytes) > 0
1175
1176 if isChallenge && err != nil {
1177 Logger.Printf("Broker rejected authentication token: %s", res.SaslAuthBytes)
1178 }
1179
1180 return isChallenge, err
1181}
1182
1183func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
1184 if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV1); err != nil {
1185 return err
1186 }
1187
1188 scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
1189 if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
1190 return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
1191 }
1192
1193 msg, err := scramClient.Step("")
1194 if err != nil {
1195 return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001196 }
1197
1198 for !scramClient.Done() {
1199 requestTime := time.Now()
khenaidoo7d3c5582021-08-11 18:09:44 -04001200 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1201 b.addRequestInFlightMetrics(1)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001202 correlationID := b.correlationID
1203 bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
khenaidoo7d3c5582021-08-11 18:09:44 -04001204 b.updateOutgoingCommunicationMetrics(bytesWritten)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001205 if err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -04001206 b.addRequestInFlightMetrics(-1)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001207 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1208 return err
1209 }
1210
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001211 b.correlationID++
1212 challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
1213 if err != nil {
khenaidoo7d3c5582021-08-11 18:09:44 -04001214 b.addRequestInFlightMetrics(-1)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001215 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1216 return err
1217 }
1218
1219 b.updateIncomingCommunicationMetrics(len(challenge), time.Since(requestTime))
1220 msg, err = scramClient.Step(string(challenge))
1221 if err != nil {
1222 Logger.Println("SASL authentication failed", err)
1223 return err
1224 }
1225 }
1226
1227 Logger.Println("SASL authentication succeeded")
1228 return nil
1229}
1230
1231func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (int, error) {
1232 rb := &SaslAuthenticateRequest{msg}
1233 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1234 buf, err := encode(req, b.conf.MetricRegistry)
1235 if err != nil {
1236 return 0, err
1237 }
1238
khenaidoo7d3c5582021-08-11 18:09:44 -04001239 return b.write(buf)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001240}
1241
1242func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
1243 buf := make([]byte, responseLengthSize+correlationIDSize)
khenaidoo7d3c5582021-08-11 18:09:44 -04001244 _, err := b.readFull(buf)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001245 if err != nil {
1246 return nil, err
1247 }
1248
1249 header := responseHeader{}
khenaidoo7d3c5582021-08-11 18:09:44 -04001250 err = versionedDecode(buf, &header, 0)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001251 if err != nil {
1252 return nil, err
1253 }
1254
1255 if header.correlationID != correlationID {
1256 return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
1257 }
1258
1259 buf = make([]byte, header.length-correlationIDSize)
khenaidoo7d3c5582021-08-11 18:09:44 -04001260 _, err = b.readFull(buf)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001261 if err != nil {
1262 return nil, err
1263 }
1264
1265 res := &SaslAuthenticateResponse{}
1266 if err := versionedDecode(buf, res, 0); err != nil {
1267 return nil, err
1268 }
1269 if res.Err != ErrNoError {
1270 return nil, res.Err
1271 }
1272 return res.SaslAuthBytes, nil
1273}
1274
1275// Build SASL/OAUTHBEARER initial client response as described by RFC-7628
1276// https://tools.ietf.org/html/rfc7628
1277func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
1278 var ext string
1279
1280 if token.Extensions != nil && len(token.Extensions) > 0 {
1281 if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
1282 return []byte{}, fmt.Errorf("the extension `%s` is invalid", SASLExtKeyAuth)
1283 }
1284 ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
1285 }
1286
1287 resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
1288
1289 return resp, nil
1290}
1291
// mapToString returns the key-value pairs of extensions joined into a single
// string, ordered by key. keyValSep separates each key from its value and
// elemSep separates the pairs. A nil or empty map yields the empty string.
func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
	// Sort the keys themselves rather than the rendered "key<sep>value"
	// strings: sorting the rendered form misorders keys where one is a prefix
	// of another (e.g. "a" and "a0" with "=" as separator).
	keys := make([]string, 0, len(extensions))
	for k := range extensions {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, k+keyValSep+extensions[k])
	}

	return strings.Join(pairs, elemSep)
}
1305
1306func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
khenaidoo7d3c5582021-08-11 18:09:44 -04001307 authBytes := []byte(b.conf.Net.SASL.AuthIdentity + "\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001308 rb := &SaslAuthenticateRequest{authBytes}
1309 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1310 buf, err := encode(req, b.conf.MetricRegistry)
1311 if err != nil {
1312 return 0, err
1313 }
1314
khenaidoo7d3c5582021-08-11 18:09:44 -04001315 return b.write(buf)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001316}
1317
1318func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001319 rb := &SaslAuthenticateRequest{initialResp}
1320
1321 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1322
1323 buf, err := encode(req, b.conf.MetricRegistry)
1324 if err != nil {
1325 return 0, err
1326 }
1327
khenaidoo7d3c5582021-08-11 18:09:44 -04001328 return b.write(buf)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001329}
1330
1331func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001332 buf := make([]byte, responseLengthSize+correlationIDSize)
khenaidoo7d3c5582021-08-11 18:09:44 -04001333 bytesRead, err := b.readFull(buf)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001334 if err != nil {
1335 return bytesRead, err
1336 }
1337
1338 header := responseHeader{}
khenaidoo7d3c5582021-08-11 18:09:44 -04001339 err = versionedDecode(buf, &header, 0)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001340 if err != nil {
1341 return bytesRead, err
1342 }
1343
1344 if header.correlationID != correlationID {
1345 return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
1346 }
1347
1348 buf = make([]byte, header.length-correlationIDSize)
khenaidoo7d3c5582021-08-11 18:09:44 -04001349 c, err := b.readFull(buf)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001350 bytesRead += c
1351 if err != nil {
1352 return bytesRead, err
1353 }
1354
1355 if err := versionedDecode(buf, res, 0); err != nil {
1356 return bytesRead, err
1357 }
1358
1359 if res.Err != ErrNoError {
1360 return bytesRead, res.Err
1361 }
1362
1363 return bytesRead, nil
1364}
1365
1366func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
khenaidoo7d3c5582021-08-11 18:09:44 -04001367 b.updateRequestLatencyAndInFlightMetrics(requestLatency)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001368 b.responseRate.Mark(1)
1369
1370 if b.brokerResponseRate != nil {
1371 b.brokerResponseRate.Mark(1)
1372 }
1373
1374 responseSize := int64(bytes)
1375 b.incomingByteRate.Mark(responseSize)
1376 if b.brokerIncomingByteRate != nil {
1377 b.brokerIncomingByteRate.Mark(responseSize)
1378 }
1379
1380 b.responseSize.Update(responseSize)
1381 if b.brokerResponseSize != nil {
1382 b.brokerResponseSize.Update(responseSize)
1383 }
1384}
1385
khenaidoo7d3c5582021-08-11 18:09:44 -04001386func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001387 requestLatencyInMs := int64(requestLatency / time.Millisecond)
1388 b.requestLatency.Update(requestLatencyInMs)
1389
1390 if b.brokerRequestLatency != nil {
1391 b.brokerRequestLatency.Update(requestLatencyInMs)
1392 }
1393
khenaidoo7d3c5582021-08-11 18:09:44 -04001394 b.addRequestInFlightMetrics(-1)
1395}
1396
1397func (b *Broker) addRequestInFlightMetrics(i int64) {
1398 b.requestsInFlight.Inc(i)
1399 if b.brokerRequestsInFlight != nil {
1400 b.brokerRequestsInFlight.Inc(i)
1401 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001402}
1403
1404func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
1405 b.requestRate.Mark(1)
1406 if b.brokerRequestRate != nil {
1407 b.brokerRequestRate.Mark(1)
1408 }
1409
1410 requestSize := int64(bytes)
1411 b.outgoingByteRate.Mark(requestSize)
1412 if b.brokerOutgoingByteRate != nil {
1413 b.brokerOutgoingByteRate.Mark(requestSize)
1414 }
1415
1416 b.requestSize.Update(requestSize)
1417 if b.brokerRequestSize != nil {
1418 b.brokerRequestSize.Update(requestSize)
1419 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001420}
1421
1422func (b *Broker) registerMetrics() {
1423 b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
1424 b.brokerRequestRate = b.registerMeter("request-rate")
1425 b.brokerRequestSize = b.registerHistogram("request-size")
1426 b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
1427 b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
1428 b.brokerResponseRate = b.registerMeter("response-rate")
1429 b.brokerResponseSize = b.registerHistogram("response-size")
khenaidoo7d3c5582021-08-11 18:09:44 -04001430 b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001431}
1432
1433func (b *Broker) unregisterMetrics() {
1434 for _, name := range b.registeredMetrics {
1435 b.conf.MetricRegistry.Unregister(name)
1436 }
khenaidoo7d3c5582021-08-11 18:09:44 -04001437 b.registeredMetrics = nil
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001438}
1439
1440func (b *Broker) registerMeter(name string) metrics.Meter {
1441 nameForBroker := getMetricNameForBroker(name, b)
1442 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1443 return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
1444}
1445
1446func (b *Broker) registerHistogram(name string) metrics.Histogram {
1447 nameForBroker := getMetricNameForBroker(name, b)
1448 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1449 return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
1450}
khenaidoo7d3c5582021-08-11 18:09:44 -04001451
1452func (b *Broker) registerCounter(name string) metrics.Counter {
1453 nameForBroker := getMetricNameForBroker(name, b)
1454 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1455 return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
1456}
1457
1458func validServerNameTLS(addr string, cfg *tls.Config) *tls.Config {
1459 if cfg == nil {
1460 cfg = &tls.Config{
1461 MinVersion: tls.VersionTLS12,
1462 }
1463 }
1464 if cfg.ServerName != "" {
1465 return cfg
1466 }
1467
1468 c := cfg.Clone()
1469 sn, _, err := net.SplitHostPort(addr)
1470 if err != nil {
1471 Logger.Println(fmt.Errorf("failed to get ServerName from addr %w", err))
1472 }
1473 c.ServerName = sn
1474 return c
1475}