blob: 6a33b802ea919936c6ff458bdf4dbebaec6b109f [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "crypto/tls"
5 "encoding/binary"
6 "fmt"
7 "io"
8 "net"
9 "strconv"
10 "sync"
11 "sync/atomic"
12 "time"
13
14 "github.com/rcrowley/go-metrics"
15)
16
17// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
18type Broker struct {
19 id int32
20 addr string
21 rack *string
22
23 conf *Config
24 correlationID int32
25 conn net.Conn
26 connErr error
27 lock sync.Mutex
28 opened int32
29
30 responses chan responsePromise
31 done chan bool
32
33 incomingByteRate metrics.Meter
34 requestRate metrics.Meter
35 requestSize metrics.Histogram
36 requestLatency metrics.Histogram
37 outgoingByteRate metrics.Meter
38 responseRate metrics.Meter
39 responseSize metrics.Histogram
40 brokerIncomingByteRate metrics.Meter
41 brokerRequestRate metrics.Meter
42 brokerRequestSize metrics.Histogram
43 brokerRequestLatency metrics.Histogram
44 brokerOutgoingByteRate metrics.Meter
45 brokerResponseRate metrics.Meter
46 brokerResponseSize metrics.Histogram
47}
48
49type responsePromise struct {
50 requestTime time.Time
51 correlationID int32
52 packets chan []byte
53 errors chan error
54}
55
56// NewBroker creates and returns a Broker targeting the given host:port address.
57// This does not attempt to actually connect, you have to call Open() for that.
58func NewBroker(addr string) *Broker {
59 return &Broker{id: -1, addr: addr}
60}
61
62// Open tries to connect to the Broker if it is not already connected or connecting, but does not block
63// waiting for the connection to complete. This means that any subsequent operations on the broker will
64// block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
65// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
66// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
67func (b *Broker) Open(conf *Config) error {
68 if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
69 return ErrAlreadyConnected
70 }
71
72 if conf == nil {
73 conf = NewConfig()
74 }
75
76 err := conf.Validate()
77 if err != nil {
78 return err
79 }
80
81 b.lock.Lock()
82
83 go withRecover(func() {
84 defer b.lock.Unlock()
85
86 dialer := net.Dialer{
87 Timeout: conf.Net.DialTimeout,
88 KeepAlive: conf.Net.KeepAlive,
89 LocalAddr: conf.Net.LocalAddr,
90 }
91
92 if conf.Net.TLS.Enable {
93 b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
94 } else {
95 b.conn, b.connErr = dialer.Dial("tcp", b.addr)
96 }
97 if b.connErr != nil {
98 Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
99 b.conn = nil
100 atomic.StoreInt32(&b.opened, 0)
101 return
102 }
103 b.conn = newBufConn(b.conn)
104
105 b.conf = conf
106
107 // Create or reuse the global metrics shared between brokers
108 b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
109 b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
110 b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
111 b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
112 b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
113 b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
114 b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
115 // Do not gather metrics for seeded broker (only used during bootstrap) because they share
116 // the same id (-1) and are already exposed through the global metrics above
117 if b.id >= 0 {
118 b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
119 b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
120 b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
121 b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
122 b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
123 b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
124 b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
125 }
126
127 if conf.Net.SASL.Enable {
128 b.connErr = b.sendAndReceiveSASLPlainAuth()
129 if b.connErr != nil {
130 err = b.conn.Close()
131 if err == nil {
132 Logger.Printf("Closed connection to broker %s\n", b.addr)
133 } else {
134 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
135 }
136 b.conn = nil
137 atomic.StoreInt32(&b.opened, 0)
138 return
139 }
140 }
141
142 b.done = make(chan bool)
143 b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
144
145 if b.id >= 0 {
146 Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
147 } else {
148 Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
149 }
150 go withRecover(b.responseReceiver)
151 })
152
153 return nil
154}
155
156// Connected returns true if the broker is connected and false otherwise. If the broker is not
157// connected but it had tried to connect, the error from that connection attempt is also returned.
158func (b *Broker) Connected() (bool, error) {
159 b.lock.Lock()
160 defer b.lock.Unlock()
161
162 return b.conn != nil, b.connErr
163}
164
165func (b *Broker) Close() error {
166 b.lock.Lock()
167 defer b.lock.Unlock()
168
169 if b.conn == nil {
170 return ErrNotConnected
171 }
172
173 close(b.responses)
174 <-b.done
175
176 err := b.conn.Close()
177
178 b.conn = nil
179 b.connErr = nil
180 b.done = nil
181 b.responses = nil
182
183 if b.id >= 0 {
184 b.conf.MetricRegistry.Unregister(getMetricNameForBroker("incoming-byte-rate", b))
185 b.conf.MetricRegistry.Unregister(getMetricNameForBroker("request-rate", b))
186 b.conf.MetricRegistry.Unregister(getMetricNameForBroker("outgoing-byte-rate", b))
187 b.conf.MetricRegistry.Unregister(getMetricNameForBroker("response-rate", b))
188 }
189
190 if err == nil {
191 Logger.Printf("Closed connection to broker %s\n", b.addr)
192 } else {
193 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
194 }
195
196 atomic.StoreInt32(&b.opened, 0)
197
198 return err
199}
200
201// ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
202func (b *Broker) ID() int32 {
203 return b.id
204}
205
206// Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
207func (b *Broker) Addr() string {
208 return b.addr
209}
210
211// Rack returns the broker's rack as retrieved from Kafka's metadata or the
212// empty string if it is not known. The returned value corresponds to the
213// broker's broker.rack configuration setting. Requires protocol version to be
214// at least v0.10.0.0.
215func (b *Broker) Rack() string {
216 if b.rack == nil {
217 return ""
218 }
219 return *b.rack
220}
221
222func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
223 response := new(MetadataResponse)
224
225 err := b.sendAndReceive(request, response)
226
227 if err != nil {
228 return nil, err
229 }
230
231 return response, nil
232}
233
234func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
235 response := new(ConsumerMetadataResponse)
236
237 err := b.sendAndReceive(request, response)
238
239 if err != nil {
240 return nil, err
241 }
242
243 return response, nil
244}
245
246func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
247 response := new(FindCoordinatorResponse)
248
249 err := b.sendAndReceive(request, response)
250
251 if err != nil {
252 return nil, err
253 }
254
255 return response, nil
256}
257
258func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
259 response := new(OffsetResponse)
260
261 err := b.sendAndReceive(request, response)
262
263 if err != nil {
264 return nil, err
265 }
266
267 return response, nil
268}
269
270func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
271 var response *ProduceResponse
272 var err error
273
274 if request.RequiredAcks == NoResponse {
275 err = b.sendAndReceive(request, nil)
276 } else {
277 response = new(ProduceResponse)
278 err = b.sendAndReceive(request, response)
279 }
280
281 if err != nil {
282 return nil, err
283 }
284
285 return response, nil
286}
287
288func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
289 response := new(FetchResponse)
290
291 err := b.sendAndReceive(request, response)
292
293 if err != nil {
294 return nil, err
295 }
296
297 return response, nil
298}
299
300func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
301 response := new(OffsetCommitResponse)
302
303 err := b.sendAndReceive(request, response)
304
305 if err != nil {
306 return nil, err
307 }
308
309 return response, nil
310}
311
312func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
313 response := new(OffsetFetchResponse)
314
315 err := b.sendAndReceive(request, response)
316
317 if err != nil {
318 return nil, err
319 }
320
321 return response, nil
322}
323
324func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
325 response := new(JoinGroupResponse)
326
327 err := b.sendAndReceive(request, response)
328 if err != nil {
329 return nil, err
330 }
331
332 return response, nil
333}
334
335func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
336 response := new(SyncGroupResponse)
337
338 err := b.sendAndReceive(request, response)
339 if err != nil {
340 return nil, err
341 }
342
343 return response, nil
344}
345
346func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
347 response := new(LeaveGroupResponse)
348
349 err := b.sendAndReceive(request, response)
350 if err != nil {
351 return nil, err
352 }
353
354 return response, nil
355}
356
357func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
358 response := new(HeartbeatResponse)
359
360 err := b.sendAndReceive(request, response)
361 if err != nil {
362 return nil, err
363 }
364
365 return response, nil
366}
367
368func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
369 response := new(ListGroupsResponse)
370
371 err := b.sendAndReceive(request, response)
372 if err != nil {
373 return nil, err
374 }
375
376 return response, nil
377}
378
379func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
380 response := new(DescribeGroupsResponse)
381
382 err := b.sendAndReceive(request, response)
383 if err != nil {
384 return nil, err
385 }
386
387 return response, nil
388}
389
390func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
391 response := new(ApiVersionsResponse)
392
393 err := b.sendAndReceive(request, response)
394 if err != nil {
395 return nil, err
396 }
397
398 return response, nil
399}
400
401func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
402 response := new(CreateTopicsResponse)
403
404 err := b.sendAndReceive(request, response)
405 if err != nil {
406 return nil, err
407 }
408
409 return response, nil
410}
411
412func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
413 response := new(DeleteTopicsResponse)
414
415 err := b.sendAndReceive(request, response)
416 if err != nil {
417 return nil, err
418 }
419
420 return response, nil
421}
422
423func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
424 response := new(CreatePartitionsResponse)
425
426 err := b.sendAndReceive(request, response)
427 if err != nil {
428 return nil, err
429 }
430
431 return response, nil
432}
433
434func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
435 response := new(DeleteRecordsResponse)
436
437 err := b.sendAndReceive(request, response)
438 if err != nil {
439 return nil, err
440 }
441
442 return response, nil
443}
444
445func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
446 response := new(DescribeAclsResponse)
447
448 err := b.sendAndReceive(request, response)
449 if err != nil {
450 return nil, err
451 }
452
453 return response, nil
454}
455
456func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
457 response := new(CreateAclsResponse)
458
459 err := b.sendAndReceive(request, response)
460 if err != nil {
461 return nil, err
462 }
463
464 return response, nil
465}
466
467func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
468 response := new(DeleteAclsResponse)
469
470 err := b.sendAndReceive(request, response)
471 if err != nil {
472 return nil, err
473 }
474
475 return response, nil
476}
477
478func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
479 response := new(InitProducerIDResponse)
480
481 err := b.sendAndReceive(request, response)
482 if err != nil {
483 return nil, err
484 }
485
486 return response, nil
487}
488
489func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
490 response := new(AddPartitionsToTxnResponse)
491
492 err := b.sendAndReceive(request, response)
493 if err != nil {
494 return nil, err
495 }
496
497 return response, nil
498}
499
500func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
501 response := new(AddOffsetsToTxnResponse)
502
503 err := b.sendAndReceive(request, response)
504 if err != nil {
505 return nil, err
506 }
507
508 return response, nil
509}
510
511func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
512 response := new(EndTxnResponse)
513
514 err := b.sendAndReceive(request, response)
515 if err != nil {
516 return nil, err
517 }
518
519 return response, nil
520}
521
522func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
523 response := new(TxnOffsetCommitResponse)
524
525 err := b.sendAndReceive(request, response)
526 if err != nil {
527 return nil, err
528 }
529
530 return response, nil
531}
532
533func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
534 response := new(DescribeConfigsResponse)
535
536 err := b.sendAndReceive(request, response)
537 if err != nil {
538 return nil, err
539 }
540
541 return response, nil
542}
543
544func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
545 response := new(AlterConfigsResponse)
546
547 err := b.sendAndReceive(request, response)
548 if err != nil {
549 return nil, err
550 }
551
552 return response, nil
553}
554
555func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
556 response := new(DeleteGroupsResponse)
557
558 if err := b.sendAndReceive(request, response); err != nil {
559 return nil, err
560 }
561
562 return response, nil
563}
564
565func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
566 b.lock.Lock()
567 defer b.lock.Unlock()
568
569 if b.conn == nil {
570 if b.connErr != nil {
571 return nil, b.connErr
572 }
573 return nil, ErrNotConnected
574 }
575
576 if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
577 return nil, ErrUnsupportedVersion
578 }
579
580 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
581 buf, err := encode(req, b.conf.MetricRegistry)
582 if err != nil {
583 return nil, err
584 }
585
586 err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
587 if err != nil {
588 return nil, err
589 }
590
591 requestTime := time.Now()
592 bytes, err := b.conn.Write(buf)
593 b.updateOutgoingCommunicationMetrics(bytes)
594 if err != nil {
595 return nil, err
596 }
597 b.correlationID++
598
599 if !promiseResponse {
600 // Record request latency without the response
601 b.updateRequestLatencyMetrics(time.Since(requestTime))
602 return nil, nil
603 }
604
605 promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
606 b.responses <- promise
607
608 return &promise, nil
609}
610
611func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
612 promise, err := b.send(req, res != nil)
613
614 if err != nil {
615 return err
616 }
617
618 if promise == nil {
619 return nil
620 }
621
622 select {
623 case buf := <-promise.packets:
624 return versionedDecode(buf, res, req.version())
625 case err = <-promise.errors:
626 return err
627 }
628}
629
630func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
631 b.id, err = pd.getInt32()
632 if err != nil {
633 return err
634 }
635
636 host, err := pd.getString()
637 if err != nil {
638 return err
639 }
640
641 port, err := pd.getInt32()
642 if err != nil {
643 return err
644 }
645
646 if version >= 1 {
647 b.rack, err = pd.getNullableString()
648 if err != nil {
649 return err
650 }
651 }
652
653 b.addr = net.JoinHostPort(host, fmt.Sprint(port))
654 if _, _, err := net.SplitHostPort(b.addr); err != nil {
655 return err
656 }
657
658 return nil
659}
660
661func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
662
663 host, portstr, err := net.SplitHostPort(b.addr)
664 if err != nil {
665 return err
666 }
667 port, err := strconv.Atoi(portstr)
668 if err != nil {
669 return err
670 }
671
672 pe.putInt32(b.id)
673
674 err = pe.putString(host)
675 if err != nil {
676 return err
677 }
678
679 pe.putInt32(int32(port))
680
681 if version >= 1 {
682 err = pe.putNullableString(b.rack)
683 if err != nil {
684 return err
685 }
686 }
687
688 return nil
689}
690
691func (b *Broker) responseReceiver() {
692 var dead error
693 header := make([]byte, 8)
694 for response := range b.responses {
695 if dead != nil {
696 response.errors <- dead
697 continue
698 }
699
700 err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
701 if err != nil {
702 dead = err
703 response.errors <- err
704 continue
705 }
706
707 bytesReadHeader, err := io.ReadFull(b.conn, header)
708 requestLatency := time.Since(response.requestTime)
709 if err != nil {
710 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
711 dead = err
712 response.errors <- err
713 continue
714 }
715
716 decodedHeader := responseHeader{}
717 err = decode(header, &decodedHeader)
718 if err != nil {
719 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
720 dead = err
721 response.errors <- err
722 continue
723 }
724 if decodedHeader.correlationID != response.correlationID {
725 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
726 // TODO if decoded ID < cur ID, discard until we catch up
727 // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
728 dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
729 response.errors <- dead
730 continue
731 }
732
733 buf := make([]byte, decodedHeader.length-4)
734 bytesReadBody, err := io.ReadFull(b.conn, buf)
735 b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
736 if err != nil {
737 dead = err
738 response.errors <- err
739 continue
740 }
741
742 response.packets <- buf
743 }
744 close(b.done)
745}
746
747func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
748 rb := &SaslHandshakeRequest{"PLAIN"}
749 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
750 buf, err := encode(req, b.conf.MetricRegistry)
751 if err != nil {
752 return err
753 }
754
755 err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
756 if err != nil {
757 return err
758 }
759
760 requestTime := time.Now()
761 bytes, err := b.conn.Write(buf)
762 b.updateOutgoingCommunicationMetrics(bytes)
763 if err != nil {
764 Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
765 return err
766 }
767 b.correlationID++
768 //wait for the response
769 header := make([]byte, 8) // response header
770 _, err = io.ReadFull(b.conn, header)
771 if err != nil {
772 Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
773 return err
774 }
775 length := binary.BigEndian.Uint32(header[:4])
776 payload := make([]byte, length-4)
777 n, err := io.ReadFull(b.conn, payload)
778 if err != nil {
779 Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
780 return err
781 }
782 b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
783 res := &SaslHandshakeResponse{}
784 err = versionedDecode(payload, res, 0)
785 if err != nil {
786 Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
787 return err
788 }
789 if res.Err != ErrNoError {
790 Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
791 return res.Err
792 }
793 Logger.Print("Successful SASL handshake")
794 return nil
795}
796
797// Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
798// Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
799//
800// In SASL Plain, Kafka expects the auth header to be in the following format
801// Message format (from https://tools.ietf.org/html/rfc4616):
802//
803// message = [authzid] UTF8NUL authcid UTF8NUL passwd
804// authcid = 1*SAFE ; MUST accept up to 255 octets
805// authzid = 1*SAFE ; MUST accept up to 255 octets
806// passwd = 1*SAFE ; MUST accept up to 255 octets
807// UTF8NUL = %x00 ; UTF-8 encoded NUL character
808//
809// SAFE = UTF1 / UTF2 / UTF3 / UTF4
810// ;; any UTF-8 encoded Unicode character except NUL
811//
812// When credentials are valid, Kafka returns a 4 byte array of null characters.
813// When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
814// of responding to bad credentials but thats how its being done today.
815func (b *Broker) sendAndReceiveSASLPlainAuth() error {
816 if b.conf.Net.SASL.Handshake {
817 handshakeErr := b.sendAndReceiveSASLPlainHandshake()
818 if handshakeErr != nil {
819 Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
820 return handshakeErr
821 }
822 }
823 length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
824 authBytes := make([]byte, length+4) //4 byte length header + auth data
825 binary.BigEndian.PutUint32(authBytes, uint32(length))
826 copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
827
828 err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
829 if err != nil {
830 Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
831 return err
832 }
833
834 requestTime := time.Now()
835 bytesWritten, err := b.conn.Write(authBytes)
836 b.updateOutgoingCommunicationMetrics(bytesWritten)
837 if err != nil {
838 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
839 return err
840 }
841
842 header := make([]byte, 4)
843 n, err := io.ReadFull(b.conn, header)
844 b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
845 // If the credentials are valid, we would get a 4 byte response filled with null characters.
846 // Otherwise, the broker closes the connection and we get an EOF
847 if err != nil {
848 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
849 return err
850 }
851
852 Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
853 return nil
854}
855
856func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
857 b.updateRequestLatencyMetrics(requestLatency)
858 b.responseRate.Mark(1)
859 if b.brokerResponseRate != nil {
860 b.brokerResponseRate.Mark(1)
861 }
862 responseSize := int64(bytes)
863 b.incomingByteRate.Mark(responseSize)
864 if b.brokerIncomingByteRate != nil {
865 b.brokerIncomingByteRate.Mark(responseSize)
866 }
867 b.responseSize.Update(responseSize)
868 if b.brokerResponseSize != nil {
869 b.brokerResponseSize.Update(responseSize)
870 }
871}
872
873func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
874 requestLatencyInMs := int64(requestLatency / time.Millisecond)
875 b.requestLatency.Update(requestLatencyInMs)
876 if b.brokerRequestLatency != nil {
877 b.brokerRequestLatency.Update(requestLatencyInMs)
878 }
879}
880
881func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
882 b.requestRate.Mark(1)
883 if b.brokerRequestRate != nil {
884 b.brokerRequestRate.Mark(1)
885 }
886 requestSize := int64(bytes)
887 b.outgoingByteRate.Mark(requestSize)
888 if b.brokerOutgoingByteRate != nil {
889 b.brokerOutgoingByteRate.Mark(requestSize)
890 }
891 b.requestSize.Update(requestSize)
892 if b.brokerRequestSize != nil {
893 b.brokerRequestSize.Update(requestSize)
894 }
895}