blob: 9c3e5a04a8a7101ed264a53882ddfb8ac2fa01d0 [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001package sarama
2
3import (
4 "crypto/tls"
5 "encoding/binary"
6 "fmt"
7 "io"
8 "net"
9 "sort"
10 "strconv"
11 "strings"
12 "sync"
13 "sync/atomic"
14 "time"
15
16 metrics "github.com/rcrowley/go-metrics"
17)
18
// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
type Broker struct {
	// conf is the configuration in effect for the current connection. It is
	// stored by Open once the connection has been dialed successfully.
	conf *Config
	// rack is the broker's rack as decoded from metadata (version >= 1 only);
	// nil when it is not known.
	rack *string

	// id is the broker ID; NewBroker initialises it to -1 and decode
	// overwrites it with the value read from the wire.
	id int32
	// addr is the "host:port" address of the broker.
	addr string
	// correlationID is stamped on the next outgoing request and incremented
	// after every successful write.
	correlationID int32
	// conn is the live connection, or nil when the broker is not connected.
	conn net.Conn
	// connErr holds the error of the most recent failed dial or SASL
	// authentication attempt made by Open.
	connErr error
	// lock guards the connection state. Open keeps it locked for the whole
	// duration of the asynchronous connection attempt.
	lock sync.Mutex
	// opened is accessed atomically: 1 while the broker is opening/open,
	// 0 otherwise.
	opened int32
	// responses carries one promise per in-flight request; it is drained in
	// order by responseReceiver.
	responses chan responsePromise
	// done is closed by responseReceiver once responses has been closed and
	// fully drained.
	done chan bool

	// NOTE(review): presumably the names of the per-broker metrics that must
	// be unregistered on Close -- confirm against registerMetrics.
	registeredMetrics []string

	// Metrics shared between all brokers that use the same registry; these
	// are looked up (or created) in Open.
	incomingByteRate metrics.Meter
	requestRate      metrics.Meter
	requestSize      metrics.Histogram
	requestLatency   metrics.Histogram
	outgoingByteRate metrics.Meter
	responseRate     metrics.Meter
	responseSize     metrics.Histogram
	// Per-broker counterparts of the metrics above.
	// NOTE(review): these are assigned outside this part of the file,
	// presumably by registerMetrics -- confirm.
	brokerIncomingByteRate metrics.Meter
	brokerRequestRate      metrics.Meter
	brokerRequestSize      metrics.Histogram
	brokerRequestLatency   metrics.Histogram
	brokerOutgoingByteRate metrics.Meter
	brokerResponseRate     metrics.Meter
	brokerResponseSize     metrics.Histogram

	// kerberosAuthenticator holds the state used for SASL/GSSAPI
	// authentication (see sendAndReceiveKerberos).
	kerberosAuthenticator GSSAPIKerberosAuth
}
53
// SASLMechanism specifies the SASL mechanism the client uses to authenticate
// with the broker. The mechanisms handled by this file are listed as the
// SASLType* constants.
type SASLMechanism string
56
const (
	// SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
	SASLTypeOAuth = "OAUTHBEARER"
	// SASLTypePlaintext represents the SASL/PLAIN mechanism
	SASLTypePlaintext = "PLAIN"
	// SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
	SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
	// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
	SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
	// SASLTypeGSSAPI represents the SASL/GSSAPI (Kerberos) mechanism.
	SASLTypeGSSAPI = "GSSAPI"
	// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL auth using opaque packets.
	SASLHandshakeV0 = int16(0)
	// SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL by wrapping tokens with Kafka protocol headers.
	SASLHandshakeV1 = int16(1)
	// SASLExtKeyAuth is the reserved extension key name sent as part of the
	// SASL/OAUTHBEARER initial client response
	SASLExtKeyAuth = "auth"
)
77
// AccessToken contains an access token used to authenticate a
// SASL/OAUTHBEARER client along with associated metadata.
type AccessToken struct {
	// Token is the access token payload.
	Token string
	// Extensions is an optional map of arbitrary key-value pairs that can be
	// sent with the SASL/OAUTHBEARER initial client response. These values are
	// ignored by the SASL server if they are unexpected. This feature is only
	// supported by Kafka >= 2.1.0.
	Extensions map[string]string
}
89
// AccessTokenProvider is the interface that encapsulates how implementors
// can generate access tokens for Kafka broker authentication.
type AccessTokenProvider interface {
	// Token returns an access token. The implementation should ensure token
	// reuse so that multiple calls at connect time do not create multiple
	// tokens. The implementation should also periodically refresh the token in
	// order to guarantee that each call returns an unexpired token. This
	// method should not block indefinitely -- a timeout error should be
	// returned after a short period of inactivity so that the broker
	// connection logic can log debugging information and retry.
	Token() (*AccessToken, error)
}
102
// SCRAMClient is an interface to a SCRAM
// client implementation.
type SCRAMClient interface {
	// Begin prepares the client for the SCRAM exchange
	// with the server with a user name, a password and an
	// authorization identity.
	Begin(userName, password, authzID string) error
	// Step steps the client through the SCRAM exchange. It is
	// called repeatedly until it errors or `Done` returns true.
	Step(challenge string) (response string, err error)
	// Done should return true when the SCRAM conversation
	// is over.
	Done() bool
}
116
// responsePromise tracks one in-flight request whose response is expected.
// send creates it and queues it on Broker.responses; responseReceiver
// fulfils it by delivering exactly one value on either packets or errors.
type responsePromise struct {
	// requestTime is taken immediately before the request is written and is
	// used to compute the request latency.
	requestTime time.Time
	// correlationID is the ID the request was sent with; the response header
	// must carry the same value.
	correlationID int32
	// packets receives the raw (still encoded) response body on success.
	packets chan []byte
	// errors receives the failure when the response could not be read.
	errors chan error
}
123
124// NewBroker creates and returns a Broker targeting the given host:port address.
125// This does not attempt to actually connect, you have to call Open() for that.
126func NewBroker(addr string) *Broker {
127 return &Broker{id: -1, addr: addr}
128}
129
130// Open tries to connect to the Broker if it is not already connected or connecting, but does not block
131// waiting for the connection to complete. This means that any subsequent operations on the broker will
132// block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
133// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
134// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
135func (b *Broker) Open(conf *Config) error {
136 if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
137 return ErrAlreadyConnected
138 }
139
140 if conf == nil {
141 conf = NewConfig()
142 }
143
144 err := conf.Validate()
145 if err != nil {
146 return err
147 }
148
149 b.lock.Lock()
150
151 go withRecover(func() {
152 defer b.lock.Unlock()
153
154 dialer := net.Dialer{
155 Timeout: conf.Net.DialTimeout,
156 KeepAlive: conf.Net.KeepAlive,
157 LocalAddr: conf.Net.LocalAddr,
158 }
159
160 if conf.Net.TLS.Enable {
161 b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
162 } else if conf.Net.Proxy.Enable {
163 b.conn, b.connErr = conf.Net.Proxy.Dialer.Dial("tcp", b.addr)
164 } else {
165 b.conn, b.connErr = dialer.Dial("tcp", b.addr)
166 }
167 if b.connErr != nil {
168 Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
169 b.conn = nil
170 atomic.StoreInt32(&b.opened, 0)
171 return
172 }
173 b.conn = newBufConn(b.conn)
174
175 b.conf = conf
176
177 // Create or reuse the global metrics shared between brokers
178 b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
179 b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
180 b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
181 b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
182 b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
183 b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
184 b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
185 // Do not gather metrics for seeded broker (only used during bootstrap) because they share
186 // the same id (-1) and are already exposed through the global metrics above
187 if b.id >= 0 {
188 b.registerMetrics()
189 }
190
191 if conf.Net.SASL.Enable {
192
193 b.connErr = b.authenticateViaSASL()
194
195 if b.connErr != nil {
196 err = b.conn.Close()
197 if err == nil {
198 Logger.Printf("Closed connection to broker %s\n", b.addr)
199 } else {
200 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
201 }
202 b.conn = nil
203 atomic.StoreInt32(&b.opened, 0)
204 return
205 }
206 }
207
208 b.done = make(chan bool)
209 b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
210
211 if b.id >= 0 {
212 Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
213 } else {
214 Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
215 }
216 go withRecover(b.responseReceiver)
217 })
218
219 return nil
220}
221
222// Connected returns true if the broker is connected and false otherwise. If the broker is not
223// connected but it had tried to connect, the error from that connection attempt is also returned.
224func (b *Broker) Connected() (bool, error) {
225 b.lock.Lock()
226 defer b.lock.Unlock()
227
228 return b.conn != nil, b.connErr
229}
230
231//Close closes the broker resources
232func (b *Broker) Close() error {
233 b.lock.Lock()
234 defer b.lock.Unlock()
235
236 if b.conn == nil {
237 return ErrNotConnected
238 }
239
240 close(b.responses)
241 <-b.done
242
243 err := b.conn.Close()
244
245 b.conn = nil
246 b.connErr = nil
247 b.done = nil
248 b.responses = nil
249
250 b.unregisterMetrics()
251
252 if err == nil {
253 Logger.Printf("Closed connection to broker %s\n", b.addr)
254 } else {
255 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
256 }
257
258 atomic.StoreInt32(&b.opened, 0)
259
260 return err
261}
262
// ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is
// not known. The field is read without taking the broker lock.
func (b *Broker) ID() int32 {
	return b.id
}
267
// Addr returns the broker address as either retrieved from Kafka's metadata
// or passed to NewBroker. The field is read without taking the broker lock.
func (b *Broker) Addr() string {
	return b.addr
}
272
273// Rack returns the broker's rack as retrieved from Kafka's metadata or the
274// empty string if it is not known. The returned value corresponds to the
275// broker's broker.rack configuration setting. Requires protocol version to be
276// at least v0.10.0.0.
277func (b *Broker) Rack() string {
278 if b.rack == nil {
279 return ""
280 }
281 return *b.rack
282}
283
284//GetMetadata send a metadata request and returns a metadata response or error
285func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
286 response := new(MetadataResponse)
287
288 err := b.sendAndReceive(request, response)
289
290 if err != nil {
291 return nil, err
292 }
293
294 return response, nil
295}
296
297//GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
298func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
299 response := new(ConsumerMetadataResponse)
300
301 err := b.sendAndReceive(request, response)
302
303 if err != nil {
304 return nil, err
305 }
306
307 return response, nil
308}
309
310//FindCoordinator sends a find coordinate request and returns a response or error
311func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
312 response := new(FindCoordinatorResponse)
313
314 err := b.sendAndReceive(request, response)
315
316 if err != nil {
317 return nil, err
318 }
319
320 return response, nil
321}
322
323//GetAvailableOffsets return an offset response or error
324func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
325 response := new(OffsetResponse)
326
327 err := b.sendAndReceive(request, response)
328
329 if err != nil {
330 return nil, err
331 }
332
333 return response, nil
334}
335
336//Produce returns a produce response or error
337func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
338 var (
339 response *ProduceResponse
340 err error
341 )
342
343 if request.RequiredAcks == NoResponse {
344 err = b.sendAndReceive(request, nil)
345 } else {
346 response = new(ProduceResponse)
347 err = b.sendAndReceive(request, response)
348 }
349
350 if err != nil {
351 return nil, err
352 }
353
354 return response, nil
355}
356
357//Fetch returns a FetchResponse or error
358func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
359 response := new(FetchResponse)
360
361 err := b.sendAndReceive(request, response)
362 if err != nil {
363 return nil, err
364 }
365
366 return response, nil
367}
368
369//CommitOffset return an Offset commit reponse or error
370func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
371 response := new(OffsetCommitResponse)
372
373 err := b.sendAndReceive(request, response)
374 if err != nil {
375 return nil, err
376 }
377
378 return response, nil
379}
380
381//FetchOffset returns an offset fetch response or error
382func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
383 response := new(OffsetFetchResponse)
384
385 err := b.sendAndReceive(request, response)
386 if err != nil {
387 return nil, err
388 }
389
390 return response, nil
391}
392
393//JoinGroup returns a join group response or error
394func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
395 response := new(JoinGroupResponse)
396
397 err := b.sendAndReceive(request, response)
398 if err != nil {
399 return nil, err
400 }
401
402 return response, nil
403}
404
405//SyncGroup returns a sync group response or error
406func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
407 response := new(SyncGroupResponse)
408
409 err := b.sendAndReceive(request, response)
410 if err != nil {
411 return nil, err
412 }
413
414 return response, nil
415}
416
417//LeaveGroup return a leave group response or error
418func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
419 response := new(LeaveGroupResponse)
420
421 err := b.sendAndReceive(request, response)
422 if err != nil {
423 return nil, err
424 }
425
426 return response, nil
427}
428
429//Heartbeat returns a heartbeat response or error
430func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
431 response := new(HeartbeatResponse)
432
433 err := b.sendAndReceive(request, response)
434 if err != nil {
435 return nil, err
436 }
437
438 return response, nil
439}
440
441//ListGroups return a list group response or error
442func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
443 response := new(ListGroupsResponse)
444
445 err := b.sendAndReceive(request, response)
446 if err != nil {
447 return nil, err
448 }
449
450 return response, nil
451}
452
453//DescribeGroups return describe group response or error
454func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
455 response := new(DescribeGroupsResponse)
456
457 err := b.sendAndReceive(request, response)
458 if err != nil {
459 return nil, err
460 }
461
462 return response, nil
463}
464
465//ApiVersions return api version response or error
466func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
467 response := new(ApiVersionsResponse)
468
469 err := b.sendAndReceive(request, response)
470 if err != nil {
471 return nil, err
472 }
473
474 return response, nil
475}
476
477//CreateTopics send a create topic request and returns create topic response
478func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
479 response := new(CreateTopicsResponse)
480
481 err := b.sendAndReceive(request, response)
482 if err != nil {
483 return nil, err
484 }
485
486 return response, nil
487}
488
489//DeleteTopics sends a delete topic request and returns delete topic response
490func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
491 response := new(DeleteTopicsResponse)
492
493 err := b.sendAndReceive(request, response)
494 if err != nil {
495 return nil, err
496 }
497
498 return response, nil
499}
500
501//CreatePartitions sends a create partition request and returns create
502//partitions response or error
503func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
504 response := new(CreatePartitionsResponse)
505
506 err := b.sendAndReceive(request, response)
507 if err != nil {
508 return nil, err
509 }
510
511 return response, nil
512}
513
514//DeleteRecords send a request to delete records and return delete record
515//response or error
516func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
517 response := new(DeleteRecordsResponse)
518
519 err := b.sendAndReceive(request, response)
520 if err != nil {
521 return nil, err
522 }
523
524 return response, nil
525}
526
527//DescribeAcls sends a describe acl request and returns a response or error
528func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
529 response := new(DescribeAclsResponse)
530
531 err := b.sendAndReceive(request, response)
532 if err != nil {
533 return nil, err
534 }
535
536 return response, nil
537}
538
539//CreateAcls sends a create acl request and returns a response or error
540func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
541 response := new(CreateAclsResponse)
542
543 err := b.sendAndReceive(request, response)
544 if err != nil {
545 return nil, err
546 }
547
548 return response, nil
549}
550
551//DeleteAcls sends a delete acl request and returns a response or error
552func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
553 response := new(DeleteAclsResponse)
554
555 err := b.sendAndReceive(request, response)
556 if err != nil {
557 return nil, err
558 }
559
560 return response, nil
561}
562
563//InitProducerID sends an init producer request and returns a response or error
564func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
565 response := new(InitProducerIDResponse)
566
567 err := b.sendAndReceive(request, response)
568 if err != nil {
569 return nil, err
570 }
571
572 return response, nil
573}
574
575//AddPartitionsToTxn send a request to add partition to txn and returns
576//a response or error
577func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
578 response := new(AddPartitionsToTxnResponse)
579
580 err := b.sendAndReceive(request, response)
581 if err != nil {
582 return nil, err
583 }
584
585 return response, nil
586}
587
588//AddOffsetsToTxn sends a request to add offsets to txn and returns a response
589//or error
590func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
591 response := new(AddOffsetsToTxnResponse)
592
593 err := b.sendAndReceive(request, response)
594 if err != nil {
595 return nil, err
596 }
597
598 return response, nil
599}
600
601//EndTxn sends a request to end txn and returns a response or error
602func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
603 response := new(EndTxnResponse)
604
605 err := b.sendAndReceive(request, response)
606 if err != nil {
607 return nil, err
608 }
609
610 return response, nil
611}
612
613//TxnOffsetCommit sends a request to commit transaction offsets and returns
614//a response or error
615func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
616 response := new(TxnOffsetCommitResponse)
617
618 err := b.sendAndReceive(request, response)
619 if err != nil {
620 return nil, err
621 }
622
623 return response, nil
624}
625
626//DescribeConfigs sends a request to describe config and returns a response or
627//error
628func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
629 response := new(DescribeConfigsResponse)
630
631 err := b.sendAndReceive(request, response)
632 if err != nil {
633 return nil, err
634 }
635
636 return response, nil
637}
638
639//AlterConfigs sends a request to alter config and return a response or error
640func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
641 response := new(AlterConfigsResponse)
642
643 err := b.sendAndReceive(request, response)
644 if err != nil {
645 return nil, err
646 }
647
648 return response, nil
649}
650
651//DeleteGroups sends a request to delete groups and returns a response or error
652func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
653 response := new(DeleteGroupsResponse)
654
655 if err := b.sendAndReceive(request, response); err != nil {
656 return nil, err
657 }
658
659 return response, nil
660}
661
662func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
663 b.lock.Lock()
664 defer b.lock.Unlock()
665
666 if b.conn == nil {
667 if b.connErr != nil {
668 return nil, b.connErr
669 }
670 return nil, ErrNotConnected
671 }
672
673 if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
674 return nil, ErrUnsupportedVersion
675 }
676
677 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
678 buf, err := encode(req, b.conf.MetricRegistry)
679 if err != nil {
680 return nil, err
681 }
682
683 err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
684 if err != nil {
685 return nil, err
686 }
687
688 requestTime := time.Now()
689 bytes, err := b.conn.Write(buf)
690 b.updateOutgoingCommunicationMetrics(bytes) //TODO: should it be after error check
691 if err != nil {
692 return nil, err
693 }
694 b.correlationID++
695
696 if !promiseResponse {
697 // Record request latency without the response
698 b.updateRequestLatencyMetrics(time.Since(requestTime))
699 return nil, nil
700 }
701
702 promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
703 b.responses <- promise
704
705 return &promise, nil
706}
707
708func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
709 promise, err := b.send(req, res != nil)
710 if err != nil {
711 return err
712 }
713
714 if promise == nil {
715 return nil
716 }
717
718 select {
719 case buf := <-promise.packets:
720 return versionedDecode(buf, res, req.version())
721 case err = <-promise.errors:
722 return err
723 }
724}
725
726func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
727 b.id, err = pd.getInt32()
728 if err != nil {
729 return err
730 }
731
732 host, err := pd.getString()
733 if err != nil {
734 return err
735 }
736
737 port, err := pd.getInt32()
738 if err != nil {
739 return err
740 }
741
742 if version >= 1 {
743 b.rack, err = pd.getNullableString()
744 if err != nil {
745 return err
746 }
747 }
748
749 b.addr = net.JoinHostPort(host, fmt.Sprint(port))
750 if _, _, err := net.SplitHostPort(b.addr); err != nil {
751 return err
752 }
753
754 return nil
755}
756
757func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
758 host, portstr, err := net.SplitHostPort(b.addr)
759 if err != nil {
760 return err
761 }
762
763 port, err := strconv.Atoi(portstr)
764 if err != nil {
765 return err
766 }
767
768 pe.putInt32(b.id)
769
770 err = pe.putString(host)
771 if err != nil {
772 return err
773 }
774
775 pe.putInt32(int32(port))
776
777 if version >= 1 {
778 err = pe.putNullableString(b.rack)
779 if err != nil {
780 return err
781 }
782 }
783
784 return nil
785}
786
787func (b *Broker) responseReceiver() {
788 var dead error
789 header := make([]byte, 8)
790
791 for response := range b.responses {
792 if dead != nil {
793 response.errors <- dead
794 continue
795 }
796
797 err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
798 if err != nil {
799 dead = err
800 response.errors <- err
801 continue
802 }
803
804 bytesReadHeader, err := io.ReadFull(b.conn, header)
805 requestLatency := time.Since(response.requestTime)
806 if err != nil {
807 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
808 dead = err
809 response.errors <- err
810 continue
811 }
812
813 decodedHeader := responseHeader{}
814 err = decode(header, &decodedHeader)
815 if err != nil {
816 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
817 dead = err
818 response.errors <- err
819 continue
820 }
821 if decodedHeader.correlationID != response.correlationID {
822 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
823 // TODO if decoded ID < cur ID, discard until we catch up
824 // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
825 dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
826 response.errors <- dead
827 continue
828 }
829
830 buf := make([]byte, decodedHeader.length-4)
831 bytesReadBody, err := io.ReadFull(b.conn, buf)
832 b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
833 if err != nil {
834 dead = err
835 response.errors <- err
836 continue
837 }
838
839 response.packets <- buf
840 }
841 close(b.done)
842}
843
844func (b *Broker) authenticateViaSASL() error {
845 switch b.conf.Net.SASL.Mechanism {
846 case SASLTypeOAuth:
847 return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
848 case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
849 return b.sendAndReceiveSASLSCRAMv1()
850 case SASLTypeGSSAPI:
851 return b.sendAndReceiveKerberos()
852 default:
853 return b.sendAndReceiveSASLPlainAuth()
854 }
855}
856
857func (b *Broker) sendAndReceiveKerberos() error {
858 b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
859 if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
860 b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
861 }
862 return b.kerberosAuthenticator.Authorize(b)
863}
864
865func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
866 rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
867
868 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
869 buf, err := encode(req, b.conf.MetricRegistry)
870 if err != nil {
871 return err
872 }
873
874 err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
875 if err != nil {
876 return err
877 }
878
879 requestTime := time.Now()
880 bytes, err := b.conn.Write(buf)
881 b.updateOutgoingCommunicationMetrics(bytes)
882 if err != nil {
883 Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
884 return err
885 }
886 b.correlationID++
887 //wait for the response
888 header := make([]byte, 8) // response header
889 _, err = io.ReadFull(b.conn, header)
890 if err != nil {
891 Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
892 return err
893 }
894
895 length := binary.BigEndian.Uint32(header[:4])
896 payload := make([]byte, length-4)
897 n, err := io.ReadFull(b.conn, payload)
898 if err != nil {
899 Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
900 return err
901 }
902
903 b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
904 res := &SaslHandshakeResponse{}
905
906 err = versionedDecode(payload, res, 0)
907 if err != nil {
908 Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
909 return err
910 }
911
912 if res.Err != ErrNoError {
913 Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
914 return res.Err
915 }
916
917 Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
918 return nil
919}
920
921// Kafka 0.10.x supported SASL PLAIN/Kerberos via KAFKA-3149 (KIP-43).
922// Kafka 1.x.x onward added a SaslAuthenticate request/response message which
923// wraps the SASL flow in the Kafka protocol, which allows for returning
924// meaningful errors on authentication failure.
925//
926// In SASL Plain, Kafka expects the auth header to be in the following format
927// Message format (from https://tools.ietf.org/html/rfc4616):
928//
929// message = [authzid] UTF8NUL authcid UTF8NUL passwd
930// authcid = 1*SAFE ; MUST accept up to 255 octets
931// authzid = 1*SAFE ; MUST accept up to 255 octets
932// passwd = 1*SAFE ; MUST accept up to 255 octets
933// UTF8NUL = %x00 ; UTF-8 encoded NUL character
934//
935// SAFE = UTF1 / UTF2 / UTF3 / UTF4
936// ;; any UTF-8 encoded Unicode character except NUL
937//
938// With SASL v0 handshake and auth then:
939// When credentials are valid, Kafka returns a 4 byte array of null characters.
940// When credentials are invalid, Kafka closes the connection.
941//
942// With SASL v1 handshake and auth then:
943// When credentials are invalid, Kafka replies with a SaslAuthenticate response
944// containing an error code and message detailing the authentication failure.
945func (b *Broker) sendAndReceiveSASLPlainAuth() error {
946 // default to V0 to allow for backward compatability when SASL is enabled
947 // but not the handshake
948 if b.conf.Net.SASL.Handshake {
949
950 handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
951 if handshakeErr != nil {
952 Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
953 return handshakeErr
954 }
955 }
956
957 if b.conf.Net.SASL.Version == SASLHandshakeV1 {
958 return b.sendAndReceiveV1SASLPlainAuth()
959 }
960 return b.sendAndReceiveV0SASLPlainAuth()
961}
962
963// sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
964func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
965
966 length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
967 authBytes := make([]byte, length+4) //4 byte length header + auth data
968 binary.BigEndian.PutUint32(authBytes, uint32(length))
969 copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
970
971 err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
972 if err != nil {
973 Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
974 return err
975 }
976
977 requestTime := time.Now()
978 bytesWritten, err := b.conn.Write(authBytes)
979 b.updateOutgoingCommunicationMetrics(bytesWritten)
980 if err != nil {
981 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
982 return err
983 }
984
985 header := make([]byte, 4)
986 n, err := io.ReadFull(b.conn, header)
987 b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
988 // If the credentials are valid, we would get a 4 byte response filled with null characters.
989 // Otherwise, the broker closes the connection and we get an EOF
990 if err != nil {
991 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
992 return err
993 }
994
995 Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
996 return nil
997}
998
999// sendAndReceiveV1SASLPlainAuth flows the v1 sasl authentication using the kafka protocol
1000func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {
1001 correlationID := b.correlationID
1002
1003 requestTime := time.Now()
1004
1005 bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
1006
1007 b.updateOutgoingCommunicationMetrics(bytesWritten)
1008
1009 if err != nil {
1010 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1011 return err
1012 }
1013
1014 b.correlationID++
1015
1016 bytesRead, err := b.receiveSASLServerResponse(&SaslAuthenticateResponse{}, correlationID)
1017 b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
1018
1019 // With v1 sasl we get an error message set in the response we can return
1020 if err != nil {
1021 Logger.Printf("Error returned from broker during SASL flow %s: %s\n", b.addr, err.Error())
1022 return err
1023 }
1024
1025 return nil
1026}
1027
1028// sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
1029// https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
1030func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
1031 if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
1032 return err
1033 }
1034
1035 token, err := provider.Token()
1036 if err != nil {
1037 return err
1038 }
1039
1040 message, err := buildClientFirstMessage(token)
1041 if err != nil {
1042 return err
1043 }
1044
1045 challenged, err := b.sendClientMessage(message)
1046 if err != nil {
1047 return err
1048 }
1049
1050 if challenged {
1051 // Abort the token exchange. The broker returns the failure code.
1052 _, err = b.sendClientMessage([]byte(`\x01`))
1053 }
1054
1055 return err
1056}
1057
1058// sendClientMessage sends a SASL/OAUTHBEARER client message and returns true
1059// if the broker responds with a challenge, in which case the token is
1060// rejected.
1061func (b *Broker) sendClientMessage(message []byte) (bool, error) {
1062
1063 requestTime := time.Now()
1064 correlationID := b.correlationID
1065
1066 bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
1067 if err != nil {
1068 return false, err
1069 }
1070
1071 b.updateOutgoingCommunicationMetrics(bytesWritten)
1072 b.correlationID++
1073
1074 res := &SaslAuthenticateResponse{}
1075 bytesRead, err := b.receiveSASLServerResponse(res, correlationID)
1076
1077 requestLatency := time.Since(requestTime)
1078 b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
1079
1080 isChallenge := len(res.SaslAuthBytes) > 0
1081
1082 if isChallenge && err != nil {
1083 Logger.Printf("Broker rejected authentication token: %s", res.SaslAuthBytes)
1084 }
1085
1086 return isChallenge, err
1087}
1088
1089func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
1090 if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV1); err != nil {
1091 return err
1092 }
1093
1094 scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
1095 if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
1096 return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
1097 }
1098
1099 msg, err := scramClient.Step("")
1100 if err != nil {
1101 return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
1102
1103 }
1104
1105 for !scramClient.Done() {
1106 requestTime := time.Now()
1107 correlationID := b.correlationID
1108 bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
1109 if err != nil {
1110 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1111 return err
1112 }
1113
1114 b.updateOutgoingCommunicationMetrics(bytesWritten)
1115 b.correlationID++
1116 challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
1117 if err != nil {
1118 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1119 return err
1120 }
1121
1122 b.updateIncomingCommunicationMetrics(len(challenge), time.Since(requestTime))
1123 msg, err = scramClient.Step(string(challenge))
1124 if err != nil {
1125 Logger.Println("SASL authentication failed", err)
1126 return err
1127 }
1128 }
1129
1130 Logger.Println("SASL authentication succeeded")
1131 return nil
1132}
1133
1134func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (int, error) {
1135 rb := &SaslAuthenticateRequest{msg}
1136 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1137 buf, err := encode(req, b.conf.MetricRegistry)
1138 if err != nil {
1139 return 0, err
1140 }
1141
1142 if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
1143 return 0, err
1144 }
1145
1146 return b.conn.Write(buf)
1147}
1148
1149func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
1150 buf := make([]byte, responseLengthSize+correlationIDSize)
1151 _, err := io.ReadFull(b.conn, buf)
1152 if err != nil {
1153 return nil, err
1154 }
1155
1156 header := responseHeader{}
1157 err = decode(buf, &header)
1158 if err != nil {
1159 return nil, err
1160 }
1161
1162 if header.correlationID != correlationID {
1163 return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
1164 }
1165
1166 buf = make([]byte, header.length-correlationIDSize)
1167 _, err = io.ReadFull(b.conn, buf)
1168 if err != nil {
1169 return nil, err
1170 }
1171
1172 res := &SaslAuthenticateResponse{}
1173 if err := versionedDecode(buf, res, 0); err != nil {
1174 return nil, err
1175 }
1176 if res.Err != ErrNoError {
1177 return nil, res.Err
1178 }
1179 return res.SaslAuthBytes, nil
1180}
1181
1182// Build SASL/OAUTHBEARER initial client response as described by RFC-7628
1183// https://tools.ietf.org/html/rfc7628
1184func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
1185 var ext string
1186
1187 if token.Extensions != nil && len(token.Extensions) > 0 {
1188 if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
1189 return []byte{}, fmt.Errorf("the extension `%s` is invalid", SASLExtKeyAuth)
1190 }
1191 ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
1192 }
1193
1194 resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
1195
1196 return resp, nil
1197}
1198
// mapToString renders extensions as a single string of key-value pairs.
// Each pair is written as key+keyValSep+value; the resulting pair strings are
// sorted lexicographically and then joined with elemSep.
func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
	pairs := make([]string, len(extensions))

	i := 0
	for key, value := range extensions {
		pairs[i] = key + keyValSep + value
		i++
	}

	// Map iteration order is random; sorting makes the output deterministic.
	sort.Strings(pairs)

	return strings.Join(pairs, elemSep)
}
1212
1213func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
1214 authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
1215 rb := &SaslAuthenticateRequest{authBytes}
1216 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1217 buf, err := encode(req, b.conf.MetricRegistry)
1218 if err != nil {
1219 return 0, err
1220 }
1221
1222 err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
1223 if err != nil {
1224 Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
1225 return 0, err
1226 }
1227 return b.conn.Write(buf)
1228}
1229
1230func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
1231
1232 rb := &SaslAuthenticateRequest{initialResp}
1233
1234 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1235
1236 buf, err := encode(req, b.conf.MetricRegistry)
1237 if err != nil {
1238 return 0, err
1239 }
1240
1241 if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
1242 return 0, err
1243 }
1244
1245 return b.conn.Write(buf)
1246}
1247
1248func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
1249
1250 buf := make([]byte, responseLengthSize+correlationIDSize)
1251
1252 bytesRead, err := io.ReadFull(b.conn, buf)
1253 if err != nil {
1254 return bytesRead, err
1255 }
1256
1257 header := responseHeader{}
1258
1259 err = decode(buf, &header)
1260 if err != nil {
1261 return bytesRead, err
1262 }
1263
1264 if header.correlationID != correlationID {
1265 return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
1266 }
1267
1268 buf = make([]byte, header.length-correlationIDSize)
1269
1270 c, err := io.ReadFull(b.conn, buf)
1271 bytesRead += c
1272 if err != nil {
1273 return bytesRead, err
1274 }
1275
1276 if err := versionedDecode(buf, res, 0); err != nil {
1277 return bytesRead, err
1278 }
1279
1280 if res.Err != ErrNoError {
1281 return bytesRead, res.Err
1282 }
1283
1284 return bytesRead, nil
1285}
1286
1287func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
1288 b.updateRequestLatencyMetrics(requestLatency)
1289 b.responseRate.Mark(1)
1290
1291 if b.brokerResponseRate != nil {
1292 b.brokerResponseRate.Mark(1)
1293 }
1294
1295 responseSize := int64(bytes)
1296 b.incomingByteRate.Mark(responseSize)
1297 if b.brokerIncomingByteRate != nil {
1298 b.brokerIncomingByteRate.Mark(responseSize)
1299 }
1300
1301 b.responseSize.Update(responseSize)
1302 if b.brokerResponseSize != nil {
1303 b.brokerResponseSize.Update(responseSize)
1304 }
1305}
1306
1307func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
1308 requestLatencyInMs := int64(requestLatency / time.Millisecond)
1309 b.requestLatency.Update(requestLatencyInMs)
1310
1311 if b.brokerRequestLatency != nil {
1312 b.brokerRequestLatency.Update(requestLatencyInMs)
1313 }
1314
1315}
1316
1317func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
1318 b.requestRate.Mark(1)
1319 if b.brokerRequestRate != nil {
1320 b.brokerRequestRate.Mark(1)
1321 }
1322
1323 requestSize := int64(bytes)
1324 b.outgoingByteRate.Mark(requestSize)
1325 if b.brokerOutgoingByteRate != nil {
1326 b.brokerOutgoingByteRate.Mark(requestSize)
1327 }
1328
1329 b.requestSize.Update(requestSize)
1330 if b.brokerRequestSize != nil {
1331 b.brokerRequestSize.Update(requestSize)
1332 }
1333
1334}
1335
1336func (b *Broker) registerMetrics() {
1337 b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
1338 b.brokerRequestRate = b.registerMeter("request-rate")
1339 b.brokerRequestSize = b.registerHistogram("request-size")
1340 b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
1341 b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
1342 b.brokerResponseRate = b.registerMeter("response-rate")
1343 b.brokerResponseSize = b.registerHistogram("response-size")
1344}
1345
1346func (b *Broker) unregisterMetrics() {
1347 for _, name := range b.registeredMetrics {
1348 b.conf.MetricRegistry.Unregister(name)
1349 }
1350}
1351
1352func (b *Broker) registerMeter(name string) metrics.Meter {
1353 nameForBroker := getMetricNameForBroker(name, b)
1354 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1355 return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
1356}
1357
1358func (b *Broker) registerHistogram(name string) metrics.Histogram {
1359 nameForBroker := getMetricNameForBroker(name, b)
1360 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1361 return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
1362}