blob: dd01e4ef1fb487a8eb6977ead1e49d3ec7be7d2a [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "crypto/tls"
5 "encoding/binary"
6 "fmt"
7 "io"
8 "net"
William Kurkiandaa6bb22019-03-07 12:26:28 -05009 "sort"
khenaidooac637102019-01-14 15:44:34 -050010 "strconv"
William Kurkiandaa6bb22019-03-07 12:26:28 -050011 "strings"
khenaidooac637102019-01-14 15:44:34 -050012 "sync"
13 "sync/atomic"
14 "time"
15
khenaidood948f772021-08-11 17:49:24 -040016 "github.com/rcrowley/go-metrics"
khenaidooac637102019-01-14 15:44:34 -050017)
18
19// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
20type Broker struct {
Scott Baker8461e152019-10-01 14:44:30 -070021 conf *Config
khenaidooac637102019-01-14 15:44:34 -050022 rack *string
23
Scott Baker8461e152019-10-01 14:44:30 -070024 id int32
25 addr string
khenaidooac637102019-01-14 15:44:34 -050026 correlationID int32
27 conn net.Conn
28 connErr error
29 lock sync.Mutex
30 opened int32
Scott Baker8461e152019-10-01 14:44:30 -070031 responses chan responsePromise
32 done chan bool
khenaidooac637102019-01-14 15:44:34 -050033
Scott Baker8461e152019-10-01 14:44:30 -070034 registeredMetrics []string
khenaidooac637102019-01-14 15:44:34 -050035
36 incomingByteRate metrics.Meter
37 requestRate metrics.Meter
38 requestSize metrics.Histogram
39 requestLatency metrics.Histogram
40 outgoingByteRate metrics.Meter
41 responseRate metrics.Meter
42 responseSize metrics.Histogram
khenaidood948f772021-08-11 17:49:24 -040043 requestsInFlight metrics.Counter
khenaidooac637102019-01-14 15:44:34 -050044 brokerIncomingByteRate metrics.Meter
45 brokerRequestRate metrics.Meter
46 brokerRequestSize metrics.Histogram
47 brokerRequestLatency metrics.Histogram
48 brokerOutgoingByteRate metrics.Meter
49 brokerResponseRate metrics.Meter
50 brokerResponseSize metrics.Histogram
khenaidood948f772021-08-11 17:49:24 -040051 brokerRequestsInFlight metrics.Counter
Scott Baker8461e152019-10-01 14:44:30 -070052
53 kerberosAuthenticator GSSAPIKerberosAuth
khenaidooac637102019-01-14 15:44:34 -050054}
55
William Kurkiandaa6bb22019-03-07 12:26:28 -050056// SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
57type SASLMechanism string
58
59const (
60 // SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
61 SASLTypeOAuth = "OAUTHBEARER"
62 // SASLTypePlaintext represents the SASL/PLAIN mechanism
63 SASLTypePlaintext = "PLAIN"
Scott Baker8461e152019-10-01 14:44:30 -070064 // SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
65 SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
66 // SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
67 SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
68 SASLTypeGSSAPI = "GSSAPI"
William Kurkiandaa6bb22019-03-07 12:26:28 -050069 // SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
70 // server negotiate SASL auth using opaque packets.
71 SASLHandshakeV0 = int16(0)
72 // SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
73 // server negotiate SASL by wrapping tokens with Kafka protocol headers.
74 SASLHandshakeV1 = int16(1)
75 // SASLExtKeyAuth is the reserved extension key name sent as part of the
khenaidood948f772021-08-11 17:49:24 -040076 // SASL/OAUTHBEARER initial client response
William Kurkiandaa6bb22019-03-07 12:26:28 -050077 SASLExtKeyAuth = "auth"
78)
79
80// AccessToken contains an access token used to authenticate a
81// SASL/OAUTHBEARER client along with associated metadata.
82type AccessToken struct {
83 // Token is the access token payload.
84 Token string
85 // Extensions is a optional map of arbitrary key-value pairs that can be
86 // sent with the SASL/OAUTHBEARER initial client response. These values are
87 // ignored by the SASL server if they are unexpected. This feature is only
88 // supported by Kafka >= 2.1.0.
89 Extensions map[string]string
90}
91
92// AccessTokenProvider is the interface that encapsulates how implementors
93// can generate access tokens for Kafka broker authentication.
94type AccessTokenProvider interface {
95 // Token returns an access token. The implementation should ensure token
96 // reuse so that multiple calls at connect time do not create multiple
97 // tokens. The implementation should also periodically refresh the token in
98 // order to guarantee that each call returns an unexpired token. This
99 // method should not block indefinitely--a timeout error should be returned
100 // after a short period of inactivity so that the broker connection logic
101 // can log debugging information and retry.
102 Token() (*AccessToken, error)
103}
104
Scott Baker8461e152019-10-01 14:44:30 -0700105// SCRAMClient is a an interface to a SCRAM
106// client implementation.
107type SCRAMClient interface {
108 // Begin prepares the client for the SCRAM exchange
109 // with the server with a user name and a password
110 Begin(userName, password, authzID string) error
111 // Step steps client through the SCRAM exchange. It is
112 // called repeatedly until it errors or `Done` returns true.
113 Step(challenge string) (response string, err error)
114 // Done should return true when the SCRAM conversation
115 // is over.
116 Done() bool
117}
118
khenaidooac637102019-01-14 15:44:34 -0500119type responsePromise struct {
120 requestTime time.Time
121 correlationID int32
khenaidood948f772021-08-11 17:49:24 -0400122 headerVersion int16
khenaidooac637102019-01-14 15:44:34 -0500123 packets chan []byte
124 errors chan error
125}
126
127// NewBroker creates and returns a Broker targeting the given host:port address.
128// This does not attempt to actually connect, you have to call Open() for that.
129func NewBroker(addr string) *Broker {
130 return &Broker{id: -1, addr: addr}
131}
132
133// Open tries to connect to the Broker if it is not already connected or connecting, but does not block
134// waiting for the connection to complete. This means that any subsequent operations on the broker will
135// block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
136// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
137// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
138func (b *Broker) Open(conf *Config) error {
139 if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
140 return ErrAlreadyConnected
141 }
142
143 if conf == nil {
144 conf = NewConfig()
145 }
146
147 err := conf.Validate()
148 if err != nil {
149 return err
150 }
151
152 b.lock.Lock()
153
154 go withRecover(func() {
155 defer b.lock.Unlock()
156
khenaidood948f772021-08-11 17:49:24 -0400157 dialer := conf.getDialer()
158 b.conn, b.connErr = dialer.Dial("tcp", b.addr)
khenaidooac637102019-01-14 15:44:34 -0500159 if b.connErr != nil {
160 Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
161 b.conn = nil
162 atomic.StoreInt32(&b.opened, 0)
163 return
164 }
khenaidood948f772021-08-11 17:49:24 -0400165 if conf.Net.TLS.Enable {
166 b.conn = tls.Client(b.conn, validServerNameTLS(b.addr, conf.Net.TLS.Config))
167 }
khenaidooac637102019-01-14 15:44:34 -0500168
khenaidood948f772021-08-11 17:49:24 -0400169 b.conn = newBufConn(b.conn)
khenaidooac637102019-01-14 15:44:34 -0500170 b.conf = conf
171
172 // Create or reuse the global metrics shared between brokers
173 b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
174 b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
175 b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
176 b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
177 b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
178 b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
179 b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
khenaidood948f772021-08-11 17:49:24 -0400180 b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
khenaidooac637102019-01-14 15:44:34 -0500181 // Do not gather metrics for seeded broker (only used during bootstrap) because they share
182 // the same id (-1) and are already exposed through the global metrics above
183 if b.id >= 0 {
Scott Baker8461e152019-10-01 14:44:30 -0700184 b.registerMetrics()
khenaidooac637102019-01-14 15:44:34 -0500185 }
186
187 if conf.Net.SASL.Enable {
William Kurkiandaa6bb22019-03-07 12:26:28 -0500188 b.connErr = b.authenticateViaSASL()
189
khenaidooac637102019-01-14 15:44:34 -0500190 if b.connErr != nil {
191 err = b.conn.Close()
192 if err == nil {
193 Logger.Printf("Closed connection to broker %s\n", b.addr)
194 } else {
195 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
196 }
197 b.conn = nil
198 atomic.StoreInt32(&b.opened, 0)
199 return
200 }
201 }
202
203 b.done = make(chan bool)
204 b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
205
206 if b.id >= 0 {
207 Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
208 } else {
209 Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
210 }
211 go withRecover(b.responseReceiver)
212 })
213
214 return nil
215}
216
217// Connected returns true if the broker is connected and false otherwise. If the broker is not
218// connected but it had tried to connect, the error from that connection attempt is also returned.
219func (b *Broker) Connected() (bool, error) {
220 b.lock.Lock()
221 defer b.lock.Unlock()
222
223 return b.conn != nil, b.connErr
224}
225
khenaidood948f772021-08-11 17:49:24 -0400226// Close closes the broker resources
khenaidooac637102019-01-14 15:44:34 -0500227func (b *Broker) Close() error {
228 b.lock.Lock()
229 defer b.lock.Unlock()
230
231 if b.conn == nil {
232 return ErrNotConnected
233 }
234
235 close(b.responses)
236 <-b.done
237
238 err := b.conn.Close()
239
240 b.conn = nil
241 b.connErr = nil
242 b.done = nil
243 b.responses = nil
244
Scott Baker8461e152019-10-01 14:44:30 -0700245 b.unregisterMetrics()
khenaidooac637102019-01-14 15:44:34 -0500246
247 if err == nil {
248 Logger.Printf("Closed connection to broker %s\n", b.addr)
249 } else {
250 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
251 }
252
253 atomic.StoreInt32(&b.opened, 0)
254
255 return err
256}
257
258// ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
259func (b *Broker) ID() int32 {
260 return b.id
261}
262
263// Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
264func (b *Broker) Addr() string {
265 return b.addr
266}
267
268// Rack returns the broker's rack as retrieved from Kafka's metadata or the
269// empty string if it is not known. The returned value corresponds to the
270// broker's broker.rack configuration setting. Requires protocol version to be
271// at least v0.10.0.0.
272func (b *Broker) Rack() string {
273 if b.rack == nil {
274 return ""
275 }
276 return *b.rack
277}
278
khenaidood948f772021-08-11 17:49:24 -0400279// GetMetadata send a metadata request and returns a metadata response or error
khenaidooac637102019-01-14 15:44:34 -0500280func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
281 response := new(MetadataResponse)
282
283 err := b.sendAndReceive(request, response)
khenaidooac637102019-01-14 15:44:34 -0500284 if err != nil {
285 return nil, err
286 }
287
288 return response, nil
289}
290
khenaidood948f772021-08-11 17:49:24 -0400291// GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
khenaidooac637102019-01-14 15:44:34 -0500292func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
293 response := new(ConsumerMetadataResponse)
294
295 err := b.sendAndReceive(request, response)
khenaidooac637102019-01-14 15:44:34 -0500296 if err != nil {
297 return nil, err
298 }
299
300 return response, nil
301}
302
khenaidood948f772021-08-11 17:49:24 -0400303// FindCoordinator sends a find coordinate request and returns a response or error
khenaidooac637102019-01-14 15:44:34 -0500304func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
305 response := new(FindCoordinatorResponse)
306
307 err := b.sendAndReceive(request, response)
khenaidooac637102019-01-14 15:44:34 -0500308 if err != nil {
309 return nil, err
310 }
311
312 return response, nil
313}
314
khenaidood948f772021-08-11 17:49:24 -0400315// GetAvailableOffsets return an offset response or error
khenaidooac637102019-01-14 15:44:34 -0500316func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
317 response := new(OffsetResponse)
318
319 err := b.sendAndReceive(request, response)
khenaidooac637102019-01-14 15:44:34 -0500320 if err != nil {
321 return nil, err
322 }
323
324 return response, nil
325}
326
khenaidood948f772021-08-11 17:49:24 -0400327// Produce returns a produce response or error
khenaidooac637102019-01-14 15:44:34 -0500328func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
Scott Baker8461e152019-10-01 14:44:30 -0700329 var (
330 response *ProduceResponse
331 err error
332 )
khenaidooac637102019-01-14 15:44:34 -0500333
334 if request.RequiredAcks == NoResponse {
335 err = b.sendAndReceive(request, nil)
336 } else {
337 response = new(ProduceResponse)
338 err = b.sendAndReceive(request, response)
339 }
340
341 if err != nil {
342 return nil, err
343 }
344
345 return response, nil
346}
347
khenaidood948f772021-08-11 17:49:24 -0400348// Fetch returns a FetchResponse or error
khenaidooac637102019-01-14 15:44:34 -0500349func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
350 response := new(FetchResponse)
351
352 err := b.sendAndReceive(request, response)
khenaidooac637102019-01-14 15:44:34 -0500353 if err != nil {
354 return nil, err
355 }
356
357 return response, nil
358}
359
khenaidood948f772021-08-11 17:49:24 -0400360// CommitOffset return an Offset commit response or error
khenaidooac637102019-01-14 15:44:34 -0500361func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
362 response := new(OffsetCommitResponse)
363
364 err := b.sendAndReceive(request, response)
khenaidooac637102019-01-14 15:44:34 -0500365 if err != nil {
366 return nil, err
367 }
368
369 return response, nil
370}
371
khenaidood948f772021-08-11 17:49:24 -0400372// FetchOffset returns an offset fetch response or error
khenaidooac637102019-01-14 15:44:34 -0500373func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
374 response := new(OffsetFetchResponse)
khenaidood948f772021-08-11 17:49:24 -0400375 response.Version = request.Version // needed to handle the two header versions
khenaidooac637102019-01-14 15:44:34 -0500376
377 err := b.sendAndReceive(request, response)
khenaidooac637102019-01-14 15:44:34 -0500378 if err != nil {
379 return nil, err
380 }
381
382 return response, nil
383}
384
khenaidood948f772021-08-11 17:49:24 -0400385// JoinGroup returns a join group response or error
khenaidooac637102019-01-14 15:44:34 -0500386func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
387 response := new(JoinGroupResponse)
388
389 err := b.sendAndReceive(request, response)
390 if err != nil {
391 return nil, err
392 }
393
394 return response, nil
395}
396
khenaidood948f772021-08-11 17:49:24 -0400397// SyncGroup returns a sync group response or error
khenaidooac637102019-01-14 15:44:34 -0500398func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
399 response := new(SyncGroupResponse)
400
401 err := b.sendAndReceive(request, response)
402 if err != nil {
403 return nil, err
404 }
405
406 return response, nil
407}
408
khenaidood948f772021-08-11 17:49:24 -0400409// LeaveGroup return a leave group response or error
khenaidooac637102019-01-14 15:44:34 -0500410func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
411 response := new(LeaveGroupResponse)
412
413 err := b.sendAndReceive(request, response)
414 if err != nil {
415 return nil, err
416 }
417
418 return response, nil
419}
420
khenaidood948f772021-08-11 17:49:24 -0400421// Heartbeat returns a heartbeat response or error
khenaidooac637102019-01-14 15:44:34 -0500422func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
423 response := new(HeartbeatResponse)
424
425 err := b.sendAndReceive(request, response)
426 if err != nil {
427 return nil, err
428 }
429
430 return response, nil
431}
432
khenaidood948f772021-08-11 17:49:24 -0400433// ListGroups return a list group response or error
khenaidooac637102019-01-14 15:44:34 -0500434func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
435 response := new(ListGroupsResponse)
436
437 err := b.sendAndReceive(request, response)
438 if err != nil {
439 return nil, err
440 }
441
442 return response, nil
443}
444
khenaidood948f772021-08-11 17:49:24 -0400445// DescribeGroups return describe group response or error
khenaidooac637102019-01-14 15:44:34 -0500446func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
447 response := new(DescribeGroupsResponse)
448
449 err := b.sendAndReceive(request, response)
450 if err != nil {
451 return nil, err
452 }
453
454 return response, nil
455}
456
khenaidood948f772021-08-11 17:49:24 -0400457// ApiVersions return api version response or error
khenaidooac637102019-01-14 15:44:34 -0500458func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
459 response := new(ApiVersionsResponse)
460
461 err := b.sendAndReceive(request, response)
462 if err != nil {
463 return nil, err
464 }
465
466 return response, nil
467}
468
khenaidood948f772021-08-11 17:49:24 -0400469// CreateTopics send a create topic request and returns create topic response
khenaidooac637102019-01-14 15:44:34 -0500470func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
471 response := new(CreateTopicsResponse)
472
473 err := b.sendAndReceive(request, response)
474 if err != nil {
475 return nil, err
476 }
477
478 return response, nil
479}
480
khenaidood948f772021-08-11 17:49:24 -0400481// DeleteTopics sends a delete topic request and returns delete topic response
khenaidooac637102019-01-14 15:44:34 -0500482func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
483 response := new(DeleteTopicsResponse)
484
485 err := b.sendAndReceive(request, response)
486 if err != nil {
487 return nil, err
488 }
489
490 return response, nil
491}
492
khenaidood948f772021-08-11 17:49:24 -0400493// CreatePartitions sends a create partition request and returns create
494// partitions response or error
khenaidooac637102019-01-14 15:44:34 -0500495func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
496 response := new(CreatePartitionsResponse)
497
498 err := b.sendAndReceive(request, response)
499 if err != nil {
500 return nil, err
501 }
502
503 return response, nil
504}
505
khenaidood948f772021-08-11 17:49:24 -0400506// AlterPartitionReassignments sends a alter partition reassignments request and
507// returns alter partition reassignments response
508func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error) {
509 response := new(AlterPartitionReassignmentsResponse)
510
511 err := b.sendAndReceive(request, response)
512 if err != nil {
513 return nil, err
514 }
515
516 return response, nil
517}
518
519// ListPartitionReassignments sends a list partition reassignments request and
520// returns list partition reassignments response
521func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error) {
522 response := new(ListPartitionReassignmentsResponse)
523
524 err := b.sendAndReceive(request, response)
525 if err != nil {
526 return nil, err
527 }
528
529 return response, nil
530}
531
532// DeleteRecords send a request to delete records and return delete record
533// response or error
khenaidooac637102019-01-14 15:44:34 -0500534func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
535 response := new(DeleteRecordsResponse)
536
537 err := b.sendAndReceive(request, response)
538 if err != nil {
539 return nil, err
540 }
541
542 return response, nil
543}
544
khenaidood948f772021-08-11 17:49:24 -0400545// DescribeAcls sends a describe acl request and returns a response or error
khenaidooac637102019-01-14 15:44:34 -0500546func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
547 response := new(DescribeAclsResponse)
548
549 err := b.sendAndReceive(request, response)
550 if err != nil {
551 return nil, err
552 }
553
554 return response, nil
555}
556
khenaidood948f772021-08-11 17:49:24 -0400557// CreateAcls sends a create acl request and returns a response or error
khenaidooac637102019-01-14 15:44:34 -0500558func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
559 response := new(CreateAclsResponse)
560
561 err := b.sendAndReceive(request, response)
562 if err != nil {
563 return nil, err
564 }
565
566 return response, nil
567}
568
khenaidood948f772021-08-11 17:49:24 -0400569// DeleteAcls sends a delete acl request and returns a response or error
khenaidooac637102019-01-14 15:44:34 -0500570func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
571 response := new(DeleteAclsResponse)
572
573 err := b.sendAndReceive(request, response)
574 if err != nil {
575 return nil, err
576 }
577
578 return response, nil
579}
580
khenaidood948f772021-08-11 17:49:24 -0400581// InitProducerID sends an init producer request and returns a response or error
khenaidooac637102019-01-14 15:44:34 -0500582func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
583 response := new(InitProducerIDResponse)
584
585 err := b.sendAndReceive(request, response)
586 if err != nil {
587 return nil, err
588 }
589
590 return response, nil
591}
592
khenaidood948f772021-08-11 17:49:24 -0400593// AddPartitionsToTxn send a request to add partition to txn and returns
594// a response or error
khenaidooac637102019-01-14 15:44:34 -0500595func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
596 response := new(AddPartitionsToTxnResponse)
597
598 err := b.sendAndReceive(request, response)
599 if err != nil {
600 return nil, err
601 }
602
603 return response, nil
604}
605
khenaidood948f772021-08-11 17:49:24 -0400606// AddOffsetsToTxn sends a request to add offsets to txn and returns a response
607// or error
khenaidooac637102019-01-14 15:44:34 -0500608func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
609 response := new(AddOffsetsToTxnResponse)
610
611 err := b.sendAndReceive(request, response)
612 if err != nil {
613 return nil, err
614 }
615
616 return response, nil
617}
618
khenaidood948f772021-08-11 17:49:24 -0400619// EndTxn sends a request to end txn and returns a response or error
khenaidooac637102019-01-14 15:44:34 -0500620func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
621 response := new(EndTxnResponse)
622
623 err := b.sendAndReceive(request, response)
624 if err != nil {
625 return nil, err
626 }
627
628 return response, nil
629}
630
khenaidood948f772021-08-11 17:49:24 -0400631// TxnOffsetCommit sends a request to commit transaction offsets and returns
632// a response or error
khenaidooac637102019-01-14 15:44:34 -0500633func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
634 response := new(TxnOffsetCommitResponse)
635
636 err := b.sendAndReceive(request, response)
637 if err != nil {
638 return nil, err
639 }
640
641 return response, nil
642}
643
khenaidood948f772021-08-11 17:49:24 -0400644// DescribeConfigs sends a request to describe config and returns a response or
645// error
khenaidooac637102019-01-14 15:44:34 -0500646func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
647 response := new(DescribeConfigsResponse)
648
649 err := b.sendAndReceive(request, response)
650 if err != nil {
651 return nil, err
652 }
653
654 return response, nil
655}
656
khenaidood948f772021-08-11 17:49:24 -0400657// AlterConfigs sends a request to alter config and return a response or error
khenaidooac637102019-01-14 15:44:34 -0500658func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
659 response := new(AlterConfigsResponse)
660
661 err := b.sendAndReceive(request, response)
662 if err != nil {
663 return nil, err
664 }
665
666 return response, nil
667}
668
khenaidood948f772021-08-11 17:49:24 -0400669// IncrementalAlterConfigs sends a request to incremental alter config and return a response or error
670func (b *Broker) IncrementalAlterConfigs(request *IncrementalAlterConfigsRequest) (*IncrementalAlterConfigsResponse, error) {
671 response := new(IncrementalAlterConfigsResponse)
672
673 err := b.sendAndReceive(request, response)
674 if err != nil {
675 return nil, err
676 }
677
678 return response, nil
679}
680
681// DeleteGroups sends a request to delete groups and returns a response or error
khenaidooac637102019-01-14 15:44:34 -0500682func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
683 response := new(DeleteGroupsResponse)
684
685 if err := b.sendAndReceive(request, response); err != nil {
686 return nil, err
687 }
688
689 return response, nil
690}
691
khenaidood948f772021-08-11 17:49:24 -0400692// DescribeLogDirs sends a request to get the broker's log dir paths and sizes
693func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) {
694 response := new(DescribeLogDirsResponse)
695
696 err := b.sendAndReceive(request, response)
697 if err != nil {
698 return nil, err
699 }
700
701 return response, nil
702}
703
704// DescribeUserScramCredentials sends a request to get SCRAM users
705func (b *Broker) DescribeUserScramCredentials(req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error) {
706 res := new(DescribeUserScramCredentialsResponse)
707
708 err := b.sendAndReceive(req, res)
709 if err != nil {
710 return nil, err
711 }
712
713 return res, err
714}
715
716func (b *Broker) AlterUserScramCredentials(req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error) {
717 res := new(AlterUserScramCredentialsResponse)
718
719 err := b.sendAndReceive(req, res)
720 if err != nil {
721 return nil, err
722 }
723
724 return res, nil
725}
726
727// readFull ensures the conn ReadDeadline has been setup before making a
728// call to io.ReadFull
729func (b *Broker) readFull(buf []byte) (n int, err error) {
730 if err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)); err != nil {
731 return 0, err
732 }
733
734 return io.ReadFull(b.conn, buf)
735}
736
737// write ensures the conn WriteDeadline has been setup before making a
738// call to conn.Write
739func (b *Broker) write(buf []byte) (n int, err error) {
740 if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
741 return 0, err
742 }
743
744 return b.conn.Write(buf)
745}
746
747func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
khenaidooac637102019-01-14 15:44:34 -0500748 b.lock.Lock()
749 defer b.lock.Unlock()
750
751 if b.conn == nil {
752 if b.connErr != nil {
753 return nil, b.connErr
754 }
755 return nil, ErrNotConnected
756 }
757
758 if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
759 return nil, ErrUnsupportedVersion
760 }
761
762 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
763 buf, err := encode(req, b.conf.MetricRegistry)
764 if err != nil {
765 return nil, err
766 }
767
khenaidooac637102019-01-14 15:44:34 -0500768 requestTime := time.Now()
khenaidood948f772021-08-11 17:49:24 -0400769 // Will be decremented in responseReceiver (except error or request with NoResponse)
770 b.addRequestInFlightMetrics(1)
771 bytes, err := b.write(buf)
772 b.updateOutgoingCommunicationMetrics(bytes)
khenaidooac637102019-01-14 15:44:34 -0500773 if err != nil {
khenaidood948f772021-08-11 17:49:24 -0400774 b.addRequestInFlightMetrics(-1)
khenaidooac637102019-01-14 15:44:34 -0500775 return nil, err
776 }
777 b.correlationID++
778
779 if !promiseResponse {
780 // Record request latency without the response
khenaidood948f772021-08-11 17:49:24 -0400781 b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
khenaidooac637102019-01-14 15:44:34 -0500782 return nil, nil
783 }
784
khenaidood948f772021-08-11 17:49:24 -0400785 promise := responsePromise{requestTime, req.correlationID, responseHeaderVersion, make(chan []byte), make(chan error)}
khenaidooac637102019-01-14 15:44:34 -0500786 b.responses <- promise
787
788 return &promise, nil
789}
790
khenaidood948f772021-08-11 17:49:24 -0400791func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
792 responseHeaderVersion := int16(-1)
793 if res != nil {
794 responseHeaderVersion = res.headerVersion()
795 }
796
797 promise, err := b.send(req, res != nil, responseHeaderVersion)
khenaidooac637102019-01-14 15:44:34 -0500798 if err != nil {
799 return err
800 }
801
802 if promise == nil {
803 return nil
804 }
805
806 select {
807 case buf := <-promise.packets:
808 return versionedDecode(buf, res, req.version())
809 case err = <-promise.errors:
810 return err
811 }
812}
813
814func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
815 b.id, err = pd.getInt32()
816 if err != nil {
817 return err
818 }
819
820 host, err := pd.getString()
821 if err != nil {
822 return err
823 }
824
825 port, err := pd.getInt32()
826 if err != nil {
827 return err
828 }
829
830 if version >= 1 {
831 b.rack, err = pd.getNullableString()
832 if err != nil {
833 return err
834 }
835 }
836
837 b.addr = net.JoinHostPort(host, fmt.Sprint(port))
838 if _, _, err := net.SplitHostPort(b.addr); err != nil {
839 return err
840 }
841
842 return nil
843}
844
845func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
khenaidooac637102019-01-14 15:44:34 -0500846 host, portstr, err := net.SplitHostPort(b.addr)
847 if err != nil {
848 return err
849 }
Scott Baker8461e152019-10-01 14:44:30 -0700850
khenaidood948f772021-08-11 17:49:24 -0400851 port, err := strconv.ParseInt(portstr, 10, 32)
khenaidooac637102019-01-14 15:44:34 -0500852 if err != nil {
853 return err
854 }
855
856 pe.putInt32(b.id)
857
858 err = pe.putString(host)
859 if err != nil {
860 return err
861 }
862
863 pe.putInt32(int32(port))
864
865 if version >= 1 {
866 err = pe.putNullableString(b.rack)
867 if err != nil {
868 return err
869 }
870 }
871
872 return nil
873}
874
875func (b *Broker) responseReceiver() {
876 var dead error
Scott Baker8461e152019-10-01 14:44:30 -0700877
khenaidooac637102019-01-14 15:44:34 -0500878 for response := range b.responses {
879 if dead != nil {
khenaidood948f772021-08-11 17:49:24 -0400880 // This was previously incremented in send() and
881 // we are not calling updateIncomingCommunicationMetrics()
882 b.addRequestInFlightMetrics(-1)
khenaidooac637102019-01-14 15:44:34 -0500883 response.errors <- dead
884 continue
885 }
886
khenaidood948f772021-08-11 17:49:24 -0400887 headerLength := getHeaderLength(response.headerVersion)
888 header := make([]byte, headerLength)
khenaidooac637102019-01-14 15:44:34 -0500889
khenaidood948f772021-08-11 17:49:24 -0400890 bytesReadHeader, err := b.readFull(header)
khenaidooac637102019-01-14 15:44:34 -0500891 requestLatency := time.Since(response.requestTime)
892 if err != nil {
893 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
894 dead = err
895 response.errors <- err
896 continue
897 }
898
899 decodedHeader := responseHeader{}
khenaidood948f772021-08-11 17:49:24 -0400900 err = versionedDecode(header, &decodedHeader, response.headerVersion)
khenaidooac637102019-01-14 15:44:34 -0500901 if err != nil {
902 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
903 dead = err
904 response.errors <- err
905 continue
906 }
907 if decodedHeader.correlationID != response.correlationID {
908 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
909 // TODO if decoded ID < cur ID, discard until we catch up
910 // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
911 dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
912 response.errors <- dead
913 continue
914 }
915
khenaidood948f772021-08-11 17:49:24 -0400916 buf := make([]byte, decodedHeader.length-int32(headerLength)+4)
917 bytesReadBody, err := b.readFull(buf)
khenaidooac637102019-01-14 15:44:34 -0500918 b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
919 if err != nil {
920 dead = err
921 response.errors <- err
922 continue
923 }
924
925 response.packets <- buf
926 }
927 close(b.done)
928}
929
khenaidood948f772021-08-11 17:49:24 -0400930func getHeaderLength(headerVersion int16) int8 {
931 if headerVersion < 1 {
932 return 8
933 } else {
934 // header contains additional tagged field length (0), we don't support actual tags yet.
935 return 9
936 }
937}
938
William Kurkiandaa6bb22019-03-07 12:26:28 -0500939func (b *Broker) authenticateViaSASL() error {
Scott Baker8461e152019-10-01 14:44:30 -0700940 switch b.conf.Net.SASL.Mechanism {
941 case SASLTypeOAuth:
William Kurkiandaa6bb22019-03-07 12:26:28 -0500942 return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
Scott Baker8461e152019-10-01 14:44:30 -0700943 case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
944 return b.sendAndReceiveSASLSCRAMv1()
945 case SASLTypeGSSAPI:
946 return b.sendAndReceiveKerberos()
947 default:
948 return b.sendAndReceiveSASLPlainAuth()
William Kurkiandaa6bb22019-03-07 12:26:28 -0500949 }
William Kurkiandaa6bb22019-03-07 12:26:28 -0500950}
951
Scott Baker8461e152019-10-01 14:44:30 -0700952func (b *Broker) sendAndReceiveKerberos() error {
953 b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
954 if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
955 b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
956 }
957 return b.kerberosAuthenticator.Authorize(b)
958}
959
960func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
961 rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
William Kurkiandaa6bb22019-03-07 12:26:28 -0500962
khenaidooac637102019-01-14 15:44:34 -0500963 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
964 buf, err := encode(req, b.conf.MetricRegistry)
965 if err != nil {
966 return err
967 }
968
khenaidooac637102019-01-14 15:44:34 -0500969 requestTime := time.Now()
khenaidood948f772021-08-11 17:49:24 -0400970 // Will be decremented in updateIncomingCommunicationMetrics (except error)
971 b.addRequestInFlightMetrics(1)
972 bytes, err := b.write(buf)
khenaidooac637102019-01-14 15:44:34 -0500973 b.updateOutgoingCommunicationMetrics(bytes)
974 if err != nil {
khenaidood948f772021-08-11 17:49:24 -0400975 b.addRequestInFlightMetrics(-1)
khenaidooac637102019-01-14 15:44:34 -0500976 Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
977 return err
978 }
979 b.correlationID++
khenaidood948f772021-08-11 17:49:24 -0400980
khenaidooac637102019-01-14 15:44:34 -0500981 header := make([]byte, 8) // response header
khenaidood948f772021-08-11 17:49:24 -0400982 _, err = b.readFull(header)
khenaidooac637102019-01-14 15:44:34 -0500983 if err != nil {
khenaidood948f772021-08-11 17:49:24 -0400984 b.addRequestInFlightMetrics(-1)
khenaidooac637102019-01-14 15:44:34 -0500985 Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
986 return err
987 }
Scott Baker8461e152019-10-01 14:44:30 -0700988
khenaidooac637102019-01-14 15:44:34 -0500989 length := binary.BigEndian.Uint32(header[:4])
990 payload := make([]byte, length-4)
khenaidood948f772021-08-11 17:49:24 -0400991 n, err := b.readFull(payload)
khenaidooac637102019-01-14 15:44:34 -0500992 if err != nil {
khenaidood948f772021-08-11 17:49:24 -0400993 b.addRequestInFlightMetrics(-1)
khenaidooac637102019-01-14 15:44:34 -0500994 Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
995 return err
996 }
Scott Baker8461e152019-10-01 14:44:30 -0700997
khenaidooac637102019-01-14 15:44:34 -0500998 b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
999 res := &SaslHandshakeResponse{}
Scott Baker8461e152019-10-01 14:44:30 -07001000
khenaidooac637102019-01-14 15:44:34 -05001001 err = versionedDecode(payload, res, 0)
1002 if err != nil {
1003 Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
1004 return err
1005 }
Scott Baker8461e152019-10-01 14:44:30 -07001006
khenaidooac637102019-01-14 15:44:34 -05001007 if res.Err != ErrNoError {
1008 Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
1009 return res.Err
1010 }
Scott Baker8461e152019-10-01 14:44:30 -07001011
1012 Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
khenaidooac637102019-01-14 15:44:34 -05001013 return nil
1014}
1015
Scott Baker8461e152019-10-01 14:44:30 -07001016// Kafka 0.10.x supported SASL PLAIN/Kerberos via KAFKA-3149 (KIP-43).
1017// Kafka 1.x.x onward added a SaslAuthenticate request/response message which
1018// wraps the SASL flow in the Kafka protocol, which allows for returning
1019// meaningful errors on authentication failure.
khenaidooac637102019-01-14 15:44:34 -05001020//
1021// In SASL Plain, Kafka expects the auth header to be in the following format
1022// Message format (from https://tools.ietf.org/html/rfc4616):
1023//
1024// message = [authzid] UTF8NUL authcid UTF8NUL passwd
1025// authcid = 1*SAFE ; MUST accept up to 255 octets
1026// authzid = 1*SAFE ; MUST accept up to 255 octets
1027// passwd = 1*SAFE ; MUST accept up to 255 octets
1028// UTF8NUL = %x00 ; UTF-8 encoded NUL character
1029//
1030// SAFE = UTF1 / UTF2 / UTF3 / UTF4
1031// ;; any UTF-8 encoded Unicode character except NUL
1032//
Scott Baker8461e152019-10-01 14:44:30 -07001033// With SASL v0 handshake and auth then:
khenaidooac637102019-01-14 15:44:34 -05001034// When credentials are valid, Kafka returns a 4 byte array of null characters.
Scott Baker8461e152019-10-01 14:44:30 -07001035// When credentials are invalid, Kafka closes the connection.
1036//
1037// With SASL v1 handshake and auth then:
1038// When credentials are invalid, Kafka replies with a SaslAuthenticate response
1039// containing an error code and message detailing the authentication failure.
khenaidooac637102019-01-14 15:44:34 -05001040func (b *Broker) sendAndReceiveSASLPlainAuth() error {
khenaidood948f772021-08-11 17:49:24 -04001041 // default to V0 to allow for backward compatibility when SASL is enabled
Scott Baker8461e152019-10-01 14:44:30 -07001042 // but not the handshake
khenaidooac637102019-01-14 15:44:34 -05001043 if b.conf.Net.SASL.Handshake {
Scott Baker8461e152019-10-01 14:44:30 -07001044 handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
khenaidooac637102019-01-14 15:44:34 -05001045 if handshakeErr != nil {
1046 Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
1047 return handshakeErr
1048 }
1049 }
Scott Baker8461e152019-10-01 14:44:30 -07001050
1051 if b.conf.Net.SASL.Version == SASLHandshakeV1 {
1052 return b.sendAndReceiveV1SASLPlainAuth()
1053 }
1054 return b.sendAndReceiveV0SASLPlainAuth()
1055}
1056
1057// sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
1058func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
khenaidood948f772021-08-11 17:49:24 -04001059 length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
1060 authBytes := make([]byte, length+4) // 4 byte length header + auth data
khenaidooac637102019-01-14 15:44:34 -05001061 binary.BigEndian.PutUint32(authBytes, uint32(length))
khenaidood948f772021-08-11 17:49:24 -04001062 copy(authBytes[4:], b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password)
khenaidooac637102019-01-14 15:44:34 -05001063
1064 requestTime := time.Now()
khenaidood948f772021-08-11 17:49:24 -04001065 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1066 b.addRequestInFlightMetrics(1)
1067 bytesWritten, err := b.write(authBytes)
khenaidooac637102019-01-14 15:44:34 -05001068 b.updateOutgoingCommunicationMetrics(bytesWritten)
1069 if err != nil {
khenaidood948f772021-08-11 17:49:24 -04001070 b.addRequestInFlightMetrics(-1)
khenaidooac637102019-01-14 15:44:34 -05001071 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1072 return err
1073 }
1074
1075 header := make([]byte, 4)
khenaidood948f772021-08-11 17:49:24 -04001076 n, err := b.readFull(header)
khenaidooac637102019-01-14 15:44:34 -05001077 b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
1078 // If the credentials are valid, we would get a 4 byte response filled with null characters.
1079 // Otherwise, the broker closes the connection and we get an EOF
1080 if err != nil {
1081 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1082 return err
1083 }
1084
1085 Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
1086 return nil
1087}
1088
Scott Baker8461e152019-10-01 14:44:30 -07001089// sendAndReceiveV1SASLPlainAuth flows the v1 sasl authentication using the kafka protocol
1090func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {
1091 correlationID := b.correlationID
1092
1093 requestTime := time.Now()
1094
khenaidood948f772021-08-11 17:49:24 -04001095 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1096 b.addRequestInFlightMetrics(1)
Scott Baker8461e152019-10-01 14:44:30 -07001097 bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
Scott Baker8461e152019-10-01 14:44:30 -07001098 b.updateOutgoingCommunicationMetrics(bytesWritten)
1099
1100 if err != nil {
khenaidood948f772021-08-11 17:49:24 -04001101 b.addRequestInFlightMetrics(-1)
Scott Baker8461e152019-10-01 14:44:30 -07001102 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1103 return err
1104 }
1105
1106 b.correlationID++
1107
1108 bytesRead, err := b.receiveSASLServerResponse(&SaslAuthenticateResponse{}, correlationID)
1109 b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
1110
1111 // With v1 sasl we get an error message set in the response we can return
1112 if err != nil {
1113 Logger.Printf("Error returned from broker during SASL flow %s: %s\n", b.addr, err.Error())
1114 return err
1115 }
1116
1117 return nil
1118}
1119
William Kurkiandaa6bb22019-03-07 12:26:28 -05001120// sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
1121// https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
1122func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
William Kurkiandaa6bb22019-03-07 12:26:28 -05001123 if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
1124 return err
1125 }
1126
1127 token, err := provider.Token()
William Kurkiandaa6bb22019-03-07 12:26:28 -05001128 if err != nil {
1129 return err
1130 }
1131
Scott Baker8461e152019-10-01 14:44:30 -07001132 message, err := buildClientFirstMessage(token)
William Kurkiandaa6bb22019-03-07 12:26:28 -05001133 if err != nil {
1134 return err
1135 }
1136
Scott Baker8461e152019-10-01 14:44:30 -07001137 challenged, err := b.sendClientMessage(message)
1138 if err != nil {
1139 return err
1140 }
1141
1142 if challenged {
1143 // Abort the token exchange. The broker returns the failure code.
1144 _, err = b.sendClientMessage([]byte(`\x01`))
1145 }
1146
1147 return err
1148}
1149
1150// sendClientMessage sends a SASL/OAUTHBEARER client message and returns true
1151// if the broker responds with a challenge, in which case the token is
1152// rejected.
1153func (b *Broker) sendClientMessage(message []byte) (bool, error) {
Scott Baker8461e152019-10-01 14:44:30 -07001154 requestTime := time.Now()
khenaidood948f772021-08-11 17:49:24 -04001155 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1156 b.addRequestInFlightMetrics(1)
Scott Baker8461e152019-10-01 14:44:30 -07001157 correlationID := b.correlationID
1158
1159 bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
khenaidood948f772021-08-11 17:49:24 -04001160 b.updateOutgoingCommunicationMetrics(bytesWritten)
Scott Baker8461e152019-10-01 14:44:30 -07001161 if err != nil {
khenaidood948f772021-08-11 17:49:24 -04001162 b.addRequestInFlightMetrics(-1)
Scott Baker8461e152019-10-01 14:44:30 -07001163 return false, err
1164 }
1165
William Kurkiandaa6bb22019-03-07 12:26:28 -05001166 b.correlationID++
1167
Scott Baker8461e152019-10-01 14:44:30 -07001168 res := &SaslAuthenticateResponse{}
1169 bytesRead, err := b.receiveSASLServerResponse(res, correlationID)
William Kurkiandaa6bb22019-03-07 12:26:28 -05001170
1171 requestLatency := time.Since(requestTime)
1172 b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
1173
Scott Baker8461e152019-10-01 14:44:30 -07001174 isChallenge := len(res.SaslAuthBytes) > 0
1175
1176 if isChallenge && err != nil {
1177 Logger.Printf("Broker rejected authentication token: %s", res.SaslAuthBytes)
1178 }
1179
1180 return isChallenge, err
1181}
1182
1183func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
1184 if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV1); err != nil {
1185 return err
1186 }
1187
1188 scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
1189 if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
1190 return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
1191 }
1192
1193 msg, err := scramClient.Step("")
1194 if err != nil {
1195 return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
Scott Baker8461e152019-10-01 14:44:30 -07001196 }
1197
1198 for !scramClient.Done() {
1199 requestTime := time.Now()
khenaidood948f772021-08-11 17:49:24 -04001200 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1201 b.addRequestInFlightMetrics(1)
Scott Baker8461e152019-10-01 14:44:30 -07001202 correlationID := b.correlationID
1203 bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
khenaidood948f772021-08-11 17:49:24 -04001204 b.updateOutgoingCommunicationMetrics(bytesWritten)
Scott Baker8461e152019-10-01 14:44:30 -07001205 if err != nil {
khenaidood948f772021-08-11 17:49:24 -04001206 b.addRequestInFlightMetrics(-1)
Scott Baker8461e152019-10-01 14:44:30 -07001207 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1208 return err
1209 }
1210
Scott Baker8461e152019-10-01 14:44:30 -07001211 b.correlationID++
1212 challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
1213 if err != nil {
khenaidood948f772021-08-11 17:49:24 -04001214 b.addRequestInFlightMetrics(-1)
Scott Baker8461e152019-10-01 14:44:30 -07001215 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1216 return err
1217 }
1218
1219 b.updateIncomingCommunicationMetrics(len(challenge), time.Since(requestTime))
1220 msg, err = scramClient.Step(string(challenge))
1221 if err != nil {
1222 Logger.Println("SASL authentication failed", err)
1223 return err
1224 }
1225 }
1226
1227 Logger.Println("SASL authentication succeeded")
William Kurkiandaa6bb22019-03-07 12:26:28 -05001228 return nil
1229}
1230
Scott Baker8461e152019-10-01 14:44:30 -07001231func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (int, error) {
1232 rb := &SaslAuthenticateRequest{msg}
1233 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1234 buf, err := encode(req, b.conf.MetricRegistry)
1235 if err != nil {
1236 return 0, err
1237 }
1238
khenaidood948f772021-08-11 17:49:24 -04001239 return b.write(buf)
Scott Baker8461e152019-10-01 14:44:30 -07001240}
1241
1242func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
1243 buf := make([]byte, responseLengthSize+correlationIDSize)
khenaidood948f772021-08-11 17:49:24 -04001244 _, err := b.readFull(buf)
Scott Baker8461e152019-10-01 14:44:30 -07001245 if err != nil {
1246 return nil, err
1247 }
1248
1249 header := responseHeader{}
khenaidood948f772021-08-11 17:49:24 -04001250 err = versionedDecode(buf, &header, 0)
Scott Baker8461e152019-10-01 14:44:30 -07001251 if err != nil {
1252 return nil, err
1253 }
1254
1255 if header.correlationID != correlationID {
1256 return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
1257 }
1258
1259 buf = make([]byte, header.length-correlationIDSize)
khenaidood948f772021-08-11 17:49:24 -04001260 _, err = b.readFull(buf)
Scott Baker8461e152019-10-01 14:44:30 -07001261 if err != nil {
1262 return nil, err
1263 }
1264
1265 res := &SaslAuthenticateResponse{}
1266 if err := versionedDecode(buf, res, 0); err != nil {
1267 return nil, err
1268 }
1269 if res.Err != ErrNoError {
1270 return nil, res.Err
1271 }
1272 return res.SaslAuthBytes, nil
1273}
1274
William Kurkiandaa6bb22019-03-07 12:26:28 -05001275// Build SASL/OAUTHBEARER initial client response as described by RFC-7628
1276// https://tools.ietf.org/html/rfc7628
Scott Baker8461e152019-10-01 14:44:30 -07001277func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
William Kurkiandaa6bb22019-03-07 12:26:28 -05001278 var ext string
1279
1280 if token.Extensions != nil && len(token.Extensions) > 0 {
1281 if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
Scott Baker8461e152019-10-01 14:44:30 -07001282 return []byte{}, fmt.Errorf("the extension `%s` is invalid", SASLExtKeyAuth)
William Kurkiandaa6bb22019-03-07 12:26:28 -05001283 }
1284 ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
1285 }
1286
1287 resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
1288
1289 return resp, nil
1290}
1291
1292// mapToString returns a list of key-value pairs ordered by key.
1293// keyValSep separates the key from the value. elemSep separates each pair.
1294func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
William Kurkiandaa6bb22019-03-07 12:26:28 -05001295 buf := make([]string, 0, len(extensions))
1296
1297 for k, v := range extensions {
1298 buf = append(buf, k+keyValSep+v)
1299 }
1300
1301 sort.Strings(buf)
1302
1303 return strings.Join(buf, elemSep)
1304}
1305
Scott Baker8461e152019-10-01 14:44:30 -07001306func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
khenaidood948f772021-08-11 17:49:24 -04001307 authBytes := []byte(b.conf.Net.SASL.AuthIdentity + "\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
Scott Baker8461e152019-10-01 14:44:30 -07001308 rb := &SaslAuthenticateRequest{authBytes}
1309 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1310 buf, err := encode(req, b.conf.MetricRegistry)
William Kurkiandaa6bb22019-03-07 12:26:28 -05001311 if err != nil {
1312 return 0, err
1313 }
1314
khenaidood948f772021-08-11 17:49:24 -04001315 return b.write(buf)
Scott Baker8461e152019-10-01 14:44:30 -07001316}
1317
1318func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
William Kurkiandaa6bb22019-03-07 12:26:28 -05001319 rb := &SaslAuthenticateRequest{initialResp}
1320
1321 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1322
1323 buf, err := encode(req, b.conf.MetricRegistry)
William Kurkiandaa6bb22019-03-07 12:26:28 -05001324 if err != nil {
1325 return 0, err
1326 }
1327
khenaidood948f772021-08-11 17:49:24 -04001328 return b.write(buf)
William Kurkiandaa6bb22019-03-07 12:26:28 -05001329}
1330
Scott Baker8461e152019-10-01 14:44:30 -07001331func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
Scott Baker8461e152019-10-01 14:44:30 -07001332 buf := make([]byte, responseLengthSize+correlationIDSize)
khenaidood948f772021-08-11 17:49:24 -04001333 bytesRead, err := b.readFull(buf)
William Kurkiandaa6bb22019-03-07 12:26:28 -05001334 if err != nil {
1335 return bytesRead, err
1336 }
1337
1338 header := responseHeader{}
khenaidood948f772021-08-11 17:49:24 -04001339 err = versionedDecode(buf, &header, 0)
William Kurkiandaa6bb22019-03-07 12:26:28 -05001340 if err != nil {
1341 return bytesRead, err
1342 }
1343
1344 if header.correlationID != correlationID {
1345 return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
1346 }
1347
Scott Baker8461e152019-10-01 14:44:30 -07001348 buf = make([]byte, header.length-correlationIDSize)
khenaidood948f772021-08-11 17:49:24 -04001349 c, err := b.readFull(buf)
William Kurkiandaa6bb22019-03-07 12:26:28 -05001350 bytesRead += c
William Kurkiandaa6bb22019-03-07 12:26:28 -05001351 if err != nil {
1352 return bytesRead, err
1353 }
1354
William Kurkiandaa6bb22019-03-07 12:26:28 -05001355 if err := versionedDecode(buf, res, 0); err != nil {
1356 return bytesRead, err
1357 }
1358
William Kurkiandaa6bb22019-03-07 12:26:28 -05001359 if res.Err != ErrNoError {
1360 return bytesRead, res.Err
1361 }
1362
William Kurkiandaa6bb22019-03-07 12:26:28 -05001363 return bytesRead, nil
1364}
1365
khenaidooac637102019-01-14 15:44:34 -05001366func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
khenaidood948f772021-08-11 17:49:24 -04001367 b.updateRequestLatencyAndInFlightMetrics(requestLatency)
khenaidooac637102019-01-14 15:44:34 -05001368 b.responseRate.Mark(1)
Scott Baker8461e152019-10-01 14:44:30 -07001369
khenaidooac637102019-01-14 15:44:34 -05001370 if b.brokerResponseRate != nil {
1371 b.brokerResponseRate.Mark(1)
1372 }
Scott Baker8461e152019-10-01 14:44:30 -07001373
khenaidooac637102019-01-14 15:44:34 -05001374 responseSize := int64(bytes)
1375 b.incomingByteRate.Mark(responseSize)
1376 if b.brokerIncomingByteRate != nil {
1377 b.brokerIncomingByteRate.Mark(responseSize)
1378 }
Scott Baker8461e152019-10-01 14:44:30 -07001379
khenaidooac637102019-01-14 15:44:34 -05001380 b.responseSize.Update(responseSize)
1381 if b.brokerResponseSize != nil {
1382 b.brokerResponseSize.Update(responseSize)
1383 }
1384}
1385
khenaidood948f772021-08-11 17:49:24 -04001386func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
khenaidooac637102019-01-14 15:44:34 -05001387 requestLatencyInMs := int64(requestLatency / time.Millisecond)
1388 b.requestLatency.Update(requestLatencyInMs)
Scott Baker8461e152019-10-01 14:44:30 -07001389
khenaidooac637102019-01-14 15:44:34 -05001390 if b.brokerRequestLatency != nil {
1391 b.brokerRequestLatency.Update(requestLatencyInMs)
1392 }
Scott Baker8461e152019-10-01 14:44:30 -07001393
khenaidood948f772021-08-11 17:49:24 -04001394 b.addRequestInFlightMetrics(-1)
1395}
1396
1397func (b *Broker) addRequestInFlightMetrics(i int64) {
1398 b.requestsInFlight.Inc(i)
1399 if b.brokerRequestsInFlight != nil {
1400 b.brokerRequestsInFlight.Inc(i)
1401 }
khenaidooac637102019-01-14 15:44:34 -05001402}
1403
1404func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
1405 b.requestRate.Mark(1)
1406 if b.brokerRequestRate != nil {
1407 b.brokerRequestRate.Mark(1)
1408 }
Scott Baker8461e152019-10-01 14:44:30 -07001409
khenaidooac637102019-01-14 15:44:34 -05001410 requestSize := int64(bytes)
1411 b.outgoingByteRate.Mark(requestSize)
1412 if b.brokerOutgoingByteRate != nil {
1413 b.brokerOutgoingByteRate.Mark(requestSize)
1414 }
Scott Baker8461e152019-10-01 14:44:30 -07001415
khenaidooac637102019-01-14 15:44:34 -05001416 b.requestSize.Update(requestSize)
1417 if b.brokerRequestSize != nil {
1418 b.brokerRequestSize.Update(requestSize)
1419 }
Scott Baker8461e152019-10-01 14:44:30 -07001420}
1421
1422func (b *Broker) registerMetrics() {
1423 b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
1424 b.brokerRequestRate = b.registerMeter("request-rate")
1425 b.brokerRequestSize = b.registerHistogram("request-size")
1426 b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
1427 b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
1428 b.brokerResponseRate = b.registerMeter("response-rate")
1429 b.brokerResponseSize = b.registerHistogram("response-size")
khenaidood948f772021-08-11 17:49:24 -04001430 b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
Scott Baker8461e152019-10-01 14:44:30 -07001431}
1432
1433func (b *Broker) unregisterMetrics() {
1434 for _, name := range b.registeredMetrics {
1435 b.conf.MetricRegistry.Unregister(name)
1436 }
khenaidood948f772021-08-11 17:49:24 -04001437 b.registeredMetrics = nil
Scott Baker8461e152019-10-01 14:44:30 -07001438}
1439
1440func (b *Broker) registerMeter(name string) metrics.Meter {
1441 nameForBroker := getMetricNameForBroker(name, b)
1442 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1443 return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
1444}
1445
1446func (b *Broker) registerHistogram(name string) metrics.Histogram {
1447 nameForBroker := getMetricNameForBroker(name, b)
1448 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1449 return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
khenaidooac637102019-01-14 15:44:34 -05001450}
khenaidood948f772021-08-11 17:49:24 -04001451
1452func (b *Broker) registerCounter(name string) metrics.Counter {
1453 nameForBroker := getMetricNameForBroker(name, b)
1454 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1455 return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
1456}
1457
1458func validServerNameTLS(addr string, cfg *tls.Config) *tls.Config {
1459 if cfg == nil {
1460 cfg = &tls.Config{
1461 MinVersion: tls.VersionTLS12,
1462 }
1463 }
1464 if cfg.ServerName != "" {
1465 return cfg
1466 }
1467
1468 c := cfg.Clone()
1469 sn, _, err := net.SplitHostPort(addr)
1470 if err != nil {
1471 Logger.Println(fmt.Errorf("failed to get ServerName from addr %w", err))
1472 }
1473 c.ServerName = sn
1474 return c
1475}