blob: d27ebd22dc996a5a4c57eaab2a66acf932718a28 [file] [log] [blame]
Pragya Arya324337e2020-02-20 14:35:08 +05301package sarama
2
3import (
4 "crypto/tls"
5 "encoding/binary"
6 "fmt"
7 "io"
8 "net"
9 "sort"
10 "strconv"
11 "strings"
12 "sync"
13 "sync/atomic"
14 "time"
15
16 metrics "github.com/rcrowley/go-metrics"
17)
18
19// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
20type Broker struct {
21 conf *Config
22 rack *string
23
24 id int32
25 addr string
26 correlationID int32
27 conn net.Conn
28 connErr error
29 lock sync.Mutex
30 opened int32
31 responses chan responsePromise
32 done chan bool
33
34 registeredMetrics []string
35
36 incomingByteRate metrics.Meter
37 requestRate metrics.Meter
38 requestSize metrics.Histogram
39 requestLatency metrics.Histogram
40 outgoingByteRate metrics.Meter
41 responseRate metrics.Meter
42 responseSize metrics.Histogram
43 requestsInFlight metrics.Counter
44 brokerIncomingByteRate metrics.Meter
45 brokerRequestRate metrics.Meter
46 brokerRequestSize metrics.Histogram
47 brokerRequestLatency metrics.Histogram
48 brokerOutgoingByteRate metrics.Meter
49 brokerResponseRate metrics.Meter
50 brokerResponseSize metrics.Histogram
51 brokerRequestsInFlight metrics.Counter
52
53 kerberosAuthenticator GSSAPIKerberosAuth
54}
55
// SASLMechanism specifies the SASL mechanism the client uses to authenticate
// with the broker.
type SASLMechanism string

// Names of the supported SASL mechanisms.
const (
	// SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+).
	SASLTypeOAuth = "OAUTHBEARER"
	// SASLTypePlaintext represents the SASL/PLAIN mechanism.
	SASLTypePlaintext = "PLAIN"
	// SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
	SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
	// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
	SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
	// SASLTypeGSSAPI represents the GSSAPI mechanism.
	SASLTypeGSSAPI = "GSSAPI"
)

// Versions of the Kafka SASL handshake protocol.
const (
	// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL auth using opaque packets.
	SASLHandshakeV0 int16 = 0
	// SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL by wrapping tokens with Kafka protocol headers.
	SASLHandshakeV1 int16 = 1
)

// SASLExtKeyAuth is the reserved extension key name sent as part of the
// SASL/OAUTHBEARER initial client response.
const SASLExtKeyAuth = "auth"
79
// AccessToken bundles the access token that authenticates a SASL/OAUTHBEARER
// client with the metadata that accompanies it.
type AccessToken struct {
	// Token is the access token payload.
	Token string

	// Extensions is an optional map of arbitrary key-value pairs that can be
	// sent with the SASL/OAUTHBEARER initial client response. These values are
	// ignored by the SASL server if they are unexpected. This feature is only
	// supported by Kafka >= 2.1.0.
	Extensions map[string]string
}
91
92// AccessTokenProvider is the interface that encapsulates how implementors
93// can generate access tokens for Kafka broker authentication.
94type AccessTokenProvider interface {
95 // Token returns an access token. The implementation should ensure token
96 // reuse so that multiple calls at connect time do not create multiple
97 // tokens. The implementation should also periodically refresh the token in
98 // order to guarantee that each call returns an unexpired token. This
99 // method should not block indefinitely--a timeout error should be returned
100 // after a short period of inactivity so that the broker connection logic
101 // can log debugging information and retry.
102 Token() (*AccessToken, error)
103}
104
// SCRAMClient is an interface to a SCRAM client implementation.
type SCRAMClient interface {
	// Begin prepares the client for the SCRAM exchange with the server,
	// using the given user name and password.
	Begin(userName, password, authzID string) error

	// Step advances the client through the SCRAM exchange. It is called
	// repeatedly until it errors or `Done` returns true.
	Step(challenge string) (response string, err error)

	// Done should return true once the SCRAM conversation is over.
	Done() bool
}
118
// responsePromise describes a request that was written to the broker and is
// still waiting for its response. send creates one per request and queues it;
// responseReceiver resolves it by delivering either the raw response body on
// packets or an error on errors.
//
// The field order matters: send builds this struct with an unkeyed literal.
type responsePromise struct {
	// requestTime is taken just before the request is written and is used to
	// compute the request latency.
	requestTime time.Time
	// correlationID is the ID the matching response must carry.
	correlationID int32
	// packets receives the response body.
	packets chan []byte
	// errors receives the failure when no response body can be delivered.
	errors chan error
}
125
126// NewBroker creates and returns a Broker targeting the given host:port address.
127// This does not attempt to actually connect, you have to call Open() for that.
128func NewBroker(addr string) *Broker {
129 return &Broker{id: -1, addr: addr}
130}
131
132// Open tries to connect to the Broker if it is not already connected or connecting, but does not block
133// waiting for the connection to complete. This means that any subsequent operations on the broker will
134// block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
135// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
136// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
137func (b *Broker) Open(conf *Config) error {
138 if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
139 return ErrAlreadyConnected
140 }
141
142 if conf == nil {
143 conf = NewConfig()
144 }
145
146 err := conf.Validate()
147 if err != nil {
148 return err
149 }
150
151 b.lock.Lock()
152
153 go withRecover(func() {
154 defer b.lock.Unlock()
155
156 dialer := net.Dialer{
157 Timeout: conf.Net.DialTimeout,
158 KeepAlive: conf.Net.KeepAlive,
159 LocalAddr: conf.Net.LocalAddr,
160 }
161
162 if conf.Net.TLS.Enable {
163 b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
164 } else if conf.Net.Proxy.Enable {
165 b.conn, b.connErr = conf.Net.Proxy.Dialer.Dial("tcp", b.addr)
166 } else {
167 b.conn, b.connErr = dialer.Dial("tcp", b.addr)
168 }
169 if b.connErr != nil {
170 Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
171 b.conn = nil
172 atomic.StoreInt32(&b.opened, 0)
173 return
174 }
175 b.conn = newBufConn(b.conn)
176
177 b.conf = conf
178
179 // Create or reuse the global metrics shared between brokers
180 b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
181 b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
182 b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
183 b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
184 b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
185 b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
186 b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
187 b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
188 // Do not gather metrics for seeded broker (only used during bootstrap) because they share
189 // the same id (-1) and are already exposed through the global metrics above
190 if b.id >= 0 {
191 b.registerMetrics()
192 }
193
194 if conf.Net.SASL.Enable {
195 b.connErr = b.authenticateViaSASL()
196
197 if b.connErr != nil {
198 err = b.conn.Close()
199 if err == nil {
200 Logger.Printf("Closed connection to broker %s\n", b.addr)
201 } else {
202 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
203 }
204 b.conn = nil
205 atomic.StoreInt32(&b.opened, 0)
206 return
207 }
208 }
209
210 b.done = make(chan bool)
211 b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
212
213 if b.id >= 0 {
214 Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
215 } else {
216 Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
217 }
218 go withRecover(b.responseReceiver)
219 })
220
221 return nil
222}
223
224// Connected returns true if the broker is connected and false otherwise. If the broker is not
225// connected but it had tried to connect, the error from that connection attempt is also returned.
226func (b *Broker) Connected() (bool, error) {
227 b.lock.Lock()
228 defer b.lock.Unlock()
229
230 return b.conn != nil, b.connErr
231}
232
233//Close closes the broker resources
234func (b *Broker) Close() error {
235 b.lock.Lock()
236 defer b.lock.Unlock()
237
238 if b.conn == nil {
239 return ErrNotConnected
240 }
241
242 close(b.responses)
243 <-b.done
244
245 err := b.conn.Close()
246
247 b.conn = nil
248 b.connErr = nil
249 b.done = nil
250 b.responses = nil
251
252 b.unregisterMetrics()
253
254 if err == nil {
255 Logger.Printf("Closed connection to broker %s\n", b.addr)
256 } else {
257 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
258 }
259
260 atomic.StoreInt32(&b.opened, 0)
261
262 return err
263}
264
265// ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
266func (b *Broker) ID() int32 {
267 return b.id
268}
269
270// Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
271func (b *Broker) Addr() string {
272 return b.addr
273}
274
275// Rack returns the broker's rack as retrieved from Kafka's metadata or the
276// empty string if it is not known. The returned value corresponds to the
277// broker's broker.rack configuration setting. Requires protocol version to be
278// at least v0.10.0.0.
279func (b *Broker) Rack() string {
280 if b.rack == nil {
281 return ""
282 }
283 return *b.rack
284}
285
286//GetMetadata send a metadata request and returns a metadata response or error
287func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
288 response := new(MetadataResponse)
289
290 err := b.sendAndReceive(request, response)
291
292 if err != nil {
293 return nil, err
294 }
295
296 return response, nil
297}
298
299//GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
300func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
301 response := new(ConsumerMetadataResponse)
302
303 err := b.sendAndReceive(request, response)
304
305 if err != nil {
306 return nil, err
307 }
308
309 return response, nil
310}
311
312//FindCoordinator sends a find coordinate request and returns a response or error
313func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
314 response := new(FindCoordinatorResponse)
315
316 err := b.sendAndReceive(request, response)
317
318 if err != nil {
319 return nil, err
320 }
321
322 return response, nil
323}
324
325//GetAvailableOffsets return an offset response or error
326func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
327 response := new(OffsetResponse)
328
329 err := b.sendAndReceive(request, response)
330
331 if err != nil {
332 return nil, err
333 }
334
335 return response, nil
336}
337
338//Produce returns a produce response or error
339func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
340 var (
341 response *ProduceResponse
342 err error
343 )
344
345 if request.RequiredAcks == NoResponse {
346 err = b.sendAndReceive(request, nil)
347 } else {
348 response = new(ProduceResponse)
349 err = b.sendAndReceive(request, response)
350 }
351
352 if err != nil {
353 return nil, err
354 }
355
356 return response, nil
357}
358
359//Fetch returns a FetchResponse or error
360func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
361 response := new(FetchResponse)
362
363 err := b.sendAndReceive(request, response)
364 if err != nil {
365 return nil, err
366 }
367
368 return response, nil
369}
370
371//CommitOffset return an Offset commit reponse or error
372func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
373 response := new(OffsetCommitResponse)
374
375 err := b.sendAndReceive(request, response)
376 if err != nil {
377 return nil, err
378 }
379
380 return response, nil
381}
382
383//FetchOffset returns an offset fetch response or error
384func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
385 response := new(OffsetFetchResponse)
386
387 err := b.sendAndReceive(request, response)
388 if err != nil {
389 return nil, err
390 }
391
392 return response, nil
393}
394
395//JoinGroup returns a join group response or error
396func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
397 response := new(JoinGroupResponse)
398
399 err := b.sendAndReceive(request, response)
400 if err != nil {
401 return nil, err
402 }
403
404 return response, nil
405}
406
407//SyncGroup returns a sync group response or error
408func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
409 response := new(SyncGroupResponse)
410
411 err := b.sendAndReceive(request, response)
412 if err != nil {
413 return nil, err
414 }
415
416 return response, nil
417}
418
419//LeaveGroup return a leave group response or error
420func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
421 response := new(LeaveGroupResponse)
422
423 err := b.sendAndReceive(request, response)
424 if err != nil {
425 return nil, err
426 }
427
428 return response, nil
429}
430
431//Heartbeat returns a heartbeat response or error
432func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
433 response := new(HeartbeatResponse)
434
435 err := b.sendAndReceive(request, response)
436 if err != nil {
437 return nil, err
438 }
439
440 return response, nil
441}
442
443//ListGroups return a list group response or error
444func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
445 response := new(ListGroupsResponse)
446
447 err := b.sendAndReceive(request, response)
448 if err != nil {
449 return nil, err
450 }
451
452 return response, nil
453}
454
455//DescribeGroups return describe group response or error
456func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
457 response := new(DescribeGroupsResponse)
458
459 err := b.sendAndReceive(request, response)
460 if err != nil {
461 return nil, err
462 }
463
464 return response, nil
465}
466
467//ApiVersions return api version response or error
468func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
469 response := new(ApiVersionsResponse)
470
471 err := b.sendAndReceive(request, response)
472 if err != nil {
473 return nil, err
474 }
475
476 return response, nil
477}
478
479//CreateTopics send a create topic request and returns create topic response
480func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
481 response := new(CreateTopicsResponse)
482
483 err := b.sendAndReceive(request, response)
484 if err != nil {
485 return nil, err
486 }
487
488 return response, nil
489}
490
491//DeleteTopics sends a delete topic request and returns delete topic response
492func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
493 response := new(DeleteTopicsResponse)
494
495 err := b.sendAndReceive(request, response)
496 if err != nil {
497 return nil, err
498 }
499
500 return response, nil
501}
502
503//CreatePartitions sends a create partition request and returns create
504//partitions response or error
505func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
506 response := new(CreatePartitionsResponse)
507
508 err := b.sendAndReceive(request, response)
509 if err != nil {
510 return nil, err
511 }
512
513 return response, nil
514}
515
516//DeleteRecords send a request to delete records and return delete record
517//response or error
518func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
519 response := new(DeleteRecordsResponse)
520
521 err := b.sendAndReceive(request, response)
522 if err != nil {
523 return nil, err
524 }
525
526 return response, nil
527}
528
529//DescribeAcls sends a describe acl request and returns a response or error
530func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
531 response := new(DescribeAclsResponse)
532
533 err := b.sendAndReceive(request, response)
534 if err != nil {
535 return nil, err
536 }
537
538 return response, nil
539}
540
541//CreateAcls sends a create acl request and returns a response or error
542func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
543 response := new(CreateAclsResponse)
544
545 err := b.sendAndReceive(request, response)
546 if err != nil {
547 return nil, err
548 }
549
550 return response, nil
551}
552
553//DeleteAcls sends a delete acl request and returns a response or error
554func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
555 response := new(DeleteAclsResponse)
556
557 err := b.sendAndReceive(request, response)
558 if err != nil {
559 return nil, err
560 }
561
562 return response, nil
563}
564
565//InitProducerID sends an init producer request and returns a response or error
566func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
567 response := new(InitProducerIDResponse)
568
569 err := b.sendAndReceive(request, response)
570 if err != nil {
571 return nil, err
572 }
573
574 return response, nil
575}
576
577//AddPartitionsToTxn send a request to add partition to txn and returns
578//a response or error
579func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
580 response := new(AddPartitionsToTxnResponse)
581
582 err := b.sendAndReceive(request, response)
583 if err != nil {
584 return nil, err
585 }
586
587 return response, nil
588}
589
590//AddOffsetsToTxn sends a request to add offsets to txn and returns a response
591//or error
592func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
593 response := new(AddOffsetsToTxnResponse)
594
595 err := b.sendAndReceive(request, response)
596 if err != nil {
597 return nil, err
598 }
599
600 return response, nil
601}
602
603//EndTxn sends a request to end txn and returns a response or error
604func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
605 response := new(EndTxnResponse)
606
607 err := b.sendAndReceive(request, response)
608 if err != nil {
609 return nil, err
610 }
611
612 return response, nil
613}
614
615//TxnOffsetCommit sends a request to commit transaction offsets and returns
616//a response or error
617func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
618 response := new(TxnOffsetCommitResponse)
619
620 err := b.sendAndReceive(request, response)
621 if err != nil {
622 return nil, err
623 }
624
625 return response, nil
626}
627
628//DescribeConfigs sends a request to describe config and returns a response or
629//error
630func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
631 response := new(DescribeConfigsResponse)
632
633 err := b.sendAndReceive(request, response)
634 if err != nil {
635 return nil, err
636 }
637
638 return response, nil
639}
640
641//AlterConfigs sends a request to alter config and return a response or error
642func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
643 response := new(AlterConfigsResponse)
644
645 err := b.sendAndReceive(request, response)
646 if err != nil {
647 return nil, err
648 }
649
650 return response, nil
651}
652
653//DeleteGroups sends a request to delete groups and returns a response or error
654func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
655 response := new(DeleteGroupsResponse)
656
657 if err := b.sendAndReceive(request, response); err != nil {
658 return nil, err
659 }
660
661 return response, nil
662}
663
664//DescribeLogDirs sends a request to get the broker's log dir paths and sizes
665func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) {
666 response := new(DescribeLogDirsResponse)
667
668 err := b.sendAndReceive(request, response)
669 if err != nil {
670 return nil, err
671 }
672
673 return response, nil
674}
675
676// readFull ensures the conn ReadDeadline has been setup before making a
677// call to io.ReadFull
678func (b *Broker) readFull(buf []byte) (n int, err error) {
679 if err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)); err != nil {
680 return 0, err
681 }
682
683 return io.ReadFull(b.conn, buf)
684}
685
686// write ensures the conn WriteDeadline has been setup before making a
687// call to conn.Write
688func (b *Broker) write(buf []byte) (n int, err error) {
689 if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
690 return 0, err
691 }
692
693 return b.conn.Write(buf)
694}
695
696func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
697 b.lock.Lock()
698 defer b.lock.Unlock()
699
700 if b.conn == nil {
701 if b.connErr != nil {
702 return nil, b.connErr
703 }
704 return nil, ErrNotConnected
705 }
706
707 if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
708 return nil, ErrUnsupportedVersion
709 }
710
711 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
712 buf, err := encode(req, b.conf.MetricRegistry)
713 if err != nil {
714 return nil, err
715 }
716
717 requestTime := time.Now()
718 // Will be decremented in responseReceiver (except error or request with NoResponse)
719 b.addRequestInFlightMetrics(1)
720 bytes, err := b.write(buf)
721 b.updateOutgoingCommunicationMetrics(bytes)
722 if err != nil {
723 b.addRequestInFlightMetrics(-1)
724 return nil, err
725 }
726 b.correlationID++
727
728 if !promiseResponse {
729 // Record request latency without the response
730 b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
731 return nil, nil
732 }
733
734 promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
735 b.responses <- promise
736
737 return &promise, nil
738}
739
740func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
741 promise, err := b.send(req, res != nil)
742 if err != nil {
743 return err
744 }
745
746 if promise == nil {
747 return nil
748 }
749
750 select {
751 case buf := <-promise.packets:
752 return versionedDecode(buf, res, req.version())
753 case err = <-promise.errors:
754 return err
755 }
756}
757
758func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
759 b.id, err = pd.getInt32()
760 if err != nil {
761 return err
762 }
763
764 host, err := pd.getString()
765 if err != nil {
766 return err
767 }
768
769 port, err := pd.getInt32()
770 if err != nil {
771 return err
772 }
773
774 if version >= 1 {
775 b.rack, err = pd.getNullableString()
776 if err != nil {
777 return err
778 }
779 }
780
781 b.addr = net.JoinHostPort(host, fmt.Sprint(port))
782 if _, _, err := net.SplitHostPort(b.addr); err != nil {
783 return err
784 }
785
786 return nil
787}
788
789func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
790 host, portstr, err := net.SplitHostPort(b.addr)
791 if err != nil {
792 return err
793 }
794
795 port, err := strconv.Atoi(portstr)
796 if err != nil {
797 return err
798 }
799
800 pe.putInt32(b.id)
801
802 err = pe.putString(host)
803 if err != nil {
804 return err
805 }
806
807 pe.putInt32(int32(port))
808
809 if version >= 1 {
810 err = pe.putNullableString(b.rack)
811 if err != nil {
812 return err
813 }
814 }
815
816 return nil
817}
818
819func (b *Broker) responseReceiver() {
820 var dead error
821 header := make([]byte, 8)
822
823 for response := range b.responses {
824 if dead != nil {
825 // This was previously incremented in send() and
826 // we are not calling updateIncomingCommunicationMetrics()
827 b.addRequestInFlightMetrics(-1)
828 response.errors <- dead
829 continue
830 }
831
832 bytesReadHeader, err := b.readFull(header)
833 requestLatency := time.Since(response.requestTime)
834 if err != nil {
835 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
836 dead = err
837 response.errors <- err
838 continue
839 }
840
841 decodedHeader := responseHeader{}
842 err = decode(header, &decodedHeader)
843 if err != nil {
844 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
845 dead = err
846 response.errors <- err
847 continue
848 }
849 if decodedHeader.correlationID != response.correlationID {
850 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
851 // TODO if decoded ID < cur ID, discard until we catch up
852 // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
853 dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
854 response.errors <- dead
855 continue
856 }
857
858 buf := make([]byte, decodedHeader.length-4)
859 bytesReadBody, err := b.readFull(buf)
860 b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
861 if err != nil {
862 dead = err
863 response.errors <- err
864 continue
865 }
866
867 response.packets <- buf
868 }
869 close(b.done)
870}
871
872func (b *Broker) authenticateViaSASL() error {
873 switch b.conf.Net.SASL.Mechanism {
874 case SASLTypeOAuth:
875 return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
876 case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
877 return b.sendAndReceiveSASLSCRAMv1()
878 case SASLTypeGSSAPI:
879 return b.sendAndReceiveKerberos()
880 default:
881 return b.sendAndReceiveSASLPlainAuth()
882 }
883}
884
885func (b *Broker) sendAndReceiveKerberos() error {
886 b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
887 if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
888 b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
889 }
890 return b.kerberosAuthenticator.Authorize(b)
891}
892
893func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
894 rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
895
896 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
897 buf, err := encode(req, b.conf.MetricRegistry)
898 if err != nil {
899 return err
900 }
901
902 requestTime := time.Now()
903 // Will be decremented in updateIncomingCommunicationMetrics (except error)
904 b.addRequestInFlightMetrics(1)
905 bytes, err := b.write(buf)
906 b.updateOutgoingCommunicationMetrics(bytes)
907 if err != nil {
908 b.addRequestInFlightMetrics(-1)
909 Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
910 return err
911 }
912 b.correlationID++
913
914 header := make([]byte, 8) // response header
915 _, err = b.readFull(header)
916 if err != nil {
917 b.addRequestInFlightMetrics(-1)
918 Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
919 return err
920 }
921
922 length := binary.BigEndian.Uint32(header[:4])
923 payload := make([]byte, length-4)
924 n, err := b.readFull(payload)
925 if err != nil {
926 b.addRequestInFlightMetrics(-1)
927 Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
928 return err
929 }
930
931 b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
932 res := &SaslHandshakeResponse{}
933
934 err = versionedDecode(payload, res, 0)
935 if err != nil {
936 Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
937 return err
938 }
939
940 if res.Err != ErrNoError {
941 Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
942 return res.Err
943 }
944
945 Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
946 return nil
947}
948
949// Kafka 0.10.x supported SASL PLAIN/Kerberos via KAFKA-3149 (KIP-43).
950// Kafka 1.x.x onward added a SaslAuthenticate request/response message which
951// wraps the SASL flow in the Kafka protocol, which allows for returning
952// meaningful errors on authentication failure.
953//
954// In SASL Plain, Kafka expects the auth header to be in the following format
955// Message format (from https://tools.ietf.org/html/rfc4616):
956//
957// message = [authzid] UTF8NUL authcid UTF8NUL passwd
958// authcid = 1*SAFE ; MUST accept up to 255 octets
959// authzid = 1*SAFE ; MUST accept up to 255 octets
960// passwd = 1*SAFE ; MUST accept up to 255 octets
961// UTF8NUL = %x00 ; UTF-8 encoded NUL character
962//
963// SAFE = UTF1 / UTF2 / UTF3 / UTF4
964// ;; any UTF-8 encoded Unicode character except NUL
965//
966// With SASL v0 handshake and auth then:
967// When credentials are valid, Kafka returns a 4 byte array of null characters.
968// When credentials are invalid, Kafka closes the connection.
969//
970// With SASL v1 handshake and auth then:
971// When credentials are invalid, Kafka replies with a SaslAuthenticate response
972// containing an error code and message detailing the authentication failure.
973func (b *Broker) sendAndReceiveSASLPlainAuth() error {
974 // default to V0 to allow for backward compatability when SASL is enabled
975 // but not the handshake
976 if b.conf.Net.SASL.Handshake {
977 handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
978 if handshakeErr != nil {
979 Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
980 return handshakeErr
981 }
982 }
983
984 if b.conf.Net.SASL.Version == SASLHandshakeV1 {
985 return b.sendAndReceiveV1SASLPlainAuth()
986 }
987 return b.sendAndReceiveV0SASLPlainAuth()
988}
989
990// sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
991func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
992 length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
993 authBytes := make([]byte, length+4) //4 byte length header + auth data
994 binary.BigEndian.PutUint32(authBytes, uint32(length))
995 copy(authBytes[4:], []byte(b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
996
997 requestTime := time.Now()
998 // Will be decremented in updateIncomingCommunicationMetrics (except error)
999 b.addRequestInFlightMetrics(1)
1000 bytesWritten, err := b.write(authBytes)
1001 b.updateOutgoingCommunicationMetrics(bytesWritten)
1002 if err != nil {
1003 b.addRequestInFlightMetrics(-1)
1004 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1005 return err
1006 }
1007
1008 header := make([]byte, 4)
1009 n, err := b.readFull(header)
1010 b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
1011 // If the credentials are valid, we would get a 4 byte response filled with null characters.
1012 // Otherwise, the broker closes the connection and we get an EOF
1013 if err != nil {
1014 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1015 return err
1016 }
1017
1018 Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
1019 return nil
1020}
1021
1022// sendAndReceiveV1SASLPlainAuth flows the v1 sasl authentication using the kafka protocol
1023func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {
1024 correlationID := b.correlationID
1025
1026 requestTime := time.Now()
1027
1028 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1029 b.addRequestInFlightMetrics(1)
1030 bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
1031 b.updateOutgoingCommunicationMetrics(bytesWritten)
1032
1033 if err != nil {
1034 b.addRequestInFlightMetrics(-1)
1035 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1036 return err
1037 }
1038
1039 b.correlationID++
1040
1041 bytesRead, err := b.receiveSASLServerResponse(&SaslAuthenticateResponse{}, correlationID)
1042 b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
1043
1044 // With v1 sasl we get an error message set in the response we can return
1045 if err != nil {
1046 Logger.Printf("Error returned from broker during SASL flow %s: %s\n", b.addr, err.Error())
1047 return err
1048 }
1049
1050 return nil
1051}
1052
1053// sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
1054// https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
1055func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
1056 if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
1057 return err
1058 }
1059
1060 token, err := provider.Token()
1061 if err != nil {
1062 return err
1063 }
1064
1065 message, err := buildClientFirstMessage(token)
1066 if err != nil {
1067 return err
1068 }
1069
1070 challenged, err := b.sendClientMessage(message)
1071 if err != nil {
1072 return err
1073 }
1074
1075 if challenged {
1076 // Abort the token exchange. The broker returns the failure code.
1077 _, err = b.sendClientMessage([]byte(`\x01`))
1078 }
1079
1080 return err
1081}
1082
1083// sendClientMessage sends a SASL/OAUTHBEARER client message and returns true
1084// if the broker responds with a challenge, in which case the token is
1085// rejected.
1086func (b *Broker) sendClientMessage(message []byte) (bool, error) {
1087 requestTime := time.Now()
1088 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1089 b.addRequestInFlightMetrics(1)
1090 correlationID := b.correlationID
1091
1092 bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
1093 b.updateOutgoingCommunicationMetrics(bytesWritten)
1094 if err != nil {
1095 b.addRequestInFlightMetrics(-1)
1096 return false, err
1097 }
1098
1099 b.correlationID++
1100
1101 res := &SaslAuthenticateResponse{}
1102 bytesRead, err := b.receiveSASLServerResponse(res, correlationID)
1103
1104 requestLatency := time.Since(requestTime)
1105 b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
1106
1107 isChallenge := len(res.SaslAuthBytes) > 0
1108
1109 if isChallenge && err != nil {
1110 Logger.Printf("Broker rejected authentication token: %s", res.SaslAuthBytes)
1111 }
1112
1113 return isChallenge, err
1114}
1115
1116func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
1117 if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV1); err != nil {
1118 return err
1119 }
1120
1121 scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
1122 if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
1123 return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
1124 }
1125
1126 msg, err := scramClient.Step("")
1127 if err != nil {
1128 return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
1129 }
1130
1131 for !scramClient.Done() {
1132 requestTime := time.Now()
1133 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1134 b.addRequestInFlightMetrics(1)
1135 correlationID := b.correlationID
1136 bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
1137 b.updateOutgoingCommunicationMetrics(bytesWritten)
1138 if err != nil {
1139 b.addRequestInFlightMetrics(-1)
1140 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1141 return err
1142 }
1143
1144 b.correlationID++
1145 challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
1146 if err != nil {
1147 b.addRequestInFlightMetrics(-1)
1148 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1149 return err
1150 }
1151
1152 b.updateIncomingCommunicationMetrics(len(challenge), time.Since(requestTime))
1153 msg, err = scramClient.Step(string(challenge))
1154 if err != nil {
1155 Logger.Println("SASL authentication failed", err)
1156 return err
1157 }
1158 }
1159
1160 Logger.Println("SASL authentication succeeded")
1161 return nil
1162}
1163
1164func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (int, error) {
1165 rb := &SaslAuthenticateRequest{msg}
1166 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1167 buf, err := encode(req, b.conf.MetricRegistry)
1168 if err != nil {
1169 return 0, err
1170 }
1171
1172 return b.write(buf)
1173}
1174
1175func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
1176 buf := make([]byte, responseLengthSize+correlationIDSize)
1177 _, err := b.readFull(buf)
1178 if err != nil {
1179 return nil, err
1180 }
1181
1182 header := responseHeader{}
1183 err = decode(buf, &header)
1184 if err != nil {
1185 return nil, err
1186 }
1187
1188 if header.correlationID != correlationID {
1189 return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
1190 }
1191
1192 buf = make([]byte, header.length-correlationIDSize)
1193 _, err = b.readFull(buf)
1194 if err != nil {
1195 return nil, err
1196 }
1197
1198 res := &SaslAuthenticateResponse{}
1199 if err := versionedDecode(buf, res, 0); err != nil {
1200 return nil, err
1201 }
1202 if res.Err != ErrNoError {
1203 return nil, res.Err
1204 }
1205 return res.SaslAuthBytes, nil
1206}
1207
1208// Build SASL/OAUTHBEARER initial client response as described by RFC-7628
1209// https://tools.ietf.org/html/rfc7628
1210func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
1211 var ext string
1212
1213 if token.Extensions != nil && len(token.Extensions) > 0 {
1214 if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
1215 return []byte{}, fmt.Errorf("the extension `%s` is invalid", SASLExtKeyAuth)
1216 }
1217 ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
1218 }
1219
1220 resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
1221
1222 return resp, nil
1223}
1224
// mapToString renders a map as a single string of key-value pairs.
// Each pair is formatted as key+keyValSep+value; the formatted pairs are
// sorted lexicographically (so output is deterministic regardless of map
// iteration order) and then joined with elemSep. An empty or nil map yields
// the empty string.
func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
	pairs := make([]string, len(extensions))

	next := 0
	for key, value := range extensions {
		pairs[next] = key + keyValSep + value
		next++
	}

	sort.Strings(pairs)

	return strings.Join(pairs, elemSep)
}
1238
1239func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
1240 authBytes := []byte(b.conf.Net.SASL.AuthIdentity + "\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
1241 rb := &SaslAuthenticateRequest{authBytes}
1242 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1243 buf, err := encode(req, b.conf.MetricRegistry)
1244 if err != nil {
1245 return 0, err
1246 }
1247
1248 return b.write(buf)
1249}
1250
1251func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
1252 rb := &SaslAuthenticateRequest{initialResp}
1253
1254 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1255
1256 buf, err := encode(req, b.conf.MetricRegistry)
1257 if err != nil {
1258 return 0, err
1259 }
1260
1261 return b.write(buf)
1262}
1263
1264func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
1265 buf := make([]byte, responseLengthSize+correlationIDSize)
1266 bytesRead, err := b.readFull(buf)
1267 if err != nil {
1268 return bytesRead, err
1269 }
1270
1271 header := responseHeader{}
1272 err = decode(buf, &header)
1273 if err != nil {
1274 return bytesRead, err
1275 }
1276
1277 if header.correlationID != correlationID {
1278 return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
1279 }
1280
1281 buf = make([]byte, header.length-correlationIDSize)
1282 c, err := b.readFull(buf)
1283 bytesRead += c
1284 if err != nil {
1285 return bytesRead, err
1286 }
1287
1288 if err := versionedDecode(buf, res, 0); err != nil {
1289 return bytesRead, err
1290 }
1291
1292 if res.Err != ErrNoError {
1293 return bytesRead, res.Err
1294 }
1295
1296 return bytesRead, nil
1297}
1298
1299func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
1300 b.updateRequestLatencyAndInFlightMetrics(requestLatency)
1301 b.responseRate.Mark(1)
1302
1303 if b.brokerResponseRate != nil {
1304 b.brokerResponseRate.Mark(1)
1305 }
1306
1307 responseSize := int64(bytes)
1308 b.incomingByteRate.Mark(responseSize)
1309 if b.brokerIncomingByteRate != nil {
1310 b.brokerIncomingByteRate.Mark(responseSize)
1311 }
1312
1313 b.responseSize.Update(responseSize)
1314 if b.brokerResponseSize != nil {
1315 b.brokerResponseSize.Update(responseSize)
1316 }
1317}
1318
1319func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
1320 requestLatencyInMs := int64(requestLatency / time.Millisecond)
1321 b.requestLatency.Update(requestLatencyInMs)
1322
1323 if b.brokerRequestLatency != nil {
1324 b.brokerRequestLatency.Update(requestLatencyInMs)
1325 }
1326
1327 b.addRequestInFlightMetrics(-1)
1328}
1329
1330func (b *Broker) addRequestInFlightMetrics(i int64) {
1331 b.requestsInFlight.Inc(i)
1332 if b.brokerRequestsInFlight != nil {
1333 b.brokerRequestsInFlight.Inc(i)
1334 }
1335}
1336
1337func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
1338 b.requestRate.Mark(1)
1339 if b.brokerRequestRate != nil {
1340 b.brokerRequestRate.Mark(1)
1341 }
1342
1343 requestSize := int64(bytes)
1344 b.outgoingByteRate.Mark(requestSize)
1345 if b.brokerOutgoingByteRate != nil {
1346 b.brokerOutgoingByteRate.Mark(requestSize)
1347 }
1348
1349 b.requestSize.Update(requestSize)
1350 if b.brokerRequestSize != nil {
1351 b.brokerRequestSize.Update(requestSize)
1352 }
1353}
1354
1355func (b *Broker) registerMetrics() {
1356 b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
1357 b.brokerRequestRate = b.registerMeter("request-rate")
1358 b.brokerRequestSize = b.registerHistogram("request-size")
1359 b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
1360 b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
1361 b.brokerResponseRate = b.registerMeter("response-rate")
1362 b.brokerResponseSize = b.registerHistogram("response-size")
1363 b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
1364}
1365
1366func (b *Broker) unregisterMetrics() {
1367 for _, name := range b.registeredMetrics {
1368 b.conf.MetricRegistry.Unregister(name)
1369 }
1370}
1371
1372func (b *Broker) registerMeter(name string) metrics.Meter {
1373 nameForBroker := getMetricNameForBroker(name, b)
1374 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1375 return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
1376}
1377
1378func (b *Broker) registerHistogram(name string) metrics.Histogram {
1379 nameForBroker := getMetricNameForBroker(name, b)
1380 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1381 return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
1382}
1383
1384func (b *Broker) registerCounter(name string) metrics.Counter {
1385 nameForBroker := getMetricNameForBroker(name, b)
1386 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1387 return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
1388}