// blob: 81467498c09ae7e155f56fa7ddc95c968fb84cc8
// Last change: Scott Baker, ed4efab, 2020-01-13 19:12:25 -0800

package sarama
2
3import (
4 "crypto/tls"
5 "encoding/binary"
6 "fmt"
7 "io"
8 "net"
9 "sort"
10 "strconv"
11 "strings"
12 "sync"
13 "sync/atomic"
14 "time"
15
16 metrics "github.com/rcrowley/go-metrics"
17)
18
19// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
20type Broker struct {
21 conf *Config
22 rack *string
23
24 id int32
25 addr string
26 correlationID int32
27 conn net.Conn
28 connErr error
29 lock sync.Mutex
30 opened int32
31 responses chan responsePromise
32 done chan bool
33
34 registeredMetrics []string
35
36 incomingByteRate metrics.Meter
37 requestRate metrics.Meter
38 requestSize metrics.Histogram
39 requestLatency metrics.Histogram
40 outgoingByteRate metrics.Meter
41 responseRate metrics.Meter
42 responseSize metrics.Histogram
43 brokerIncomingByteRate metrics.Meter
44 brokerRequestRate metrics.Meter
45 brokerRequestSize metrics.Histogram
46 brokerRequestLatency metrics.Histogram
47 brokerOutgoingByteRate metrics.Meter
48 brokerResponseRate metrics.Meter
49 brokerResponseSize metrics.Histogram
50
51 kerberosAuthenticator GSSAPIKerberosAuth
52}
53
// SASLMechanism names the SASL mechanism the client uses to authenticate
// with the broker.
type SASLMechanism string

// Supported SASL mechanism names.
const (
	// SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
	SASLTypeOAuth = "OAUTHBEARER"
	// SASLTypePlaintext represents the SASL/PLAIN mechanism
	SASLTypePlaintext = "PLAIN"
	// SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
	SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
	// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
	SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
	// SASLTypeGSSAPI represents the GSSAPI mechanism.
	SASLTypeGSSAPI = "GSSAPI"
)

// Versions of the Kafka SASL handshake protocol.
const (
	// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL auth using opaque packets.
	SASLHandshakeV0 = int16(0)
	// SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL by wrapping tokens with Kafka protocol headers.
	SASLHandshakeV1 = int16(1)
)

// SASLExtKeyAuth is the reserved extension key name sent as part of the
// SASL/OAUTHBEARER initial client response
const SASLExtKeyAuth = "auth"
77
// AccessToken bundles the access token used to authenticate a
// SASL/OAUTHBEARER client with its associated metadata.
type AccessToken struct {
	// Token is the access token payload.
	Token string

	// Extensions is an optional map of arbitrary key-value pairs that can be
	// sent with the SASL/OAUTHBEARER initial client response. The SASL server
	// ignores values it does not expect. This feature is only supported by
	// Kafka >= 2.1.0.
	Extensions map[string]string
}
89
90// AccessTokenProvider is the interface that encapsulates how implementors
91// can generate access tokens for Kafka broker authentication.
92type AccessTokenProvider interface {
93 // Token returns an access token. The implementation should ensure token
94 // reuse so that multiple calls at connect time do not create multiple
95 // tokens. The implementation should also periodically refresh the token in
96 // order to guarantee that each call returns an unexpired token. This
97 // method should not block indefinitely--a timeout error should be returned
98 // after a short period of inactivity so that the broker connection logic
99 // can log debugging information and retry.
100 Token() (*AccessToken, error)
101}
102
// SCRAMClient is the interface a SCRAM client implementation has to satisfy.
type SCRAMClient interface {
	// Begin prepares the client for the SCRAM exchange with the server,
	// using the given user name and password.
	Begin(userName, password, authzID string) error

	// Step advances the client through the SCRAM exchange. It is called
	// repeatedly until it returns an error or `Done` reports true.
	Step(challenge string) (response string, err error)

	// Done should return true once the SCRAM conversation is over.
	Done() bool
}
116
// responsePromise tracks one in-flight request whose response is still
// expected. Exactly one of the two channels receives a value: the raw
// response body on packets, or the failure on errors.
type responsePromise struct {
	requestTime   time.Time // taken just before the request was written
	correlationID int32     // ID the matching response header must carry
	packets       chan []byte
	errors        chan error
}
123
124// NewBroker creates and returns a Broker targeting the given host:port address.
125// This does not attempt to actually connect, you have to call Open() for that.
126func NewBroker(addr string) *Broker {
127 return &Broker{id: -1, addr: addr}
128}
129
130// Open tries to connect to the Broker if it is not already connected or connecting, but does not block
131// waiting for the connection to complete. This means that any subsequent operations on the broker will
132// block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
133// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
134// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
135func (b *Broker) Open(conf *Config) error {
136 if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
137 return ErrAlreadyConnected
138 }
139
140 if conf == nil {
141 conf = NewConfig()
142 }
143
144 err := conf.Validate()
145 if err != nil {
146 return err
147 }
148
149 b.lock.Lock()
150
151 go withRecover(func() {
152 defer b.lock.Unlock()
153
154 dialer := net.Dialer{
155 Timeout: conf.Net.DialTimeout,
156 KeepAlive: conf.Net.KeepAlive,
157 LocalAddr: conf.Net.LocalAddr,
158 }
159
160 if conf.Net.TLS.Enable {
161 b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
162 } else if conf.Net.Proxy.Enable {
163 b.conn, b.connErr = conf.Net.Proxy.Dialer.Dial("tcp", b.addr)
164 } else {
165 b.conn, b.connErr = dialer.Dial("tcp", b.addr)
166 }
167 if b.connErr != nil {
168 Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
169 b.conn = nil
170 atomic.StoreInt32(&b.opened, 0)
171 return
172 }
173 b.conn = newBufConn(b.conn)
174
175 b.conf = conf
176
177 // Create or reuse the global metrics shared between brokers
178 b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
179 b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
180 b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
181 b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
182 b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
183 b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
184 b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
185 // Do not gather metrics for seeded broker (only used during bootstrap) because they share
186 // the same id (-1) and are already exposed through the global metrics above
187 if b.id >= 0 {
188 b.registerMetrics()
189 }
190
191 if conf.Net.SASL.Enable {
192
193 b.connErr = b.authenticateViaSASL()
194
195 if b.connErr != nil {
196 err = b.conn.Close()
197 if err == nil {
198 Logger.Printf("Closed connection to broker %s\n", b.addr)
199 } else {
200 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
201 }
202 b.conn = nil
203 atomic.StoreInt32(&b.opened, 0)
204 return
205 }
206 }
207
208 b.done = make(chan bool)
209 b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
210
211 if b.id >= 0 {
212 Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
213 } else {
214 Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
215 }
216 go withRecover(b.responseReceiver)
217 })
218
219 return nil
220}
221
222// Connected returns true if the broker is connected and false otherwise. If the broker is not
223// connected but it had tried to connect, the error from that connection attempt is also returned.
224func (b *Broker) Connected() (bool, error) {
225 b.lock.Lock()
226 defer b.lock.Unlock()
227
228 return b.conn != nil, b.connErr
229}
230
231//Close closes the broker resources
232func (b *Broker) Close() error {
233 b.lock.Lock()
234 defer b.lock.Unlock()
235
236 if b.conn == nil {
237 return ErrNotConnected
238 }
239
240 close(b.responses)
241 <-b.done
242
243 err := b.conn.Close()
244
245 b.conn = nil
246 b.connErr = nil
247 b.done = nil
248 b.responses = nil
249
250 b.unregisterMetrics()
251
252 if err == nil {
253 Logger.Printf("Closed connection to broker %s\n", b.addr)
254 } else {
255 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
256 }
257
258 atomic.StoreInt32(&b.opened, 0)
259
260 return err
261}
262
263// ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
264func (b *Broker) ID() int32 {
265 return b.id
266}
267
268// Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
269func (b *Broker) Addr() string {
270 return b.addr
271}
272
273// Rack returns the broker's rack as retrieved from Kafka's metadata or the
274// empty string if it is not known. The returned value corresponds to the
275// broker's broker.rack configuration setting. Requires protocol version to be
276// at least v0.10.0.0.
277func (b *Broker) Rack() string {
278 if b.rack == nil {
279 return ""
280 }
281 return *b.rack
282}
283
284//GetMetadata send a metadata request and returns a metadata response or error
285func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
286 response := new(MetadataResponse)
287
288 err := b.sendAndReceive(request, response)
289
290 if err != nil {
291 return nil, err
292 }
293
294 return response, nil
295}
296
297//GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
298func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
299 response := new(ConsumerMetadataResponse)
300
301 err := b.sendAndReceive(request, response)
302
303 if err != nil {
304 return nil, err
305 }
306
307 return response, nil
308}
309
310//FindCoordinator sends a find coordinate request and returns a response or error
311func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
312 response := new(FindCoordinatorResponse)
313
314 err := b.sendAndReceive(request, response)
315
316 if err != nil {
317 return nil, err
318 }
319
320 return response, nil
321}
322
323//GetAvailableOffsets return an offset response or error
324func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
325 response := new(OffsetResponse)
326
327 err := b.sendAndReceive(request, response)
328
329 if err != nil {
330 return nil, err
331 }
332
333 return response, nil
334}
335
336//Produce returns a produce response or error
337func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
338 var (
339 response *ProduceResponse
340 err error
341 )
342
343 if request.RequiredAcks == NoResponse {
344 err = b.sendAndReceive(request, nil)
345 } else {
346 response = new(ProduceResponse)
347 err = b.sendAndReceive(request, response)
348 }
349
350 if err != nil {
351 return nil, err
352 }
353
354 return response, nil
355}
356
357//Fetch returns a FetchResponse or error
358func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
359 response := new(FetchResponse)
360
361 err := b.sendAndReceive(request, response)
362 if err != nil {
363 return nil, err
364 }
365
366 return response, nil
367}
368
369//CommitOffset return an Offset commit reponse or error
370func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
371 response := new(OffsetCommitResponse)
372
373 err := b.sendAndReceive(request, response)
374 if err != nil {
375 return nil, err
376 }
377
378 return response, nil
379}
380
381//FetchOffset returns an offset fetch response or error
382func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
383 response := new(OffsetFetchResponse)
384
385 err := b.sendAndReceive(request, response)
386 if err != nil {
387 return nil, err
388 }
389
390 return response, nil
391}
392
393//JoinGroup returns a join group response or error
394func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
395 response := new(JoinGroupResponse)
396
397 err := b.sendAndReceive(request, response)
398 if err != nil {
399 return nil, err
400 }
401
402 return response, nil
403}
404
405//SyncGroup returns a sync group response or error
406func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
407 response := new(SyncGroupResponse)
408
409 err := b.sendAndReceive(request, response)
410 if err != nil {
411 return nil, err
412 }
413
414 return response, nil
415}
416
417//LeaveGroup return a leave group response or error
418func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
419 response := new(LeaveGroupResponse)
420
421 err := b.sendAndReceive(request, response)
422 if err != nil {
423 return nil, err
424 }
425
426 return response, nil
427}
428
429//Heartbeat returns a heartbeat response or error
430func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
431 response := new(HeartbeatResponse)
432
433 err := b.sendAndReceive(request, response)
434 if err != nil {
435 return nil, err
436 }
437
438 return response, nil
439}
440
441//ListGroups return a list group response or error
442func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
443 response := new(ListGroupsResponse)
444
445 err := b.sendAndReceive(request, response)
446 if err != nil {
447 return nil, err
448 }
449
450 return response, nil
451}
452
453//DescribeGroups return describe group response or error
454func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
455 response := new(DescribeGroupsResponse)
456
457 err := b.sendAndReceive(request, response)
458 if err != nil {
459 return nil, err
460 }
461
462 return response, nil
463}
464
465//ApiVersions return api version response or error
466func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
467 response := new(ApiVersionsResponse)
468
469 err := b.sendAndReceive(request, response)
470 if err != nil {
471 return nil, err
472 }
473
474 return response, nil
475}
476
477//CreateTopics send a create topic request and returns create topic response
478func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
479 response := new(CreateTopicsResponse)
480
481 err := b.sendAndReceive(request, response)
482 if err != nil {
483 return nil, err
484 }
485
486 return response, nil
487}
488
489//DeleteTopics sends a delete topic request and returns delete topic response
490func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
491 response := new(DeleteTopicsResponse)
492
493 err := b.sendAndReceive(request, response)
494 if err != nil {
495 return nil, err
496 }
497
498 return response, nil
499}
500
501//CreatePartitions sends a create partition request and returns create
502//partitions response or error
503func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
504 response := new(CreatePartitionsResponse)
505
506 err := b.sendAndReceive(request, response)
507 if err != nil {
508 return nil, err
509 }
510
511 return response, nil
512}
513
514//DeleteRecords send a request to delete records and return delete record
515//response or error
516func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
517 response := new(DeleteRecordsResponse)
518
519 err := b.sendAndReceive(request, response)
520 if err != nil {
521 return nil, err
522 }
523
524 return response, nil
525}
526
527//DescribeAcls sends a describe acl request and returns a response or error
528func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
529 response := new(DescribeAclsResponse)
530
531 err := b.sendAndReceive(request, response)
532 if err != nil {
533 return nil, err
534 }
535
536 return response, nil
537}
538
539//CreateAcls sends a create acl request and returns a response or error
540func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
541 response := new(CreateAclsResponse)
542
543 err := b.sendAndReceive(request, response)
544 if err != nil {
545 return nil, err
546 }
547
548 return response, nil
549}
550
551//DeleteAcls sends a delete acl request and returns a response or error
552func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
553 response := new(DeleteAclsResponse)
554
555 err := b.sendAndReceive(request, response)
556 if err != nil {
557 return nil, err
558 }
559
560 return response, nil
561}
562
563//InitProducerID sends an init producer request and returns a response or error
564func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
565 response := new(InitProducerIDResponse)
566
567 err := b.sendAndReceive(request, response)
568 if err != nil {
569 return nil, err
570 }
571
572 return response, nil
573}
574
575//AddPartitionsToTxn send a request to add partition to txn and returns
576//a response or error
577func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
578 response := new(AddPartitionsToTxnResponse)
579
580 err := b.sendAndReceive(request, response)
581 if err != nil {
582 return nil, err
583 }
584
585 return response, nil
586}
587
588//AddOffsetsToTxn sends a request to add offsets to txn and returns a response
589//or error
590func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
591 response := new(AddOffsetsToTxnResponse)
592
593 err := b.sendAndReceive(request, response)
594 if err != nil {
595 return nil, err
596 }
597
598 return response, nil
599}
600
601//EndTxn sends a request to end txn and returns a response or error
602func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
603 response := new(EndTxnResponse)
604
605 err := b.sendAndReceive(request, response)
606 if err != nil {
607 return nil, err
608 }
609
610 return response, nil
611}
612
613//TxnOffsetCommit sends a request to commit transaction offsets and returns
614//a response or error
615func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
616 response := new(TxnOffsetCommitResponse)
617
618 err := b.sendAndReceive(request, response)
619 if err != nil {
620 return nil, err
621 }
622
623 return response, nil
624}
625
626//DescribeConfigs sends a request to describe config and returns a response or
627//error
628func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
629 response := new(DescribeConfigsResponse)
630
631 err := b.sendAndReceive(request, response)
632 if err != nil {
633 return nil, err
634 }
635
636 return response, nil
637}
638
639//AlterConfigs sends a request to alter config and return a response or error
640func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
641 response := new(AlterConfigsResponse)
642
643 err := b.sendAndReceive(request, response)
644 if err != nil {
645 return nil, err
646 }
647
648 return response, nil
649}
650
651//DeleteGroups sends a request to delete groups and returns a response or error
652func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
653 response := new(DeleteGroupsResponse)
654
655 if err := b.sendAndReceive(request, response); err != nil {
656 return nil, err
657 }
658
659 return response, nil
660}
661
662//DescribeLogDirs sends a request to get the broker's log dir paths and sizes
663func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) {
664 response := new(DescribeLogDirsResponse)
665
666 err := b.sendAndReceive(request, response)
667 if err != nil {
668 return nil, err
669 }
670
671 return response, nil
672}
673
674// readFull ensures the conn ReadDeadline has been setup before making a
675// call to io.ReadFull
676func (b *Broker) readFull(buf []byte) (n int, err error) {
677 if err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)); err != nil {
678 return 0, err
679 }
680
681 return io.ReadFull(b.conn, buf)
682}
683
684// write ensures the conn WriteDeadline has been setup before making a
685// call to conn.Write
686func (b *Broker) write(buf []byte) (n int, err error) {
687 if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
688 return 0, err
689 }
690
691 return b.conn.Write(buf)
692}
693
694func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
695 b.lock.Lock()
696 defer b.lock.Unlock()
697
698 if b.conn == nil {
699 if b.connErr != nil {
700 return nil, b.connErr
701 }
702 return nil, ErrNotConnected
703 }
704
705 if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
706 return nil, ErrUnsupportedVersion
707 }
708
709 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
710 buf, err := encode(req, b.conf.MetricRegistry)
711 if err != nil {
712 return nil, err
713 }
714
715 requestTime := time.Now()
716 bytes, err := b.write(buf)
717 b.updateOutgoingCommunicationMetrics(bytes)
718 if err != nil {
719 return nil, err
720 }
721 b.correlationID++
722
723 if !promiseResponse {
724 // Record request latency without the response
725 b.updateRequestLatencyMetrics(time.Since(requestTime))
726 return nil, nil
727 }
728
729 promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
730 b.responses <- promise
731
732 return &promise, nil
733}
734
735func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
736 promise, err := b.send(req, res != nil)
737 if err != nil {
738 return err
739 }
740
741 if promise == nil {
742 return nil
743 }
744
745 select {
746 case buf := <-promise.packets:
747 return versionedDecode(buf, res, req.version())
748 case err = <-promise.errors:
749 return err
750 }
751}
752
753func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
754 b.id, err = pd.getInt32()
755 if err != nil {
756 return err
757 }
758
759 host, err := pd.getString()
760 if err != nil {
761 return err
762 }
763
764 port, err := pd.getInt32()
765 if err != nil {
766 return err
767 }
768
769 if version >= 1 {
770 b.rack, err = pd.getNullableString()
771 if err != nil {
772 return err
773 }
774 }
775
776 b.addr = net.JoinHostPort(host, fmt.Sprint(port))
777 if _, _, err := net.SplitHostPort(b.addr); err != nil {
778 return err
779 }
780
781 return nil
782}
783
784func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
785 host, portstr, err := net.SplitHostPort(b.addr)
786 if err != nil {
787 return err
788 }
789
790 port, err := strconv.Atoi(portstr)
791 if err != nil {
792 return err
793 }
794
795 pe.putInt32(b.id)
796
797 err = pe.putString(host)
798 if err != nil {
799 return err
800 }
801
802 pe.putInt32(int32(port))
803
804 if version >= 1 {
805 err = pe.putNullableString(b.rack)
806 if err != nil {
807 return err
808 }
809 }
810
811 return nil
812}
813
814func (b *Broker) responseReceiver() {
815 var dead error
816 header := make([]byte, 8)
817
818 for response := range b.responses {
819 if dead != nil {
820 response.errors <- dead
821 continue
822 }
823
824 bytesReadHeader, err := b.readFull(header)
825 requestLatency := time.Since(response.requestTime)
826 if err != nil {
827 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
828 dead = err
829 response.errors <- err
830 continue
831 }
832
833 decodedHeader := responseHeader{}
834 err = decode(header, &decodedHeader)
835 if err != nil {
836 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
837 dead = err
838 response.errors <- err
839 continue
840 }
841 if decodedHeader.correlationID != response.correlationID {
842 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
843 // TODO if decoded ID < cur ID, discard until we catch up
844 // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
845 dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
846 response.errors <- dead
847 continue
848 }
849
850 buf := make([]byte, decodedHeader.length-4)
851 bytesReadBody, err := b.readFull(buf)
852 b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
853 if err != nil {
854 dead = err
855 response.errors <- err
856 continue
857 }
858
859 response.packets <- buf
860 }
861 close(b.done)
862}
863
864func (b *Broker) authenticateViaSASL() error {
865 switch b.conf.Net.SASL.Mechanism {
866 case SASLTypeOAuth:
867 return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
868 case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
869 return b.sendAndReceiveSASLSCRAMv1()
870 case SASLTypeGSSAPI:
871 return b.sendAndReceiveKerberos()
872 default:
873 return b.sendAndReceiveSASLPlainAuth()
874 }
875}
876
877func (b *Broker) sendAndReceiveKerberos() error {
878 b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
879 if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
880 b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
881 }
882 return b.kerberosAuthenticator.Authorize(b)
883}
884
885func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
886 rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
887
888 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
889 buf, err := encode(req, b.conf.MetricRegistry)
890 if err != nil {
891 return err
892 }
893
894 requestTime := time.Now()
895 bytes, err := b.write(buf)
896 b.updateOutgoingCommunicationMetrics(bytes)
897 if err != nil {
898 Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
899 return err
900 }
901 b.correlationID++
902
903 header := make([]byte, 8) // response header
904 _, err = b.readFull(header)
905 if err != nil {
906 Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
907 return err
908 }
909
910 length := binary.BigEndian.Uint32(header[:4])
911 payload := make([]byte, length-4)
912 n, err := b.readFull(payload)
913 if err != nil {
914 Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
915 return err
916 }
917
918 b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
919 res := &SaslHandshakeResponse{}
920
921 err = versionedDecode(payload, res, 0)
922 if err != nil {
923 Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
924 return err
925 }
926
927 if res.Err != ErrNoError {
928 Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
929 return res.Err
930 }
931
932 Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
933 return nil
934}
935
936// Kafka 0.10.x supported SASL PLAIN/Kerberos via KAFKA-3149 (KIP-43).
937// Kafka 1.x.x onward added a SaslAuthenticate request/response message which
938// wraps the SASL flow in the Kafka protocol, which allows for returning
939// meaningful errors on authentication failure.
940//
941// In SASL Plain, Kafka expects the auth header to be in the following format
942// Message format (from https://tools.ietf.org/html/rfc4616):
943//
944// message = [authzid] UTF8NUL authcid UTF8NUL passwd
945// authcid = 1*SAFE ; MUST accept up to 255 octets
946// authzid = 1*SAFE ; MUST accept up to 255 octets
947// passwd = 1*SAFE ; MUST accept up to 255 octets
948// UTF8NUL = %x00 ; UTF-8 encoded NUL character
949//
950// SAFE = UTF1 / UTF2 / UTF3 / UTF4
951// ;; any UTF-8 encoded Unicode character except NUL
952//
953// With SASL v0 handshake and auth then:
954// When credentials are valid, Kafka returns a 4 byte array of null characters.
955// When credentials are invalid, Kafka closes the connection.
956//
957// With SASL v1 handshake and auth then:
958// When credentials are invalid, Kafka replies with a SaslAuthenticate response
959// containing an error code and message detailing the authentication failure.
960func (b *Broker) sendAndReceiveSASLPlainAuth() error {
961 // default to V0 to allow for backward compatability when SASL is enabled
962 // but not the handshake
963 if b.conf.Net.SASL.Handshake {
964
965 handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
966 if handshakeErr != nil {
967 Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
968 return handshakeErr
969 }
970 }
971
972 if b.conf.Net.SASL.Version == SASLHandshakeV1 {
973 return b.sendAndReceiveV1SASLPlainAuth()
974 }
975 return b.sendAndReceiveV0SASLPlainAuth()
976}
977
978// sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
979func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
980
981 length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
982 authBytes := make([]byte, length+4) //4 byte length header + auth data
983 binary.BigEndian.PutUint32(authBytes, uint32(length))
984 copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
985
986 requestTime := time.Now()
987 bytesWritten, err := b.write(authBytes)
988 b.updateOutgoingCommunicationMetrics(bytesWritten)
989 if err != nil {
990 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
991 return err
992 }
993
994 header := make([]byte, 4)
995 n, err := b.readFull(header)
996 b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
997 // If the credentials are valid, we would get a 4 byte response filled with null characters.
998 // Otherwise, the broker closes the connection and we get an EOF
999 if err != nil {
1000 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1001 return err
1002 }
1003
1004 Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
1005 return nil
1006}
1007
1008// sendAndReceiveV1SASLPlainAuth flows the v1 sasl authentication using the kafka protocol
1009func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {
1010 correlationID := b.correlationID
1011
1012 requestTime := time.Now()
1013
1014 bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
1015
1016 b.updateOutgoingCommunicationMetrics(bytesWritten)
1017
1018 if err != nil {
1019 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1020 return err
1021 }
1022
1023 b.correlationID++
1024
1025 bytesRead, err := b.receiveSASLServerResponse(&SaslAuthenticateResponse{}, correlationID)
1026 b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
1027
1028 // With v1 sasl we get an error message set in the response we can return
1029 if err != nil {
1030 Logger.Printf("Error returned from broker during SASL flow %s: %s\n", b.addr, err.Error())
1031 return err
1032 }
1033
1034 return nil
1035}
1036
1037// sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
1038// https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
1039func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
1040 if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
1041 return err
1042 }
1043
1044 token, err := provider.Token()
1045 if err != nil {
1046 return err
1047 }
1048
1049 message, err := buildClientFirstMessage(token)
1050 if err != nil {
1051 return err
1052 }
1053
1054 challenged, err := b.sendClientMessage(message)
1055 if err != nil {
1056 return err
1057 }
1058
1059 if challenged {
1060 // Abort the token exchange. The broker returns the failure code.
1061 _, err = b.sendClientMessage([]byte(`\x01`))
1062 }
1063
1064 return err
1065}
1066
1067// sendClientMessage sends a SASL/OAUTHBEARER client message and returns true
1068// if the broker responds with a challenge, in which case the token is
1069// rejected.
1070func (b *Broker) sendClientMessage(message []byte) (bool, error) {
1071
1072 requestTime := time.Now()
1073 correlationID := b.correlationID
1074
1075 bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
1076 if err != nil {
1077 return false, err
1078 }
1079
1080 b.updateOutgoingCommunicationMetrics(bytesWritten)
1081 b.correlationID++
1082
1083 res := &SaslAuthenticateResponse{}
1084 bytesRead, err := b.receiveSASLServerResponse(res, correlationID)
1085
1086 requestLatency := time.Since(requestTime)
1087 b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
1088
1089 isChallenge := len(res.SaslAuthBytes) > 0
1090
1091 if isChallenge && err != nil {
1092 Logger.Printf("Broker rejected authentication token: %s", res.SaslAuthBytes)
1093 }
1094
1095 return isChallenge, err
1096}
1097
1098func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
1099 if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV1); err != nil {
1100 return err
1101 }
1102
1103 scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
1104 if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
1105 return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
1106 }
1107
1108 msg, err := scramClient.Step("")
1109 if err != nil {
1110 return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
1111
1112 }
1113
1114 for !scramClient.Done() {
1115 requestTime := time.Now()
1116 correlationID := b.correlationID
1117 bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
1118 if err != nil {
1119 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1120 return err
1121 }
1122
1123 b.updateOutgoingCommunicationMetrics(bytesWritten)
1124 b.correlationID++
1125 challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
1126 if err != nil {
1127 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1128 return err
1129 }
1130
1131 b.updateIncomingCommunicationMetrics(len(challenge), time.Since(requestTime))
1132 msg, err = scramClient.Step(string(challenge))
1133 if err != nil {
1134 Logger.Println("SASL authentication failed", err)
1135 return err
1136 }
1137 }
1138
1139 Logger.Println("SASL authentication succeeded")
1140 return nil
1141}
1142
1143func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (int, error) {
1144 rb := &SaslAuthenticateRequest{msg}
1145 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1146 buf, err := encode(req, b.conf.MetricRegistry)
1147 if err != nil {
1148 return 0, err
1149 }
1150
1151 return b.write(buf)
1152}
1153
1154func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
1155 buf := make([]byte, responseLengthSize+correlationIDSize)
1156 _, err := b.readFull(buf)
1157 if err != nil {
1158 return nil, err
1159 }
1160
1161 header := responseHeader{}
1162 err = decode(buf, &header)
1163 if err != nil {
1164 return nil, err
1165 }
1166
1167 if header.correlationID != correlationID {
1168 return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
1169 }
1170
1171 buf = make([]byte, header.length-correlationIDSize)
1172 _, err = b.readFull(buf)
1173 if err != nil {
1174 return nil, err
1175 }
1176
1177 res := &SaslAuthenticateResponse{}
1178 if err := versionedDecode(buf, res, 0); err != nil {
1179 return nil, err
1180 }
1181 if res.Err != ErrNoError {
1182 return nil, res.Err
1183 }
1184 return res.SaslAuthBytes, nil
1185}
1186
1187// Build SASL/OAUTHBEARER initial client response as described by RFC-7628
1188// https://tools.ietf.org/html/rfc7628
1189func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
1190 var ext string
1191
1192 if token.Extensions != nil && len(token.Extensions) > 0 {
1193 if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
1194 return []byte{}, fmt.Errorf("the extension `%s` is invalid", SASLExtKeyAuth)
1195 }
1196 ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
1197 }
1198
1199 resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
1200
1201 return resp, nil
1202}
1203
// mapToString renders extensions as a single string of key-value pairs.
// Each pair is written as key + keyValSep + value, the rendered pairs are
// sorted lexicographically, and the result is joined with elemSep.
// An empty or nil map yields the empty string.
func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
	pairs := make([]string, len(extensions))

	i := 0
	for key, value := range extensions {
		pairs[i] = key + keyValSep + value
		i++
	}

	// Map iteration order is random; sorting makes the output deterministic.
	sort.Strings(pairs)

	return strings.Join(pairs, elemSep)
}
1217
1218func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
1219 authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
1220 rb := &SaslAuthenticateRequest{authBytes}
1221 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1222 buf, err := encode(req, b.conf.MetricRegistry)
1223 if err != nil {
1224 return 0, err
1225 }
1226
1227 return b.write(buf)
1228}
1229
1230func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
1231
1232 rb := &SaslAuthenticateRequest{initialResp}
1233
1234 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1235
1236 buf, err := encode(req, b.conf.MetricRegistry)
1237 if err != nil {
1238 return 0, err
1239 }
1240
1241 return b.write(buf)
1242}
1243
1244func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
1245 buf := make([]byte, responseLengthSize+correlationIDSize)
1246 bytesRead, err := b.readFull(buf)
1247 if err != nil {
1248 return bytesRead, err
1249 }
1250
1251 header := responseHeader{}
1252 err = decode(buf, &header)
1253 if err != nil {
1254 return bytesRead, err
1255 }
1256
1257 if header.correlationID != correlationID {
1258 return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
1259 }
1260
1261 buf = make([]byte, header.length-correlationIDSize)
1262 c, err := b.readFull(buf)
1263 bytesRead += c
1264 if err != nil {
1265 return bytesRead, err
1266 }
1267
1268 if err := versionedDecode(buf, res, 0); err != nil {
1269 return bytesRead, err
1270 }
1271
1272 if res.Err != ErrNoError {
1273 return bytesRead, res.Err
1274 }
1275
1276 return bytesRead, nil
1277}
1278
1279func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
1280 b.updateRequestLatencyMetrics(requestLatency)
1281 b.responseRate.Mark(1)
1282
1283 if b.brokerResponseRate != nil {
1284 b.brokerResponseRate.Mark(1)
1285 }
1286
1287 responseSize := int64(bytes)
1288 b.incomingByteRate.Mark(responseSize)
1289 if b.brokerIncomingByteRate != nil {
1290 b.brokerIncomingByteRate.Mark(responseSize)
1291 }
1292
1293 b.responseSize.Update(responseSize)
1294 if b.brokerResponseSize != nil {
1295 b.brokerResponseSize.Update(responseSize)
1296 }
1297}
1298
1299func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
1300 requestLatencyInMs := int64(requestLatency / time.Millisecond)
1301 b.requestLatency.Update(requestLatencyInMs)
1302
1303 if b.brokerRequestLatency != nil {
1304 b.brokerRequestLatency.Update(requestLatencyInMs)
1305 }
1306
1307}
1308
1309func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
1310 b.requestRate.Mark(1)
1311 if b.brokerRequestRate != nil {
1312 b.brokerRequestRate.Mark(1)
1313 }
1314
1315 requestSize := int64(bytes)
1316 b.outgoingByteRate.Mark(requestSize)
1317 if b.brokerOutgoingByteRate != nil {
1318 b.brokerOutgoingByteRate.Mark(requestSize)
1319 }
1320
1321 b.requestSize.Update(requestSize)
1322 if b.brokerRequestSize != nil {
1323 b.brokerRequestSize.Update(requestSize)
1324 }
1325
1326}
1327
1328func (b *Broker) registerMetrics() {
1329 b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
1330 b.brokerRequestRate = b.registerMeter("request-rate")
1331 b.brokerRequestSize = b.registerHistogram("request-size")
1332 b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
1333 b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
1334 b.brokerResponseRate = b.registerMeter("response-rate")
1335 b.brokerResponseSize = b.registerHistogram("response-size")
1336}
1337
1338func (b *Broker) unregisterMetrics() {
1339 for _, name := range b.registeredMetrics {
1340 b.conf.MetricRegistry.Unregister(name)
1341 }
1342}
1343
1344func (b *Broker) registerMeter(name string) metrics.Meter {
1345 nameForBroker := getMetricNameForBroker(name, b)
1346 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1347 return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
1348}
1349
1350func (b *Broker) registerHistogram(name string) metrics.Histogram {
1351 nameForBroker := getMetricNameForBroker(name, b)
1352 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1353 return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
1354}