blob: c60e9a044a0e14832396dac60423a8392774f850 [file] [log] [blame]
kesavand2cde6582020-06-22 04:56:23 -04001package sarama
2
3import (
4 "crypto/tls"
5 "encoding/binary"
6 "fmt"
7 "io"
8 "net"
9 "sort"
10 "strconv"
11 "strings"
12 "sync"
13 "sync/atomic"
14 "time"
15
kesavandc71914f2022-03-25 11:19:03 +053016 "github.com/rcrowley/go-metrics"
kesavand2cde6582020-06-22 04:56:23 -040017)
18
19// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
20type Broker struct {
21 conf *Config
22 rack *string
23
24 id int32
25 addr string
26 correlationID int32
27 conn net.Conn
28 connErr error
29 lock sync.Mutex
30 opened int32
kesavandc71914f2022-03-25 11:19:03 +053031 responses chan *responsePromise
kesavand2cde6582020-06-22 04:56:23 -040032 done chan bool
33
34 registeredMetrics []string
35
36 incomingByteRate metrics.Meter
37 requestRate metrics.Meter
38 requestSize metrics.Histogram
39 requestLatency metrics.Histogram
40 outgoingByteRate metrics.Meter
41 responseRate metrics.Meter
42 responseSize metrics.Histogram
kesavandc71914f2022-03-25 11:19:03 +053043 requestsInFlight metrics.Counter
kesavand2cde6582020-06-22 04:56:23 -040044 brokerIncomingByteRate metrics.Meter
45 brokerRequestRate metrics.Meter
46 brokerRequestSize metrics.Histogram
47 brokerRequestLatency metrics.Histogram
48 brokerOutgoingByteRate metrics.Meter
49 brokerResponseRate metrics.Meter
50 brokerResponseSize metrics.Histogram
kesavandc71914f2022-03-25 11:19:03 +053051 brokerRequestsInFlight metrics.Counter
52 brokerThrottleTime metrics.Histogram
kesavand2cde6582020-06-22 04:56:23 -040053
54 kerberosAuthenticator GSSAPIKerberosAuth
55}
56
57// SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
58type SASLMechanism string
59
60const (
61 // SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
62 SASLTypeOAuth = "OAUTHBEARER"
63 // SASLTypePlaintext represents the SASL/PLAIN mechanism
64 SASLTypePlaintext = "PLAIN"
65 // SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
66 SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
67 // SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
68 SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
69 SASLTypeGSSAPI = "GSSAPI"
70 // SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
71 // server negotiate SASL auth using opaque packets.
72 SASLHandshakeV0 = int16(0)
73 // SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
74 // server negotiate SASL by wrapping tokens with Kafka protocol headers.
75 SASLHandshakeV1 = int16(1)
76 // SASLExtKeyAuth is the reserved extension key name sent as part of the
kesavandc71914f2022-03-25 11:19:03 +053077 // SASL/OAUTHBEARER initial client response
kesavand2cde6582020-06-22 04:56:23 -040078 SASLExtKeyAuth = "auth"
79)
80
81// AccessToken contains an access token used to authenticate a
82// SASL/OAUTHBEARER client along with associated metadata.
83type AccessToken struct {
84 // Token is the access token payload.
85 Token string
86 // Extensions is a optional map of arbitrary key-value pairs that can be
87 // sent with the SASL/OAUTHBEARER initial client response. These values are
88 // ignored by the SASL server if they are unexpected. This feature is only
89 // supported by Kafka >= 2.1.0.
90 Extensions map[string]string
91}
92
93// AccessTokenProvider is the interface that encapsulates how implementors
94// can generate access tokens for Kafka broker authentication.
95type AccessTokenProvider interface {
96 // Token returns an access token. The implementation should ensure token
97 // reuse so that multiple calls at connect time do not create multiple
98 // tokens. The implementation should also periodically refresh the token in
99 // order to guarantee that each call returns an unexpired token. This
100 // method should not block indefinitely--a timeout error should be returned
101 // after a short period of inactivity so that the broker connection logic
102 // can log debugging information and retry.
103 Token() (*AccessToken, error)
104}
105
106// SCRAMClient is a an interface to a SCRAM
107// client implementation.
108type SCRAMClient interface {
109 // Begin prepares the client for the SCRAM exchange
110 // with the server with a user name and a password
111 Begin(userName, password, authzID string) error
112 // Step steps client through the SCRAM exchange. It is
113 // called repeatedly until it errors or `Done` returns true.
114 Step(challenge string) (response string, err error)
115 // Done should return true when the SCRAM conversation
116 // is over.
117 Done() bool
118}
119
120type responsePromise struct {
121 requestTime time.Time
122 correlationID int32
kesavandc71914f2022-03-25 11:19:03 +0530123 headerVersion int16
124 handler func([]byte, error)
kesavand2cde6582020-06-22 04:56:23 -0400125 packets chan []byte
126 errors chan error
127}
128
kesavandc71914f2022-03-25 11:19:03 +0530129func (p *responsePromise) handle(packets []byte, err error) {
130 // Use callback when provided
131 if p.handler != nil {
132 p.handler(packets, err)
133 return
134 }
135 // Otherwise fallback to using channels
136 if err != nil {
137 p.errors <- err
138 return
139 }
140 p.packets <- packets
141}
142
kesavand2cde6582020-06-22 04:56:23 -0400143// NewBroker creates and returns a Broker targeting the given host:port address.
144// This does not attempt to actually connect, you have to call Open() for that.
145func NewBroker(addr string) *Broker {
146 return &Broker{id: -1, addr: addr}
147}
148
149// Open tries to connect to the Broker if it is not already connected or connecting, but does not block
150// waiting for the connection to complete. This means that any subsequent operations on the broker will
151// block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
152// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
153// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
154func (b *Broker) Open(conf *Config) error {
155 if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
156 return ErrAlreadyConnected
157 }
158
159 if conf == nil {
160 conf = NewConfig()
161 }
162
163 err := conf.Validate()
164 if err != nil {
165 return err
166 }
167
kesavandc71914f2022-03-25 11:19:03 +0530168 usingApiVersionsRequests := conf.Version.IsAtLeast(V2_4_0_0) && conf.ApiVersionsRequest
169
kesavand2cde6582020-06-22 04:56:23 -0400170 b.lock.Lock()
171
172 go withRecover(func() {
kesavandc71914f2022-03-25 11:19:03 +0530173 defer func() {
174 b.lock.Unlock()
kesavand2cde6582020-06-22 04:56:23 -0400175
kesavandc71914f2022-03-25 11:19:03 +0530176 // Send an ApiVersionsRequest to identify the client (KIP-511).
177 // Ideally Sarama would use the response to control protocol versions,
178 // but for now just fire-and-forget just to send
179 if usingApiVersionsRequests {
180 _, err = b.ApiVersions(&ApiVersionsRequest{
181 Version: 3,
182 ClientSoftwareName: defaultClientSoftwareName,
183 ClientSoftwareVersion: version(),
184 })
185 if err != nil {
186 Logger.Printf("Error while sending ApiVersionsRequest to broker %s: %s\n", b.addr, err)
187 }
188 }
189 }()
190 dialer := conf.getDialer()
191 b.conn, b.connErr = dialer.Dial("tcp", b.addr)
kesavand2cde6582020-06-22 04:56:23 -0400192 if b.connErr != nil {
193 Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
194 b.conn = nil
195 atomic.StoreInt32(&b.opened, 0)
196 return
197 }
kesavandc71914f2022-03-25 11:19:03 +0530198 if conf.Net.TLS.Enable {
199 b.conn = tls.Client(b.conn, validServerNameTLS(b.addr, conf.Net.TLS.Config))
200 }
kesavand2cde6582020-06-22 04:56:23 -0400201
kesavandc71914f2022-03-25 11:19:03 +0530202 b.conn = newBufConn(b.conn)
kesavand2cde6582020-06-22 04:56:23 -0400203 b.conf = conf
204
205 // Create or reuse the global metrics shared between brokers
206 b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
207 b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
208 b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
209 b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
210 b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
211 b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
212 b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
kesavandc71914f2022-03-25 11:19:03 +0530213 b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
kesavand2cde6582020-06-22 04:56:23 -0400214 // Do not gather metrics for seeded broker (only used during bootstrap) because they share
215 // the same id (-1) and are already exposed through the global metrics above
kesavandc71914f2022-03-25 11:19:03 +0530216 if b.id >= 0 && !metrics.UseNilMetrics {
kesavand2cde6582020-06-22 04:56:23 -0400217 b.registerMetrics()
218 }
219
220 if conf.Net.SASL.Enable {
kesavand2cde6582020-06-22 04:56:23 -0400221 b.connErr = b.authenticateViaSASL()
222
223 if b.connErr != nil {
224 err = b.conn.Close()
225 if err == nil {
kesavandc71914f2022-03-25 11:19:03 +0530226 DebugLogger.Printf("Closed connection to broker %s\n", b.addr)
kesavand2cde6582020-06-22 04:56:23 -0400227 } else {
228 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
229 }
230 b.conn = nil
231 atomic.StoreInt32(&b.opened, 0)
232 return
233 }
234 }
235
236 b.done = make(chan bool)
kesavandc71914f2022-03-25 11:19:03 +0530237 b.responses = make(chan *responsePromise, b.conf.Net.MaxOpenRequests-1)
kesavand2cde6582020-06-22 04:56:23 -0400238
239 if b.id >= 0 {
kesavandc71914f2022-03-25 11:19:03 +0530240 DebugLogger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
kesavand2cde6582020-06-22 04:56:23 -0400241 } else {
kesavandc71914f2022-03-25 11:19:03 +0530242 DebugLogger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
kesavand2cde6582020-06-22 04:56:23 -0400243 }
244 go withRecover(b.responseReceiver)
245 })
246
247 return nil
248}
249
250// Connected returns true if the broker is connected and false otherwise. If the broker is not
251// connected but it had tried to connect, the error from that connection attempt is also returned.
252func (b *Broker) Connected() (bool, error) {
253 b.lock.Lock()
254 defer b.lock.Unlock()
255
256 return b.conn != nil, b.connErr
257}
258
kesavandc71914f2022-03-25 11:19:03 +0530259// TLSConnectionState returns the client's TLS connection state. The second return value is false if this is not a tls connection or the connection has not yet been established.
260func (b *Broker) TLSConnectionState() (state tls.ConnectionState, ok bool) {
261 b.lock.Lock()
262 defer b.lock.Unlock()
263
264 if b.conn == nil {
265 return state, false
266 }
267 conn := b.conn
268 if bconn, ok := b.conn.(*bufConn); ok {
269 conn = bconn.Conn
270 }
271 if tc, ok := conn.(*tls.Conn); ok {
272 return tc.ConnectionState(), true
273 }
274 return state, false
275}
276
277// Close closes the broker resources
kesavand2cde6582020-06-22 04:56:23 -0400278func (b *Broker) Close() error {
279 b.lock.Lock()
280 defer b.lock.Unlock()
281
282 if b.conn == nil {
283 return ErrNotConnected
284 }
285
286 close(b.responses)
287 <-b.done
288
289 err := b.conn.Close()
290
291 b.conn = nil
292 b.connErr = nil
293 b.done = nil
294 b.responses = nil
295
296 b.unregisterMetrics()
297
298 if err == nil {
kesavandc71914f2022-03-25 11:19:03 +0530299 DebugLogger.Printf("Closed connection to broker %s\n", b.addr)
kesavand2cde6582020-06-22 04:56:23 -0400300 } else {
301 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
302 }
303
304 atomic.StoreInt32(&b.opened, 0)
305
306 return err
307}
308
309// ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
310func (b *Broker) ID() int32 {
311 return b.id
312}
313
314// Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
315func (b *Broker) Addr() string {
316 return b.addr
317}
318
319// Rack returns the broker's rack as retrieved from Kafka's metadata or the
320// empty string if it is not known. The returned value corresponds to the
321// broker's broker.rack configuration setting. Requires protocol version to be
322// at least v0.10.0.0.
323func (b *Broker) Rack() string {
324 if b.rack == nil {
325 return ""
326 }
327 return *b.rack
328}
329
kesavandc71914f2022-03-25 11:19:03 +0530330// GetMetadata send a metadata request and returns a metadata response or error
kesavand2cde6582020-06-22 04:56:23 -0400331func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
332 response := new(MetadataResponse)
333
334 err := b.sendAndReceive(request, response)
kesavand2cde6582020-06-22 04:56:23 -0400335 if err != nil {
336 return nil, err
337 }
338
339 return response, nil
340}
341
kesavandc71914f2022-03-25 11:19:03 +0530342// GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
kesavand2cde6582020-06-22 04:56:23 -0400343func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
344 response := new(ConsumerMetadataResponse)
345
346 err := b.sendAndReceive(request, response)
kesavand2cde6582020-06-22 04:56:23 -0400347 if err != nil {
348 return nil, err
349 }
350
351 return response, nil
352}
353
kesavandc71914f2022-03-25 11:19:03 +0530354// FindCoordinator sends a find coordinate request and returns a response or error
kesavand2cde6582020-06-22 04:56:23 -0400355func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
356 response := new(FindCoordinatorResponse)
357
358 err := b.sendAndReceive(request, response)
kesavand2cde6582020-06-22 04:56:23 -0400359 if err != nil {
360 return nil, err
361 }
362
363 return response, nil
364}
365
kesavandc71914f2022-03-25 11:19:03 +0530366// GetAvailableOffsets return an offset response or error
kesavand2cde6582020-06-22 04:56:23 -0400367func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
368 response := new(OffsetResponse)
369
370 err := b.sendAndReceive(request, response)
kesavand2cde6582020-06-22 04:56:23 -0400371 if err != nil {
372 return nil, err
373 }
374
375 return response, nil
376}
377
kesavandc71914f2022-03-25 11:19:03 +0530378// ProduceCallback function is called once the produce response has been parsed
379// or could not be read.
380type ProduceCallback func(*ProduceResponse, error)
381
382// AsyncProduce sends a produce request and eventually call the provided callback
383// with a produce response or an error.
384//
385// Waiting for the response is generally not blocking on the contrary to using Produce.
386// If the maximum number of in flight request configured is reached then
387// the request will be blocked till a previous response is received.
388//
389// When configured with RequiredAcks == NoResponse, the callback will not be invoked.
390// If an error is returned because the request could not be sent then the callback
391// will not be invoked either.
392//
393// Make sure not to Close the broker in the callback as it will lead to a deadlock.
394func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error {
395 needAcks := request.RequiredAcks != NoResponse
396 // Use a nil promise when no acks is required
397 var promise *responsePromise
398
399 if needAcks {
400 // Create ProduceResponse early to provide the header version
401 res := new(ProduceResponse)
402 promise = &responsePromise{
403 headerVersion: res.headerVersion(),
404 // Packets will be converted to a ProduceResponse in the responseReceiver goroutine
405 handler: func(packets []byte, err error) {
406 if err != nil {
407 // Failed request
408 cb(nil, err)
409 return
410 }
411
412 if err := versionedDecode(packets, res, request.version()); err != nil {
413 // Malformed response
414 cb(nil, err)
415 return
416 }
417
418 // Wellformed response
419 b.updateThrottleMetric(res.ThrottleTime)
420 cb(res, nil)
421 },
422 }
423 }
424
425 return b.sendWithPromise(request, promise)
426}
427
kesavand2cde6582020-06-22 04:56:23 -0400428//Produce returns a produce response or error
429func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
430 var (
431 response *ProduceResponse
432 err error
433 )
434
435 if request.RequiredAcks == NoResponse {
436 err = b.sendAndReceive(request, nil)
437 } else {
438 response = new(ProduceResponse)
439 err = b.sendAndReceive(request, response)
kesavandc71914f2022-03-25 11:19:03 +0530440 b.updateThrottleMetric(response.ThrottleTime)
kesavand2cde6582020-06-22 04:56:23 -0400441 }
442
443 if err != nil {
444 return nil, err
445 }
446
447 return response, nil
448}
449
kesavandc71914f2022-03-25 11:19:03 +0530450// Fetch returns a FetchResponse or error
kesavand2cde6582020-06-22 04:56:23 -0400451func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
452 response := new(FetchResponse)
453
454 err := b.sendAndReceive(request, response)
455 if err != nil {
456 return nil, err
457 }
458
459 return response, nil
460}
461
kesavandc71914f2022-03-25 11:19:03 +0530462// CommitOffset return an Offset commit response or error
kesavand2cde6582020-06-22 04:56:23 -0400463func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
464 response := new(OffsetCommitResponse)
465
466 err := b.sendAndReceive(request, response)
467 if err != nil {
468 return nil, err
469 }
470
471 return response, nil
472}
473
kesavandc71914f2022-03-25 11:19:03 +0530474// FetchOffset returns an offset fetch response or error
kesavand2cde6582020-06-22 04:56:23 -0400475func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
476 response := new(OffsetFetchResponse)
kesavandc71914f2022-03-25 11:19:03 +0530477 response.Version = request.Version // needed to handle the two header versions
kesavand2cde6582020-06-22 04:56:23 -0400478
479 err := b.sendAndReceive(request, response)
480 if err != nil {
481 return nil, err
482 }
483
484 return response, nil
485}
486
kesavandc71914f2022-03-25 11:19:03 +0530487// JoinGroup returns a join group response or error
kesavand2cde6582020-06-22 04:56:23 -0400488func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
489 response := new(JoinGroupResponse)
490
491 err := b.sendAndReceive(request, response)
492 if err != nil {
493 return nil, err
494 }
495
496 return response, nil
497}
498
kesavandc71914f2022-03-25 11:19:03 +0530499// SyncGroup returns a sync group response or error
kesavand2cde6582020-06-22 04:56:23 -0400500func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
501 response := new(SyncGroupResponse)
502
503 err := b.sendAndReceive(request, response)
504 if err != nil {
505 return nil, err
506 }
507
508 return response, nil
509}
510
kesavandc71914f2022-03-25 11:19:03 +0530511// LeaveGroup return a leave group response or error
kesavand2cde6582020-06-22 04:56:23 -0400512func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
513 response := new(LeaveGroupResponse)
514
515 err := b.sendAndReceive(request, response)
516 if err != nil {
517 return nil, err
518 }
519
520 return response, nil
521}
522
kesavandc71914f2022-03-25 11:19:03 +0530523// Heartbeat returns a heartbeat response or error
kesavand2cde6582020-06-22 04:56:23 -0400524func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
525 response := new(HeartbeatResponse)
526
527 err := b.sendAndReceive(request, response)
528 if err != nil {
529 return nil, err
530 }
531
532 return response, nil
533}
534
kesavandc71914f2022-03-25 11:19:03 +0530535// ListGroups return a list group response or error
kesavand2cde6582020-06-22 04:56:23 -0400536func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
537 response := new(ListGroupsResponse)
538
539 err := b.sendAndReceive(request, response)
540 if err != nil {
541 return nil, err
542 }
543
544 return response, nil
545}
546
kesavandc71914f2022-03-25 11:19:03 +0530547// DescribeGroups return describe group response or error
kesavand2cde6582020-06-22 04:56:23 -0400548func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
549 response := new(DescribeGroupsResponse)
550
551 err := b.sendAndReceive(request, response)
552 if err != nil {
553 return nil, err
554 }
555
556 return response, nil
557}
558
kesavandc71914f2022-03-25 11:19:03 +0530559// ApiVersions return api version response or error
kesavand2cde6582020-06-22 04:56:23 -0400560func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
561 response := new(ApiVersionsResponse)
562
563 err := b.sendAndReceive(request, response)
564 if err != nil {
565 return nil, err
566 }
567
568 return response, nil
569}
570
kesavandc71914f2022-03-25 11:19:03 +0530571// CreateTopics send a create topic request and returns create topic response
kesavand2cde6582020-06-22 04:56:23 -0400572func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
573 response := new(CreateTopicsResponse)
574
575 err := b.sendAndReceive(request, response)
576 if err != nil {
577 return nil, err
578 }
579
580 return response, nil
581}
582
kesavandc71914f2022-03-25 11:19:03 +0530583// DeleteTopics sends a delete topic request and returns delete topic response
kesavand2cde6582020-06-22 04:56:23 -0400584func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
585 response := new(DeleteTopicsResponse)
586
587 err := b.sendAndReceive(request, response)
588 if err != nil {
589 return nil, err
590 }
591
592 return response, nil
593}
594
kesavandc71914f2022-03-25 11:19:03 +0530595// CreatePartitions sends a create partition request and returns create
596// partitions response or error
kesavand2cde6582020-06-22 04:56:23 -0400597func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
598 response := new(CreatePartitionsResponse)
599
600 err := b.sendAndReceive(request, response)
601 if err != nil {
602 return nil, err
603 }
604
605 return response, nil
606}
607
kesavandc71914f2022-03-25 11:19:03 +0530608// AlterPartitionReassignments sends a alter partition reassignments request and
609// returns alter partition reassignments response
610func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error) {
611 response := new(AlterPartitionReassignmentsResponse)
612
613 err := b.sendAndReceive(request, response)
614 if err != nil {
615 return nil, err
616 }
617
618 return response, nil
619}
620
621// ListPartitionReassignments sends a list partition reassignments request and
622// returns list partition reassignments response
623func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error) {
624 response := new(ListPartitionReassignmentsResponse)
625
626 err := b.sendAndReceive(request, response)
627 if err != nil {
628 return nil, err
629 }
630
631 return response, nil
632}
633
634// DeleteRecords send a request to delete records and return delete record
635// response or error
kesavand2cde6582020-06-22 04:56:23 -0400636func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
637 response := new(DeleteRecordsResponse)
638
639 err := b.sendAndReceive(request, response)
640 if err != nil {
641 return nil, err
642 }
643
644 return response, nil
645}
646
kesavandc71914f2022-03-25 11:19:03 +0530647// DescribeAcls sends a describe acl request and returns a response or error
kesavand2cde6582020-06-22 04:56:23 -0400648func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
649 response := new(DescribeAclsResponse)
650
651 err := b.sendAndReceive(request, response)
652 if err != nil {
653 return nil, err
654 }
655
656 return response, nil
657}
658
kesavandc71914f2022-03-25 11:19:03 +0530659// CreateAcls sends a create acl request and returns a response or error
kesavand2cde6582020-06-22 04:56:23 -0400660func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
661 response := new(CreateAclsResponse)
662
663 err := b.sendAndReceive(request, response)
664 if err != nil {
665 return nil, err
666 }
667
668 return response, nil
669}
670
kesavandc71914f2022-03-25 11:19:03 +0530671// DeleteAcls sends a delete acl request and returns a response or error
kesavand2cde6582020-06-22 04:56:23 -0400672func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
673 response := new(DeleteAclsResponse)
674
675 err := b.sendAndReceive(request, response)
676 if err != nil {
677 return nil, err
678 }
679
680 return response, nil
681}
682
kesavandc71914f2022-03-25 11:19:03 +0530683// InitProducerID sends an init producer request and returns a response or error
kesavand2cde6582020-06-22 04:56:23 -0400684func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
685 response := new(InitProducerIDResponse)
686
687 err := b.sendAndReceive(request, response)
688 if err != nil {
689 return nil, err
690 }
691
692 return response, nil
693}
694
kesavandc71914f2022-03-25 11:19:03 +0530695// AddPartitionsToTxn send a request to add partition to txn and returns
696// a response or error
kesavand2cde6582020-06-22 04:56:23 -0400697func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
698 response := new(AddPartitionsToTxnResponse)
699
700 err := b.sendAndReceive(request, response)
701 if err != nil {
702 return nil, err
703 }
704
705 return response, nil
706}
707
kesavandc71914f2022-03-25 11:19:03 +0530708// AddOffsetsToTxn sends a request to add offsets to txn and returns a response
709// or error
kesavand2cde6582020-06-22 04:56:23 -0400710func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
711 response := new(AddOffsetsToTxnResponse)
712
713 err := b.sendAndReceive(request, response)
714 if err != nil {
715 return nil, err
716 }
717
718 return response, nil
719}
720
kesavandc71914f2022-03-25 11:19:03 +0530721// EndTxn sends a request to end txn and returns a response or error
kesavand2cde6582020-06-22 04:56:23 -0400722func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
723 response := new(EndTxnResponse)
724
725 err := b.sendAndReceive(request, response)
726 if err != nil {
727 return nil, err
728 }
729
730 return response, nil
731}
732
kesavandc71914f2022-03-25 11:19:03 +0530733// TxnOffsetCommit sends a request to commit transaction offsets and returns
734// a response or error
kesavand2cde6582020-06-22 04:56:23 -0400735func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
736 response := new(TxnOffsetCommitResponse)
737
738 err := b.sendAndReceive(request, response)
739 if err != nil {
740 return nil, err
741 }
742
743 return response, nil
744}
745
kesavandc71914f2022-03-25 11:19:03 +0530746// DescribeConfigs sends a request to describe config and returns a response or
747// error
kesavand2cde6582020-06-22 04:56:23 -0400748func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
749 response := new(DescribeConfigsResponse)
750
751 err := b.sendAndReceive(request, response)
752 if err != nil {
753 return nil, err
754 }
755
756 return response, nil
757}
758
kesavandc71914f2022-03-25 11:19:03 +0530759// AlterConfigs sends a request to alter config and return a response or error
kesavand2cde6582020-06-22 04:56:23 -0400760func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
761 response := new(AlterConfigsResponse)
762
763 err := b.sendAndReceive(request, response)
764 if err != nil {
765 return nil, err
766 }
767
768 return response, nil
769}
770
kesavandc71914f2022-03-25 11:19:03 +0530771// IncrementalAlterConfigs sends a request to incremental alter config and return a response or error
772func (b *Broker) IncrementalAlterConfigs(request *IncrementalAlterConfigsRequest) (*IncrementalAlterConfigsResponse, error) {
773 response := new(IncrementalAlterConfigsResponse)
774
775 err := b.sendAndReceive(request, response)
776 if err != nil {
777 return nil, err
778 }
779
780 return response, nil
781}
782
783// DeleteGroups sends a request to delete groups and returns a response or error
kesavand2cde6582020-06-22 04:56:23 -0400784func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
785 response := new(DeleteGroupsResponse)
786
787 if err := b.sendAndReceive(request, response); err != nil {
788 return nil, err
789 }
790
791 return response, nil
792}
793
kesavandc71914f2022-03-25 11:19:03 +0530794// DeleteOffsets sends a request to delete group offsets and returns a response or error
795func (b *Broker) DeleteOffsets(request *DeleteOffsetsRequest) (*DeleteOffsetsResponse, error) {
796 response := new(DeleteOffsetsResponse)
797
798 if err := b.sendAndReceive(request, response); err != nil {
799 return nil, err
800 }
801
802 return response, nil
803}
804
805// DescribeLogDirs sends a request to get the broker's log dir paths and sizes
806func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) {
807 response := new(DescribeLogDirsResponse)
808
809 err := b.sendAndReceive(request, response)
810 if err != nil {
811 return nil, err
812 }
813
814 return response, nil
815}
816
817// DescribeUserScramCredentials sends a request to get SCRAM users
818func (b *Broker) DescribeUserScramCredentials(req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error) {
819 res := new(DescribeUserScramCredentialsResponse)
820
821 err := b.sendAndReceive(req, res)
822 if err != nil {
823 return nil, err
824 }
825
826 return res, err
827}
828
829func (b *Broker) AlterUserScramCredentials(req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error) {
830 res := new(AlterUserScramCredentialsResponse)
831
832 err := b.sendAndReceive(req, res)
833 if err != nil {
834 return nil, err
835 }
836
837 return res, nil
838}
839
840// DescribeClientQuotas sends a request to get the broker's quotas
841func (b *Broker) DescribeClientQuotas(request *DescribeClientQuotasRequest) (*DescribeClientQuotasResponse, error) {
842 response := new(DescribeClientQuotasResponse)
843
844 err := b.sendAndReceive(request, response)
845 if err != nil {
846 return nil, err
847 }
848
849 return response, nil
850}
851
852// AlterClientQuotas sends a request to alter the broker's quotas
853func (b *Broker) AlterClientQuotas(request *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) {
854 response := new(AlterClientQuotasResponse)
855
856 err := b.sendAndReceive(request, response)
857 if err != nil {
858 return nil, err
859 }
860
861 return response, nil
862}
863
864// readFull ensures the conn ReadDeadline has been setup before making a
865// call to io.ReadFull
866func (b *Broker) readFull(buf []byte) (n int, err error) {
867 if err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)); err != nil {
868 return 0, err
869 }
870
871 return io.ReadFull(b.conn, buf)
872}
873
874// write ensures the conn WriteDeadline has been setup before making a
875// call to conn.Write
876func (b *Broker) write(buf []byte) (n int, err error) {
877 if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
878 return 0, err
879 }
880
881 return b.conn.Write(buf)
882}
883
884func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
885 var promise *responsePromise
886 if promiseResponse {
887 // Packets or error will be sent to the following channels
888 // once the response is received
889 promise = &responsePromise{
890 headerVersion: responseHeaderVersion,
891 packets: make(chan []byte),
892 errors: make(chan error),
893 }
894 }
895
896 if err := b.sendWithPromise(rb, promise); err != nil {
897 return nil, err
898 }
899
900 return promise, nil
901}
902
903func (b *Broker) sendWithPromise(rb protocolBody, promise *responsePromise) error {
kesavand2cde6582020-06-22 04:56:23 -0400904 b.lock.Lock()
905 defer b.lock.Unlock()
906
907 if b.conn == nil {
908 if b.connErr != nil {
kesavandc71914f2022-03-25 11:19:03 +0530909 return b.connErr
kesavand2cde6582020-06-22 04:56:23 -0400910 }
kesavandc71914f2022-03-25 11:19:03 +0530911 return ErrNotConnected
kesavand2cde6582020-06-22 04:56:23 -0400912 }
913
914 if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
kesavandc71914f2022-03-25 11:19:03 +0530915 return ErrUnsupportedVersion
kesavand2cde6582020-06-22 04:56:23 -0400916 }
917
918 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
919 buf, err := encode(req, b.conf.MetricRegistry)
920 if err != nil {
kesavandc71914f2022-03-25 11:19:03 +0530921 return err
kesavand2cde6582020-06-22 04:56:23 -0400922 }
923
924 requestTime := time.Now()
kesavandc71914f2022-03-25 11:19:03 +0530925 // Will be decremented in responseReceiver (except error or request with NoResponse)
926 b.addRequestInFlightMetrics(1)
927 bytes, err := b.write(buf)
928 b.updateOutgoingCommunicationMetrics(bytes)
kesavand2cde6582020-06-22 04:56:23 -0400929 if err != nil {
kesavandc71914f2022-03-25 11:19:03 +0530930 b.addRequestInFlightMetrics(-1)
931 return err
kesavand2cde6582020-06-22 04:56:23 -0400932 }
933 b.correlationID++
934
kesavandc71914f2022-03-25 11:19:03 +0530935 if promise == nil {
kesavand2cde6582020-06-22 04:56:23 -0400936 // Record request latency without the response
kesavandc71914f2022-03-25 11:19:03 +0530937 b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
938 return nil
kesavand2cde6582020-06-22 04:56:23 -0400939 }
940
kesavandc71914f2022-03-25 11:19:03 +0530941 promise.requestTime = requestTime
942 promise.correlationID = req.correlationID
kesavand2cde6582020-06-22 04:56:23 -0400943 b.responses <- promise
944
kesavandc71914f2022-03-25 11:19:03 +0530945 return nil
kesavand2cde6582020-06-22 04:56:23 -0400946}
947
kesavandc71914f2022-03-25 11:19:03 +0530948func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
949 responseHeaderVersion := int16(-1)
950 if res != nil {
951 responseHeaderVersion = res.headerVersion()
952 }
953
954 promise, err := b.send(req, res != nil, responseHeaderVersion)
kesavand2cde6582020-06-22 04:56:23 -0400955 if err != nil {
956 return err
957 }
958
959 if promise == nil {
960 return nil
961 }
962
963 select {
964 case buf := <-promise.packets:
965 return versionedDecode(buf, res, req.version())
966 case err = <-promise.errors:
967 return err
968 }
969}
970
971func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
972 b.id, err = pd.getInt32()
973 if err != nil {
974 return err
975 }
976
977 host, err := pd.getString()
978 if err != nil {
979 return err
980 }
981
982 port, err := pd.getInt32()
983 if err != nil {
984 return err
985 }
986
987 if version >= 1 {
988 b.rack, err = pd.getNullableString()
989 if err != nil {
990 return err
991 }
992 }
993
994 b.addr = net.JoinHostPort(host, fmt.Sprint(port))
995 if _, _, err := net.SplitHostPort(b.addr); err != nil {
996 return err
997 }
998
999 return nil
1000}
1001
1002func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
1003 host, portstr, err := net.SplitHostPort(b.addr)
1004 if err != nil {
1005 return err
1006 }
1007
kesavandc71914f2022-03-25 11:19:03 +05301008 port, err := strconv.ParseInt(portstr, 10, 32)
kesavand2cde6582020-06-22 04:56:23 -04001009 if err != nil {
1010 return err
1011 }
1012
1013 pe.putInt32(b.id)
1014
1015 err = pe.putString(host)
1016 if err != nil {
1017 return err
1018 }
1019
1020 pe.putInt32(int32(port))
1021
1022 if version >= 1 {
1023 err = pe.putNullableString(b.rack)
1024 if err != nil {
1025 return err
1026 }
1027 }
1028
1029 return nil
1030}
1031
1032func (b *Broker) responseReceiver() {
1033 var dead error
kesavand2cde6582020-06-22 04:56:23 -04001034
1035 for response := range b.responses {
1036 if dead != nil {
kesavandc71914f2022-03-25 11:19:03 +05301037 // This was previously incremented in send() and
1038 // we are not calling updateIncomingCommunicationMetrics()
1039 b.addRequestInFlightMetrics(-1)
1040 response.handle(nil, dead)
kesavand2cde6582020-06-22 04:56:23 -04001041 continue
1042 }
1043
kesavandc71914f2022-03-25 11:19:03 +05301044 headerLength := getHeaderLength(response.headerVersion)
1045 header := make([]byte, headerLength)
kesavand2cde6582020-06-22 04:56:23 -04001046
kesavandc71914f2022-03-25 11:19:03 +05301047 bytesReadHeader, err := b.readFull(header)
kesavand2cde6582020-06-22 04:56:23 -04001048 requestLatency := time.Since(response.requestTime)
1049 if err != nil {
1050 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
1051 dead = err
kesavandc71914f2022-03-25 11:19:03 +05301052 response.handle(nil, err)
kesavand2cde6582020-06-22 04:56:23 -04001053 continue
1054 }
1055
1056 decodedHeader := responseHeader{}
kesavandc71914f2022-03-25 11:19:03 +05301057 err = versionedDecode(header, &decodedHeader, response.headerVersion)
kesavand2cde6582020-06-22 04:56:23 -04001058 if err != nil {
1059 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
1060 dead = err
kesavandc71914f2022-03-25 11:19:03 +05301061 response.handle(nil, err)
kesavand2cde6582020-06-22 04:56:23 -04001062 continue
1063 }
1064 if decodedHeader.correlationID != response.correlationID {
1065 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
1066 // TODO if decoded ID < cur ID, discard until we catch up
1067 // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
1068 dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
kesavandc71914f2022-03-25 11:19:03 +05301069 response.handle(nil, dead)
kesavand2cde6582020-06-22 04:56:23 -04001070 continue
1071 }
1072
kesavandc71914f2022-03-25 11:19:03 +05301073 buf := make([]byte, decodedHeader.length-int32(headerLength)+4)
1074 bytesReadBody, err := b.readFull(buf)
kesavand2cde6582020-06-22 04:56:23 -04001075 b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
1076 if err != nil {
1077 dead = err
kesavandc71914f2022-03-25 11:19:03 +05301078 response.handle(nil, err)
kesavand2cde6582020-06-22 04:56:23 -04001079 continue
1080 }
1081
kesavandc71914f2022-03-25 11:19:03 +05301082 response.handle(buf, nil)
kesavand2cde6582020-06-22 04:56:23 -04001083 }
1084 close(b.done)
1085}
1086
kesavandc71914f2022-03-25 11:19:03 +05301087func getHeaderLength(headerVersion int16) int8 {
1088 if headerVersion < 1 {
1089 return 8
1090 } else {
1091 // header contains additional tagged field length (0), we don't support actual tags yet.
1092 return 9
1093 }
1094}
1095
kesavand2cde6582020-06-22 04:56:23 -04001096func (b *Broker) authenticateViaSASL() error {
1097 switch b.conf.Net.SASL.Mechanism {
1098 case SASLTypeOAuth:
1099 return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
1100 case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
kesavandc71914f2022-03-25 11:19:03 +05301101 return b.sendAndReceiveSASLSCRAM()
kesavand2cde6582020-06-22 04:56:23 -04001102 case SASLTypeGSSAPI:
1103 return b.sendAndReceiveKerberos()
1104 default:
1105 return b.sendAndReceiveSASLPlainAuth()
1106 }
1107}
1108
1109func (b *Broker) sendAndReceiveKerberos() error {
1110 b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
1111 if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
1112 b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
1113 }
1114 return b.kerberosAuthenticator.Authorize(b)
1115}
1116
1117func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
1118 rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
1119
1120 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
1121 buf, err := encode(req, b.conf.MetricRegistry)
1122 if err != nil {
1123 return err
1124 }
1125
kesavand2cde6582020-06-22 04:56:23 -04001126 requestTime := time.Now()
kesavandc71914f2022-03-25 11:19:03 +05301127 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1128 b.addRequestInFlightMetrics(1)
1129 bytes, err := b.write(buf)
kesavand2cde6582020-06-22 04:56:23 -04001130 b.updateOutgoingCommunicationMetrics(bytes)
1131 if err != nil {
kesavandc71914f2022-03-25 11:19:03 +05301132 b.addRequestInFlightMetrics(-1)
kesavand2cde6582020-06-22 04:56:23 -04001133 Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
1134 return err
1135 }
1136 b.correlationID++
kesavandc71914f2022-03-25 11:19:03 +05301137
kesavand2cde6582020-06-22 04:56:23 -04001138 header := make([]byte, 8) // response header
kesavandc71914f2022-03-25 11:19:03 +05301139 _, err = b.readFull(header)
kesavand2cde6582020-06-22 04:56:23 -04001140 if err != nil {
kesavandc71914f2022-03-25 11:19:03 +05301141 b.addRequestInFlightMetrics(-1)
kesavand2cde6582020-06-22 04:56:23 -04001142 Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
1143 return err
1144 }
1145
1146 length := binary.BigEndian.Uint32(header[:4])
1147 payload := make([]byte, length-4)
kesavandc71914f2022-03-25 11:19:03 +05301148 n, err := b.readFull(payload)
kesavand2cde6582020-06-22 04:56:23 -04001149 if err != nil {
kesavandc71914f2022-03-25 11:19:03 +05301150 b.addRequestInFlightMetrics(-1)
kesavand2cde6582020-06-22 04:56:23 -04001151 Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
1152 return err
1153 }
1154
1155 b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
1156 res := &SaslHandshakeResponse{}
1157
1158 err = versionedDecode(payload, res, 0)
1159 if err != nil {
1160 Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
1161 return err
1162 }
1163
1164 if res.Err != ErrNoError {
1165 Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
1166 return res.Err
1167 }
1168
kesavandc71914f2022-03-25 11:19:03 +05301169 DebugLogger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
kesavand2cde6582020-06-22 04:56:23 -04001170 return nil
1171}
1172
1173// Kafka 0.10.x supported SASL PLAIN/Kerberos via KAFKA-3149 (KIP-43).
1174// Kafka 1.x.x onward added a SaslAuthenticate request/response message which
1175// wraps the SASL flow in the Kafka protocol, which allows for returning
1176// meaningful errors on authentication failure.
1177//
1178// In SASL Plain, Kafka expects the auth header to be in the following format
1179// Message format (from https://tools.ietf.org/html/rfc4616):
1180//
1181// message = [authzid] UTF8NUL authcid UTF8NUL passwd
1182// authcid = 1*SAFE ; MUST accept up to 255 octets
1183// authzid = 1*SAFE ; MUST accept up to 255 octets
1184// passwd = 1*SAFE ; MUST accept up to 255 octets
1185// UTF8NUL = %x00 ; UTF-8 encoded NUL character
1186//
1187// SAFE = UTF1 / UTF2 / UTF3 / UTF4
1188// ;; any UTF-8 encoded Unicode character except NUL
1189//
1190// With SASL v0 handshake and auth then:
1191// When credentials are valid, Kafka returns a 4 byte array of null characters.
1192// When credentials are invalid, Kafka closes the connection.
1193//
1194// With SASL v1 handshake and auth then:
1195// When credentials are invalid, Kafka replies with a SaslAuthenticate response
1196// containing an error code and message detailing the authentication failure.
1197func (b *Broker) sendAndReceiveSASLPlainAuth() error {
kesavandc71914f2022-03-25 11:19:03 +05301198 // default to V0 to allow for backward compatibility when SASL is enabled
kesavand2cde6582020-06-22 04:56:23 -04001199 // but not the handshake
1200 if b.conf.Net.SASL.Handshake {
kesavand2cde6582020-06-22 04:56:23 -04001201 handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
1202 if handshakeErr != nil {
1203 Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
1204 return handshakeErr
1205 }
1206 }
1207
1208 if b.conf.Net.SASL.Version == SASLHandshakeV1 {
1209 return b.sendAndReceiveV1SASLPlainAuth()
1210 }
1211 return b.sendAndReceiveV0SASLPlainAuth()
1212}
1213
1214// sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
1215func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
kesavandc71914f2022-03-25 11:19:03 +05301216 length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
1217 authBytes := make([]byte, length+4) // 4 byte length header + auth data
kesavand2cde6582020-06-22 04:56:23 -04001218 binary.BigEndian.PutUint32(authBytes, uint32(length))
kesavandc71914f2022-03-25 11:19:03 +05301219 copy(authBytes[4:], b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password)
kesavand2cde6582020-06-22 04:56:23 -04001220
1221 requestTime := time.Now()
kesavandc71914f2022-03-25 11:19:03 +05301222 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1223 b.addRequestInFlightMetrics(1)
1224 bytesWritten, err := b.write(authBytes)
kesavand2cde6582020-06-22 04:56:23 -04001225 b.updateOutgoingCommunicationMetrics(bytesWritten)
1226 if err != nil {
kesavandc71914f2022-03-25 11:19:03 +05301227 b.addRequestInFlightMetrics(-1)
kesavand2cde6582020-06-22 04:56:23 -04001228 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1229 return err
1230 }
1231
1232 header := make([]byte, 4)
kesavandc71914f2022-03-25 11:19:03 +05301233 n, err := b.readFull(header)
kesavand2cde6582020-06-22 04:56:23 -04001234 b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
1235 // If the credentials are valid, we would get a 4 byte response filled with null characters.
1236 // Otherwise, the broker closes the connection and we get an EOF
1237 if err != nil {
1238 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1239 return err
1240 }
1241
kesavandc71914f2022-03-25 11:19:03 +05301242 DebugLogger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
kesavand2cde6582020-06-22 04:56:23 -04001243 return nil
1244}
1245
1246// sendAndReceiveV1SASLPlainAuth flows the v1 sasl authentication using the kafka protocol
1247func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {
1248 correlationID := b.correlationID
1249
1250 requestTime := time.Now()
1251
kesavandc71914f2022-03-25 11:19:03 +05301252 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1253 b.addRequestInFlightMetrics(1)
kesavand2cde6582020-06-22 04:56:23 -04001254 bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
kesavand2cde6582020-06-22 04:56:23 -04001255 b.updateOutgoingCommunicationMetrics(bytesWritten)
1256
1257 if err != nil {
kesavandc71914f2022-03-25 11:19:03 +05301258 b.addRequestInFlightMetrics(-1)
kesavand2cde6582020-06-22 04:56:23 -04001259 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1260 return err
1261 }
1262
1263 b.correlationID++
1264
kesavandc71914f2022-03-25 11:19:03 +05301265 bytesRead, err := b.receiveSASLServerResponse(&SaslAuthenticateResponse{}, correlationID)
kesavand2cde6582020-06-22 04:56:23 -04001266 b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
1267
1268 // With v1 sasl we get an error message set in the response we can return
1269 if err != nil {
1270 Logger.Printf("Error returned from broker during SASL flow %s: %s\n", b.addr, err.Error())
1271 return err
1272 }
1273
1274 return nil
1275}
1276
1277// sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
1278// https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
1279func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
1280 if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
1281 return err
1282 }
1283
1284 token, err := provider.Token()
1285 if err != nil {
1286 return err
1287 }
1288
kesavandc71914f2022-03-25 11:19:03 +05301289 message, err := buildClientFirstMessage(token)
1290 if err != nil {
1291 return err
1292 }
1293
1294 challenged, err := b.sendClientMessage(message)
1295 if err != nil {
1296 return err
1297 }
1298
1299 if challenged {
1300 // Abort the token exchange. The broker returns the failure code.
1301 _, err = b.sendClientMessage([]byte(`\x01`))
1302 }
1303
1304 return err
1305}
1306
1307// sendClientMessage sends a SASL/OAUTHBEARER client message and returns true
1308// if the broker responds with a challenge, in which case the token is
1309// rejected.
1310func (b *Broker) sendClientMessage(message []byte) (bool, error) {
kesavand2cde6582020-06-22 04:56:23 -04001311 requestTime := time.Now()
kesavandc71914f2022-03-25 11:19:03 +05301312 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1313 b.addRequestInFlightMetrics(1)
kesavand2cde6582020-06-22 04:56:23 -04001314 correlationID := b.correlationID
1315
kesavandc71914f2022-03-25 11:19:03 +05301316 bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
1317 b.updateOutgoingCommunicationMetrics(bytesWritten)
kesavand2cde6582020-06-22 04:56:23 -04001318 if err != nil {
kesavandc71914f2022-03-25 11:19:03 +05301319 b.addRequestInFlightMetrics(-1)
1320 return false, err
kesavand2cde6582020-06-22 04:56:23 -04001321 }
1322
kesavand2cde6582020-06-22 04:56:23 -04001323 b.correlationID++
1324
kesavandc71914f2022-03-25 11:19:03 +05301325 res := &SaslAuthenticateResponse{}
1326 bytesRead, err := b.receiveSASLServerResponse(res, correlationID)
kesavand2cde6582020-06-22 04:56:23 -04001327
1328 requestLatency := time.Since(requestTime)
1329 b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
1330
kesavandc71914f2022-03-25 11:19:03 +05301331 isChallenge := len(res.SaslAuthBytes) > 0
1332
1333 if isChallenge && err != nil {
1334 Logger.Printf("Broker rejected authentication token: %s", res.SaslAuthBytes)
1335 }
1336
1337 return isChallenge, err
1338}
1339
1340func (b *Broker) sendAndReceiveSASLSCRAM() error {
1341 if b.conf.Net.SASL.Version == SASLHandshakeV0 {
1342 return b.sendAndReceiveSASLSCRAMv0()
1343 }
1344 return b.sendAndReceiveSASLSCRAMv1()
1345}
1346
1347func (b *Broker) sendAndReceiveSASLSCRAMv0() error {
1348 if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV0); err != nil {
1349 return err
1350 }
1351
1352 scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
1353 if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
1354 return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
1355 }
1356
1357 msg, err := scramClient.Step("")
1358 if err != nil {
1359 return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
1360 }
1361
1362 for !scramClient.Done() {
1363 requestTime := time.Now()
1364 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1365 b.addRequestInFlightMetrics(1)
1366 length := len(msg)
1367 authBytes := make([]byte, length+4) //4 byte length header + auth data
1368 binary.BigEndian.PutUint32(authBytes, uint32(length))
1369 copy(authBytes[4:], []byte(msg))
1370 _, err := b.write(authBytes)
1371 b.updateOutgoingCommunicationMetrics(length + 4)
1372 if err != nil {
1373 b.addRequestInFlightMetrics(-1)
1374 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1375 return err
1376 }
1377 b.correlationID++
1378 header := make([]byte, 4)
1379 _, err = b.readFull(header)
1380 if err != nil {
1381 b.addRequestInFlightMetrics(-1)
1382 Logger.Printf("Failed to read response header while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1383 return err
1384 }
1385 payload := make([]byte, int32(binary.BigEndian.Uint32(header)))
1386 n, err := b.readFull(payload)
1387 if err != nil {
1388 b.addRequestInFlightMetrics(-1)
1389 Logger.Printf("Failed to read response payload while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1390 return err
1391 }
1392 b.updateIncomingCommunicationMetrics(n+4, time.Since(requestTime))
1393 msg, err = scramClient.Step(string(payload))
1394 if err != nil {
1395 Logger.Println("SASL authentication failed", err)
1396 return err
1397 }
1398 }
1399
1400 DebugLogger.Println("SASL authentication succeeded")
kesavand2cde6582020-06-22 04:56:23 -04001401 return nil
1402}
1403
1404func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
1405 if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV1); err != nil {
1406 return err
1407 }
1408
1409 scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
1410 if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
1411 return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
1412 }
1413
1414 msg, err := scramClient.Step("")
1415 if err != nil {
1416 return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
kesavand2cde6582020-06-22 04:56:23 -04001417 }
1418
1419 for !scramClient.Done() {
1420 requestTime := time.Now()
kesavandc71914f2022-03-25 11:19:03 +05301421 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1422 b.addRequestInFlightMetrics(1)
kesavand2cde6582020-06-22 04:56:23 -04001423 correlationID := b.correlationID
1424 bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
kesavandc71914f2022-03-25 11:19:03 +05301425 b.updateOutgoingCommunicationMetrics(bytesWritten)
kesavand2cde6582020-06-22 04:56:23 -04001426 if err != nil {
kesavandc71914f2022-03-25 11:19:03 +05301427 b.addRequestInFlightMetrics(-1)
kesavand2cde6582020-06-22 04:56:23 -04001428 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1429 return err
1430 }
1431
kesavand2cde6582020-06-22 04:56:23 -04001432 b.correlationID++
1433 challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
1434 if err != nil {
kesavandc71914f2022-03-25 11:19:03 +05301435 b.addRequestInFlightMetrics(-1)
kesavand2cde6582020-06-22 04:56:23 -04001436 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1437 return err
1438 }
1439
1440 b.updateIncomingCommunicationMetrics(len(challenge), time.Since(requestTime))
1441 msg, err = scramClient.Step(string(challenge))
1442 if err != nil {
1443 Logger.Println("SASL authentication failed", err)
1444 return err
1445 }
1446 }
1447
kesavandc71914f2022-03-25 11:19:03 +05301448 DebugLogger.Println("SASL authentication succeeded")
kesavand2cde6582020-06-22 04:56:23 -04001449 return nil
1450}
1451
1452func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (int, error) {
1453 rb := &SaslAuthenticateRequest{msg}
1454 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1455 buf, err := encode(req, b.conf.MetricRegistry)
1456 if err != nil {
1457 return 0, err
1458 }
1459
kesavandc71914f2022-03-25 11:19:03 +05301460 return b.write(buf)
kesavand2cde6582020-06-22 04:56:23 -04001461}
1462
1463func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
1464 buf := make([]byte, responseLengthSize+correlationIDSize)
kesavandc71914f2022-03-25 11:19:03 +05301465 _, err := b.readFull(buf)
kesavand2cde6582020-06-22 04:56:23 -04001466 if err != nil {
1467 return nil, err
1468 }
1469
1470 header := responseHeader{}
kesavandc71914f2022-03-25 11:19:03 +05301471 err = versionedDecode(buf, &header, 0)
kesavand2cde6582020-06-22 04:56:23 -04001472 if err != nil {
1473 return nil, err
1474 }
1475
1476 if header.correlationID != correlationID {
1477 return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
1478 }
1479
1480 buf = make([]byte, header.length-correlationIDSize)
kesavandc71914f2022-03-25 11:19:03 +05301481 _, err = b.readFull(buf)
kesavand2cde6582020-06-22 04:56:23 -04001482 if err != nil {
1483 return nil, err
1484 }
1485
1486 res := &SaslAuthenticateResponse{}
1487 if err := versionedDecode(buf, res, 0); err != nil {
1488 return nil, err
1489 }
1490 if res.Err != ErrNoError {
1491 return nil, res.Err
1492 }
1493 return res.SaslAuthBytes, nil
1494}
1495
1496// Build SASL/OAUTHBEARER initial client response as described by RFC-7628
1497// https://tools.ietf.org/html/rfc7628
kesavandc71914f2022-03-25 11:19:03 +05301498func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
kesavand2cde6582020-06-22 04:56:23 -04001499 var ext string
1500
1501 if token.Extensions != nil && len(token.Extensions) > 0 {
1502 if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
1503 return []byte{}, fmt.Errorf("the extension `%s` is invalid", SASLExtKeyAuth)
1504 }
1505 ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
1506 }
1507
1508 resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
1509
1510 return resp, nil
1511}
1512
1513// mapToString returns a list of key-value pairs ordered by key.
1514// keyValSep separates the key from the value. elemSep separates each pair.
1515func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
1516 buf := make([]string, 0, len(extensions))
1517
1518 for k, v := range extensions {
1519 buf = append(buf, k+keyValSep+v)
1520 }
1521
1522 sort.Strings(buf)
1523
1524 return strings.Join(buf, elemSep)
1525}
1526
1527func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
kesavandc71914f2022-03-25 11:19:03 +05301528 authBytes := []byte(b.conf.Net.SASL.AuthIdentity + "\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
kesavand2cde6582020-06-22 04:56:23 -04001529 rb := &SaslAuthenticateRequest{authBytes}
1530 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1531 buf, err := encode(req, b.conf.MetricRegistry)
1532 if err != nil {
1533 return 0, err
1534 }
1535
kesavandc71914f2022-03-25 11:19:03 +05301536 return b.write(buf)
kesavand2cde6582020-06-22 04:56:23 -04001537}
1538
kesavandc71914f2022-03-25 11:19:03 +05301539func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
kesavand2cde6582020-06-22 04:56:23 -04001540 rb := &SaslAuthenticateRequest{initialResp}
1541
1542 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1543
1544 buf, err := encode(req, b.conf.MetricRegistry)
1545 if err != nil {
1546 return 0, err
1547 }
1548
kesavandc71914f2022-03-25 11:19:03 +05301549 return b.write(buf)
kesavand2cde6582020-06-22 04:56:23 -04001550}
1551
kesavandc71914f2022-03-25 11:19:03 +05301552func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
kesavand2cde6582020-06-22 04:56:23 -04001553 buf := make([]byte, responseLengthSize+correlationIDSize)
kesavandc71914f2022-03-25 11:19:03 +05301554 bytesRead, err := b.readFull(buf)
kesavand2cde6582020-06-22 04:56:23 -04001555 if err != nil {
1556 return bytesRead, err
1557 }
1558
1559 header := responseHeader{}
kesavandc71914f2022-03-25 11:19:03 +05301560 err = versionedDecode(buf, &header, 0)
kesavand2cde6582020-06-22 04:56:23 -04001561 if err != nil {
1562 return bytesRead, err
1563 }
1564
1565 if header.correlationID != correlationID {
1566 return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
1567 }
1568
1569 buf = make([]byte, header.length-correlationIDSize)
kesavandc71914f2022-03-25 11:19:03 +05301570 c, err := b.readFull(buf)
kesavand2cde6582020-06-22 04:56:23 -04001571 bytesRead += c
1572 if err != nil {
1573 return bytesRead, err
1574 }
1575
kesavand2cde6582020-06-22 04:56:23 -04001576 if err := versionedDecode(buf, res, 0); err != nil {
1577 return bytesRead, err
1578 }
1579
1580 if res.Err != ErrNoError {
1581 return bytesRead, res.Err
1582 }
1583
kesavand2cde6582020-06-22 04:56:23 -04001584 return bytesRead, nil
1585}
1586
1587func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
kesavandc71914f2022-03-25 11:19:03 +05301588 b.updateRequestLatencyAndInFlightMetrics(requestLatency)
kesavand2cde6582020-06-22 04:56:23 -04001589 b.responseRate.Mark(1)
1590
1591 if b.brokerResponseRate != nil {
1592 b.brokerResponseRate.Mark(1)
1593 }
1594
1595 responseSize := int64(bytes)
1596 b.incomingByteRate.Mark(responseSize)
1597 if b.brokerIncomingByteRate != nil {
1598 b.brokerIncomingByteRate.Mark(responseSize)
1599 }
1600
1601 b.responseSize.Update(responseSize)
1602 if b.brokerResponseSize != nil {
1603 b.brokerResponseSize.Update(responseSize)
1604 }
1605}
1606
kesavandc71914f2022-03-25 11:19:03 +05301607func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
kesavand2cde6582020-06-22 04:56:23 -04001608 requestLatencyInMs := int64(requestLatency / time.Millisecond)
1609 b.requestLatency.Update(requestLatencyInMs)
1610
1611 if b.brokerRequestLatency != nil {
1612 b.brokerRequestLatency.Update(requestLatencyInMs)
1613 }
1614
kesavandc71914f2022-03-25 11:19:03 +05301615 b.addRequestInFlightMetrics(-1)
1616}
1617
1618func (b *Broker) addRequestInFlightMetrics(i int64) {
1619 b.requestsInFlight.Inc(i)
1620 if b.brokerRequestsInFlight != nil {
1621 b.brokerRequestsInFlight.Inc(i)
1622 }
kesavand2cde6582020-06-22 04:56:23 -04001623}
1624
1625func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
1626 b.requestRate.Mark(1)
1627 if b.brokerRequestRate != nil {
1628 b.brokerRequestRate.Mark(1)
1629 }
1630
1631 requestSize := int64(bytes)
1632 b.outgoingByteRate.Mark(requestSize)
1633 if b.brokerOutgoingByteRate != nil {
1634 b.brokerOutgoingByteRate.Mark(requestSize)
1635 }
1636
1637 b.requestSize.Update(requestSize)
1638 if b.brokerRequestSize != nil {
1639 b.brokerRequestSize.Update(requestSize)
1640 }
kesavandc71914f2022-03-25 11:19:03 +05301641}
kesavand2cde6582020-06-22 04:56:23 -04001642
kesavandc71914f2022-03-25 11:19:03 +05301643func (b *Broker) updateThrottleMetric(throttleTime time.Duration) {
1644 if throttleTime != time.Duration(0) {
1645 DebugLogger.Printf(
1646 "producer/broker/%d ProduceResponse throttled %v\n",
1647 b.ID(), throttleTime)
1648 if b.brokerThrottleTime != nil {
1649 throttleTimeInMs := int64(throttleTime / time.Millisecond)
1650 b.brokerThrottleTime.Update(throttleTimeInMs)
1651 }
1652 }
kesavand2cde6582020-06-22 04:56:23 -04001653}
1654
1655func (b *Broker) registerMetrics() {
1656 b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
1657 b.brokerRequestRate = b.registerMeter("request-rate")
1658 b.brokerRequestSize = b.registerHistogram("request-size")
1659 b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
1660 b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
1661 b.brokerResponseRate = b.registerMeter("response-rate")
1662 b.brokerResponseSize = b.registerHistogram("response-size")
kesavandc71914f2022-03-25 11:19:03 +05301663 b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
1664 b.brokerThrottleTime = b.registerHistogram("throttle-time-in-ms")
kesavand2cde6582020-06-22 04:56:23 -04001665}
1666
1667func (b *Broker) unregisterMetrics() {
1668 for _, name := range b.registeredMetrics {
1669 b.conf.MetricRegistry.Unregister(name)
1670 }
kesavandc71914f2022-03-25 11:19:03 +05301671 b.registeredMetrics = nil
kesavand2cde6582020-06-22 04:56:23 -04001672}
1673
1674func (b *Broker) registerMeter(name string) metrics.Meter {
1675 nameForBroker := getMetricNameForBroker(name, b)
1676 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1677 return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
1678}
1679
1680func (b *Broker) registerHistogram(name string) metrics.Histogram {
1681 nameForBroker := getMetricNameForBroker(name, b)
1682 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1683 return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
1684}
kesavandc71914f2022-03-25 11:19:03 +05301685
1686func (b *Broker) registerCounter(name string) metrics.Counter {
1687 nameForBroker := getMetricNameForBroker(name, b)
1688 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1689 return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
1690}
1691
1692func validServerNameTLS(addr string, cfg *tls.Config) *tls.Config {
1693 if cfg == nil {
1694 cfg = &tls.Config{
1695 MinVersion: tls.VersionTLS12,
1696 }
1697 }
1698 if cfg.ServerName != "" {
1699 return cfg
1700 }
1701
1702 c := cfg.Clone()
1703 sn, _, err := net.SplitHostPort(addr)
1704 if err != nil {
1705 Logger.Println(fmt.Errorf("failed to get ServerName from addr %w", err))
1706 }
1707 c.ServerName = sn
1708 return c
1709}