blob: 7b32a03d3f9963be6c0178e8af91013bae49692c [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import (
4 "crypto/tls"
5 "encoding/binary"
6 "fmt"
7 "io"
8 "net"
9 "sort"
10 "strconv"
11 "strings"
12 "sync"
13 "sync/atomic"
14 "time"
15
Abhilash S.L3b494632019-07-16 15:51:09 +053016 metrics "github.com/rcrowley/go-metrics"
William Kurkianea869482019-04-09 15:16:11 -040017)
18
19// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
20type Broker struct {
Abhilash S.L3b494632019-07-16 15:51:09 +053021 conf *Config
William Kurkianea869482019-04-09 15:16:11 -040022 rack *string
23
Abhilash S.L3b494632019-07-16 15:51:09 +053024 id int32
25 addr string
William Kurkianea869482019-04-09 15:16:11 -040026 correlationID int32
27 conn net.Conn
28 connErr error
29 lock sync.Mutex
30 opened int32
Abhilash S.L3b494632019-07-16 15:51:09 +053031 responses chan responsePromise
32 done chan bool
William Kurkianea869482019-04-09 15:16:11 -040033
Abhilash S.L3b494632019-07-16 15:51:09 +053034 registeredMetrics []string
William Kurkianea869482019-04-09 15:16:11 -040035
36 incomingByteRate metrics.Meter
37 requestRate metrics.Meter
38 requestSize metrics.Histogram
39 requestLatency metrics.Histogram
40 outgoingByteRate metrics.Meter
41 responseRate metrics.Meter
42 responseSize metrics.Histogram
43 brokerIncomingByteRate metrics.Meter
44 brokerRequestRate metrics.Meter
45 brokerRequestSize metrics.Histogram
46 brokerRequestLatency metrics.Histogram
47 brokerOutgoingByteRate metrics.Meter
48 brokerResponseRate metrics.Meter
49 brokerResponseSize metrics.Histogram
Abhilash S.L3b494632019-07-16 15:51:09 +053050
51 kerberosAuthenticator GSSAPIKerberosAuth
William Kurkianea869482019-04-09 15:16:11 -040052}
53
54// SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
55type SASLMechanism string
56
57const (
58 // SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
59 SASLTypeOAuth = "OAUTHBEARER"
60 // SASLTypePlaintext represents the SASL/PLAIN mechanism
61 SASLTypePlaintext = "PLAIN"
Abhilash S.L3b494632019-07-16 15:51:09 +053062 // SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
63 SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
64 // SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
65 SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
66 SASLTypeGSSAPI = "GSSAPI"
William Kurkianea869482019-04-09 15:16:11 -040067 // SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
68 // server negotiate SASL auth using opaque packets.
69 SASLHandshakeV0 = int16(0)
70 // SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
71 // server negotiate SASL by wrapping tokens with Kafka protocol headers.
72 SASLHandshakeV1 = int16(1)
73 // SASLExtKeyAuth is the reserved extension key name sent as part of the
74 // SASL/OAUTHBEARER intial client response
75 SASLExtKeyAuth = "auth"
76)
77
78// AccessToken contains an access token used to authenticate a
79// SASL/OAUTHBEARER client along with associated metadata.
80type AccessToken struct {
81 // Token is the access token payload.
82 Token string
83 // Extensions is a optional map of arbitrary key-value pairs that can be
84 // sent with the SASL/OAUTHBEARER initial client response. These values are
85 // ignored by the SASL server if they are unexpected. This feature is only
86 // supported by Kafka >= 2.1.0.
87 Extensions map[string]string
88}
89
90// AccessTokenProvider is the interface that encapsulates how implementors
91// can generate access tokens for Kafka broker authentication.
92type AccessTokenProvider interface {
93 // Token returns an access token. The implementation should ensure token
94 // reuse so that multiple calls at connect time do not create multiple
95 // tokens. The implementation should also periodically refresh the token in
96 // order to guarantee that each call returns an unexpired token. This
97 // method should not block indefinitely--a timeout error should be returned
98 // after a short period of inactivity so that the broker connection logic
99 // can log debugging information and retry.
100 Token() (*AccessToken, error)
101}
102
Abhilash S.L3b494632019-07-16 15:51:09 +0530103// SCRAMClient is a an interface to a SCRAM
104// client implementation.
105type SCRAMClient interface {
106 // Begin prepares the client for the SCRAM exchange
107 // with the server with a user name and a password
108 Begin(userName, password, authzID string) error
109 // Step steps client through the SCRAM exchange. It is
110 // called repeatedly until it errors or `Done` returns true.
111 Step(challenge string) (response string, err error)
112 // Done should return true when the SCRAM conversation
113 // is over.
114 Done() bool
115}
116
William Kurkianea869482019-04-09 15:16:11 -0400117type responsePromise struct {
118 requestTime time.Time
119 correlationID int32
120 packets chan []byte
121 errors chan error
122}
123
124// NewBroker creates and returns a Broker targeting the given host:port address.
125// This does not attempt to actually connect, you have to call Open() for that.
126func NewBroker(addr string) *Broker {
127 return &Broker{id: -1, addr: addr}
128}
129
130// Open tries to connect to the Broker if it is not already connected or connecting, but does not block
131// waiting for the connection to complete. This means that any subsequent operations on the broker will
132// block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
133// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
134// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
135func (b *Broker) Open(conf *Config) error {
136 if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
137 return ErrAlreadyConnected
138 }
139
140 if conf == nil {
141 conf = NewConfig()
142 }
143
144 err := conf.Validate()
145 if err != nil {
146 return err
147 }
148
149 b.lock.Lock()
150
151 go withRecover(func() {
152 defer b.lock.Unlock()
153
154 dialer := net.Dialer{
155 Timeout: conf.Net.DialTimeout,
156 KeepAlive: conf.Net.KeepAlive,
157 LocalAddr: conf.Net.LocalAddr,
158 }
159
160 if conf.Net.TLS.Enable {
161 b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
Abhilash S.L3b494632019-07-16 15:51:09 +0530162 } else if conf.Net.Proxy.Enable {
163 b.conn, b.connErr = conf.Net.Proxy.Dialer.Dial("tcp", b.addr)
William Kurkianea869482019-04-09 15:16:11 -0400164 } else {
165 b.conn, b.connErr = dialer.Dial("tcp", b.addr)
166 }
167 if b.connErr != nil {
168 Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
169 b.conn = nil
170 atomic.StoreInt32(&b.opened, 0)
171 return
172 }
173 b.conn = newBufConn(b.conn)
174
175 b.conf = conf
176
177 // Create or reuse the global metrics shared between brokers
178 b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", conf.MetricRegistry)
179 b.requestRate = metrics.GetOrRegisterMeter("request-rate", conf.MetricRegistry)
180 b.requestSize = getOrRegisterHistogram("request-size", conf.MetricRegistry)
181 b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", conf.MetricRegistry)
182 b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
183 b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
184 b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
185 // Do not gather metrics for seeded broker (only used during bootstrap) because they share
186 // the same id (-1) and are already exposed through the global metrics above
187 if b.id >= 0 {
Abhilash S.L3b494632019-07-16 15:51:09 +0530188 b.registerMetrics()
William Kurkianea869482019-04-09 15:16:11 -0400189 }
190
191 if conf.Net.SASL.Enable {
192
193 b.connErr = b.authenticateViaSASL()
194
195 if b.connErr != nil {
196 err = b.conn.Close()
197 if err == nil {
198 Logger.Printf("Closed connection to broker %s\n", b.addr)
199 } else {
200 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
201 }
202 b.conn = nil
203 atomic.StoreInt32(&b.opened, 0)
204 return
205 }
206 }
207
208 b.done = make(chan bool)
209 b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
210
211 if b.id >= 0 {
212 Logger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
213 } else {
214 Logger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
215 }
216 go withRecover(b.responseReceiver)
217 })
218
219 return nil
220}
221
222// Connected returns true if the broker is connected and false otherwise. If the broker is not
223// connected but it had tried to connect, the error from that connection attempt is also returned.
224func (b *Broker) Connected() (bool, error) {
225 b.lock.Lock()
226 defer b.lock.Unlock()
227
228 return b.conn != nil, b.connErr
229}
230
Abhilash S.L3b494632019-07-16 15:51:09 +0530231//Close closes the broker resources
William Kurkianea869482019-04-09 15:16:11 -0400232func (b *Broker) Close() error {
233 b.lock.Lock()
234 defer b.lock.Unlock()
235
236 if b.conn == nil {
237 return ErrNotConnected
238 }
239
240 close(b.responses)
241 <-b.done
242
243 err := b.conn.Close()
244
245 b.conn = nil
246 b.connErr = nil
247 b.done = nil
248 b.responses = nil
249
Abhilash S.L3b494632019-07-16 15:51:09 +0530250 b.unregisterMetrics()
William Kurkianea869482019-04-09 15:16:11 -0400251
252 if err == nil {
253 Logger.Printf("Closed connection to broker %s\n", b.addr)
254 } else {
255 Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
256 }
257
258 atomic.StoreInt32(&b.opened, 0)
259
260 return err
261}
262
263// ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
264func (b *Broker) ID() int32 {
265 return b.id
266}
267
268// Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
269func (b *Broker) Addr() string {
270 return b.addr
271}
272
273// Rack returns the broker's rack as retrieved from Kafka's metadata or the
274// empty string if it is not known. The returned value corresponds to the
275// broker's broker.rack configuration setting. Requires protocol version to be
276// at least v0.10.0.0.
277func (b *Broker) Rack() string {
278 if b.rack == nil {
279 return ""
280 }
281 return *b.rack
282}
283
Abhilash S.L3b494632019-07-16 15:51:09 +0530284//GetMetadata send a metadata request and returns a metadata response or error
William Kurkianea869482019-04-09 15:16:11 -0400285func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
286 response := new(MetadataResponse)
287
288 err := b.sendAndReceive(request, response)
289
290 if err != nil {
291 return nil, err
292 }
293
294 return response, nil
295}
296
Abhilash S.L3b494632019-07-16 15:51:09 +0530297//GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
William Kurkianea869482019-04-09 15:16:11 -0400298func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
299 response := new(ConsumerMetadataResponse)
300
301 err := b.sendAndReceive(request, response)
302
303 if err != nil {
304 return nil, err
305 }
306
307 return response, nil
308}
309
Abhilash S.L3b494632019-07-16 15:51:09 +0530310//FindCoordinator sends a find coordinate request and returns a response or error
William Kurkianea869482019-04-09 15:16:11 -0400311func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
312 response := new(FindCoordinatorResponse)
313
314 err := b.sendAndReceive(request, response)
315
316 if err != nil {
317 return nil, err
318 }
319
320 return response, nil
321}
322
Abhilash S.L3b494632019-07-16 15:51:09 +0530323//GetAvailableOffsets return an offset response or error
William Kurkianea869482019-04-09 15:16:11 -0400324func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
325 response := new(OffsetResponse)
326
327 err := b.sendAndReceive(request, response)
328
329 if err != nil {
330 return nil, err
331 }
332
333 return response, nil
334}
335
Abhilash S.L3b494632019-07-16 15:51:09 +0530336//Produce returns a produce response or error
William Kurkianea869482019-04-09 15:16:11 -0400337func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
Abhilash S.L3b494632019-07-16 15:51:09 +0530338 var (
339 response *ProduceResponse
340 err error
341 )
William Kurkianea869482019-04-09 15:16:11 -0400342
343 if request.RequiredAcks == NoResponse {
344 err = b.sendAndReceive(request, nil)
345 } else {
346 response = new(ProduceResponse)
347 err = b.sendAndReceive(request, response)
348 }
349
350 if err != nil {
351 return nil, err
352 }
353
354 return response, nil
355}
356
Abhilash S.L3b494632019-07-16 15:51:09 +0530357//Fetch returns a FetchResponse or error
William Kurkianea869482019-04-09 15:16:11 -0400358func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
359 response := new(FetchResponse)
360
361 err := b.sendAndReceive(request, response)
William Kurkianea869482019-04-09 15:16:11 -0400362 if err != nil {
363 return nil, err
364 }
365
366 return response, nil
367}
368
Abhilash S.L3b494632019-07-16 15:51:09 +0530369//CommitOffset return an Offset commit reponse or error
William Kurkianea869482019-04-09 15:16:11 -0400370func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
371 response := new(OffsetCommitResponse)
372
373 err := b.sendAndReceive(request, response)
William Kurkianea869482019-04-09 15:16:11 -0400374 if err != nil {
375 return nil, err
376 }
377
378 return response, nil
379}
380
Abhilash S.L3b494632019-07-16 15:51:09 +0530381//FetchOffset returns an offset fetch response or error
William Kurkianea869482019-04-09 15:16:11 -0400382func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
383 response := new(OffsetFetchResponse)
384
385 err := b.sendAndReceive(request, response)
William Kurkianea869482019-04-09 15:16:11 -0400386 if err != nil {
387 return nil, err
388 }
389
390 return response, nil
391}
392
Abhilash S.L3b494632019-07-16 15:51:09 +0530393//JoinGroup returns a join group response or error
William Kurkianea869482019-04-09 15:16:11 -0400394func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
395 response := new(JoinGroupResponse)
396
397 err := b.sendAndReceive(request, response)
398 if err != nil {
399 return nil, err
400 }
401
402 return response, nil
403}
404
Abhilash S.L3b494632019-07-16 15:51:09 +0530405//SyncGroup returns a sync group response or error
William Kurkianea869482019-04-09 15:16:11 -0400406func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
407 response := new(SyncGroupResponse)
408
409 err := b.sendAndReceive(request, response)
410 if err != nil {
411 return nil, err
412 }
413
414 return response, nil
415}
416
Abhilash S.L3b494632019-07-16 15:51:09 +0530417//LeaveGroup return a leave group response or error
William Kurkianea869482019-04-09 15:16:11 -0400418func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
419 response := new(LeaveGroupResponse)
420
421 err := b.sendAndReceive(request, response)
422 if err != nil {
423 return nil, err
424 }
425
426 return response, nil
427}
428
Abhilash S.L3b494632019-07-16 15:51:09 +0530429//Heartbeat returns a heartbeat response or error
William Kurkianea869482019-04-09 15:16:11 -0400430func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
431 response := new(HeartbeatResponse)
432
433 err := b.sendAndReceive(request, response)
434 if err != nil {
435 return nil, err
436 }
437
438 return response, nil
439}
440
Abhilash S.L3b494632019-07-16 15:51:09 +0530441//ListGroups return a list group response or error
William Kurkianea869482019-04-09 15:16:11 -0400442func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
443 response := new(ListGroupsResponse)
444
445 err := b.sendAndReceive(request, response)
446 if err != nil {
447 return nil, err
448 }
449
450 return response, nil
451}
452
Abhilash S.L3b494632019-07-16 15:51:09 +0530453//DescribeGroups return describe group response or error
William Kurkianea869482019-04-09 15:16:11 -0400454func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
455 response := new(DescribeGroupsResponse)
456
457 err := b.sendAndReceive(request, response)
458 if err != nil {
459 return nil, err
460 }
461
462 return response, nil
463}
464
Abhilash S.L3b494632019-07-16 15:51:09 +0530465//ApiVersions return api version response or error
William Kurkianea869482019-04-09 15:16:11 -0400466func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
467 response := new(ApiVersionsResponse)
468
469 err := b.sendAndReceive(request, response)
470 if err != nil {
471 return nil, err
472 }
473
474 return response, nil
475}
476
Abhilash S.L3b494632019-07-16 15:51:09 +0530477//CreateTopics send a create topic request and returns create topic response
William Kurkianea869482019-04-09 15:16:11 -0400478func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
479 response := new(CreateTopicsResponse)
480
481 err := b.sendAndReceive(request, response)
482 if err != nil {
483 return nil, err
484 }
485
486 return response, nil
487}
488
Abhilash S.L3b494632019-07-16 15:51:09 +0530489//DeleteTopics sends a delete topic request and returns delete topic response
William Kurkianea869482019-04-09 15:16:11 -0400490func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
491 response := new(DeleteTopicsResponse)
492
493 err := b.sendAndReceive(request, response)
494 if err != nil {
495 return nil, err
496 }
497
498 return response, nil
499}
500
Abhilash S.L3b494632019-07-16 15:51:09 +0530501//CreatePartitions sends a create partition request and returns create
502//partitions response or error
William Kurkianea869482019-04-09 15:16:11 -0400503func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
504 response := new(CreatePartitionsResponse)
505
506 err := b.sendAndReceive(request, response)
507 if err != nil {
508 return nil, err
509 }
510
511 return response, nil
512}
513
Abhilash S.L3b494632019-07-16 15:51:09 +0530514//DeleteRecords send a request to delete records and return delete record
515//response or error
William Kurkianea869482019-04-09 15:16:11 -0400516func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
517 response := new(DeleteRecordsResponse)
518
519 err := b.sendAndReceive(request, response)
520 if err != nil {
521 return nil, err
522 }
523
524 return response, nil
525}
526
Abhilash S.L3b494632019-07-16 15:51:09 +0530527//DescribeAcls sends a describe acl request and returns a response or error
William Kurkianea869482019-04-09 15:16:11 -0400528func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
529 response := new(DescribeAclsResponse)
530
531 err := b.sendAndReceive(request, response)
532 if err != nil {
533 return nil, err
534 }
535
536 return response, nil
537}
538
Abhilash S.L3b494632019-07-16 15:51:09 +0530539//CreateAcls sends a create acl request and returns a response or error
William Kurkianea869482019-04-09 15:16:11 -0400540func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
541 response := new(CreateAclsResponse)
542
543 err := b.sendAndReceive(request, response)
544 if err != nil {
545 return nil, err
546 }
547
548 return response, nil
549}
550
Abhilash S.L3b494632019-07-16 15:51:09 +0530551//DeleteAcls sends a delete acl request and returns a response or error
William Kurkianea869482019-04-09 15:16:11 -0400552func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
553 response := new(DeleteAclsResponse)
554
555 err := b.sendAndReceive(request, response)
556 if err != nil {
557 return nil, err
558 }
559
560 return response, nil
561}
562
Abhilash S.L3b494632019-07-16 15:51:09 +0530563//InitProducerID sends an init producer request and returns a response or error
William Kurkianea869482019-04-09 15:16:11 -0400564func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
565 response := new(InitProducerIDResponse)
566
567 err := b.sendAndReceive(request, response)
568 if err != nil {
569 return nil, err
570 }
571
572 return response, nil
573}
574
Abhilash S.L3b494632019-07-16 15:51:09 +0530575//AddPartitionsToTxn send a request to add partition to txn and returns
576//a response or error
William Kurkianea869482019-04-09 15:16:11 -0400577func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
578 response := new(AddPartitionsToTxnResponse)
579
580 err := b.sendAndReceive(request, response)
581 if err != nil {
582 return nil, err
583 }
584
585 return response, nil
586}
587
Abhilash S.L3b494632019-07-16 15:51:09 +0530588//AddOffsetsToTxn sends a request to add offsets to txn and returns a response
589//or error
William Kurkianea869482019-04-09 15:16:11 -0400590func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
591 response := new(AddOffsetsToTxnResponse)
592
593 err := b.sendAndReceive(request, response)
594 if err != nil {
595 return nil, err
596 }
597
598 return response, nil
599}
600
Abhilash S.L3b494632019-07-16 15:51:09 +0530601//EndTxn sends a request to end txn and returns a response or error
William Kurkianea869482019-04-09 15:16:11 -0400602func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
603 response := new(EndTxnResponse)
604
605 err := b.sendAndReceive(request, response)
606 if err != nil {
607 return nil, err
608 }
609
610 return response, nil
611}
612
Abhilash S.L3b494632019-07-16 15:51:09 +0530613//TxnOffsetCommit sends a request to commit transaction offsets and returns
614//a response or error
William Kurkianea869482019-04-09 15:16:11 -0400615func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
616 response := new(TxnOffsetCommitResponse)
617
618 err := b.sendAndReceive(request, response)
619 if err != nil {
620 return nil, err
621 }
622
623 return response, nil
624}
625
Abhilash S.L3b494632019-07-16 15:51:09 +0530626//DescribeConfigs sends a request to describe config and returns a response or
627//error
William Kurkianea869482019-04-09 15:16:11 -0400628func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
629 response := new(DescribeConfigsResponse)
630
631 err := b.sendAndReceive(request, response)
632 if err != nil {
633 return nil, err
634 }
635
636 return response, nil
637}
638
Abhilash S.L3b494632019-07-16 15:51:09 +0530639//AlterConfigs sends a request to alter config and return a response or error
William Kurkianea869482019-04-09 15:16:11 -0400640func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
641 response := new(AlterConfigsResponse)
642
643 err := b.sendAndReceive(request, response)
644 if err != nil {
645 return nil, err
646 }
647
648 return response, nil
649}
650
Abhilash S.L3b494632019-07-16 15:51:09 +0530651//DeleteGroups sends a request to delete groups and returns a response or error
William Kurkianea869482019-04-09 15:16:11 -0400652func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
653 response := new(DeleteGroupsResponse)
654
655 if err := b.sendAndReceive(request, response); err != nil {
656 return nil, err
657 }
658
659 return response, nil
660}
661
662func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
663 b.lock.Lock()
664 defer b.lock.Unlock()
665
666 if b.conn == nil {
667 if b.connErr != nil {
668 return nil, b.connErr
669 }
670 return nil, ErrNotConnected
671 }
672
673 if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
674 return nil, ErrUnsupportedVersion
675 }
676
677 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
678 buf, err := encode(req, b.conf.MetricRegistry)
679 if err != nil {
680 return nil, err
681 }
682
683 err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
684 if err != nil {
685 return nil, err
686 }
687
688 requestTime := time.Now()
689 bytes, err := b.conn.Write(buf)
Abhilash S.L3b494632019-07-16 15:51:09 +0530690 b.updateOutgoingCommunicationMetrics(bytes) //TODO: should it be after error check
William Kurkianea869482019-04-09 15:16:11 -0400691 if err != nil {
692 return nil, err
693 }
694 b.correlationID++
695
696 if !promiseResponse {
697 // Record request latency without the response
698 b.updateRequestLatencyMetrics(time.Since(requestTime))
699 return nil, nil
700 }
701
702 promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
703 b.responses <- promise
704
705 return &promise, nil
706}
707
708func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
709 promise, err := b.send(req, res != nil)
William Kurkianea869482019-04-09 15:16:11 -0400710 if err != nil {
711 return err
712 }
713
714 if promise == nil {
715 return nil
716 }
717
718 select {
719 case buf := <-promise.packets:
720 return versionedDecode(buf, res, req.version())
721 case err = <-promise.errors:
722 return err
723 }
724}
725
726func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
727 b.id, err = pd.getInt32()
728 if err != nil {
729 return err
730 }
731
732 host, err := pd.getString()
733 if err != nil {
734 return err
735 }
736
737 port, err := pd.getInt32()
738 if err != nil {
739 return err
740 }
741
742 if version >= 1 {
743 b.rack, err = pd.getNullableString()
744 if err != nil {
745 return err
746 }
747 }
748
749 b.addr = net.JoinHostPort(host, fmt.Sprint(port))
750 if _, _, err := net.SplitHostPort(b.addr); err != nil {
751 return err
752 }
753
754 return nil
755}
756
757func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
William Kurkianea869482019-04-09 15:16:11 -0400758 host, portstr, err := net.SplitHostPort(b.addr)
759 if err != nil {
760 return err
761 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530762
William Kurkianea869482019-04-09 15:16:11 -0400763 port, err := strconv.Atoi(portstr)
764 if err != nil {
765 return err
766 }
767
768 pe.putInt32(b.id)
769
770 err = pe.putString(host)
771 if err != nil {
772 return err
773 }
774
775 pe.putInt32(int32(port))
776
777 if version >= 1 {
778 err = pe.putNullableString(b.rack)
779 if err != nil {
780 return err
781 }
782 }
783
784 return nil
785}
786
787func (b *Broker) responseReceiver() {
788 var dead error
789 header := make([]byte, 8)
Abhilash S.L3b494632019-07-16 15:51:09 +0530790
William Kurkianea869482019-04-09 15:16:11 -0400791 for response := range b.responses {
792 if dead != nil {
793 response.errors <- dead
794 continue
795 }
796
797 err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
798 if err != nil {
799 dead = err
800 response.errors <- err
801 continue
802 }
803
804 bytesReadHeader, err := io.ReadFull(b.conn, header)
805 requestLatency := time.Since(response.requestTime)
806 if err != nil {
807 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
808 dead = err
809 response.errors <- err
810 continue
811 }
812
813 decodedHeader := responseHeader{}
814 err = decode(header, &decodedHeader)
815 if err != nil {
816 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
817 dead = err
818 response.errors <- err
819 continue
820 }
821 if decodedHeader.correlationID != response.correlationID {
822 b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
823 // TODO if decoded ID < cur ID, discard until we catch up
824 // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
825 dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
826 response.errors <- dead
827 continue
828 }
829
830 buf := make([]byte, decodedHeader.length-4)
831 bytesReadBody, err := io.ReadFull(b.conn, buf)
832 b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
833 if err != nil {
834 dead = err
835 response.errors <- err
836 continue
837 }
838
839 response.packets <- buf
840 }
841 close(b.done)
842}
843
844func (b *Broker) authenticateViaSASL() error {
Abhilash S.L3b494632019-07-16 15:51:09 +0530845 switch b.conf.Net.SASL.Mechanism {
846 case SASLTypeOAuth:
William Kurkianea869482019-04-09 15:16:11 -0400847 return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
Abhilash S.L3b494632019-07-16 15:51:09 +0530848 case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
849 return b.sendAndReceiveSASLSCRAMv1()
850 case SASLTypeGSSAPI:
851 return b.sendAndReceiveKerberos()
852 default:
853 return b.sendAndReceiveSASLPlainAuth()
William Kurkianea869482019-04-09 15:16:11 -0400854 }
William Kurkianea869482019-04-09 15:16:11 -0400855}
856
Abhilash S.L3b494632019-07-16 15:51:09 +0530857func (b *Broker) sendAndReceiveKerberos() error {
858 b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
859 if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
860 b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
861 }
862 return b.kerberosAuthenticator.Authorize(b)
863}
864
865func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
866 rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
William Kurkianea869482019-04-09 15:16:11 -0400867
868 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
869 buf, err := encode(req, b.conf.MetricRegistry)
870 if err != nil {
871 return err
872 }
873
874 err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
875 if err != nil {
876 return err
877 }
878
879 requestTime := time.Now()
880 bytes, err := b.conn.Write(buf)
881 b.updateOutgoingCommunicationMetrics(bytes)
882 if err != nil {
883 Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
884 return err
885 }
886 b.correlationID++
887 //wait for the response
888 header := make([]byte, 8) // response header
889 _, err = io.ReadFull(b.conn, header)
890 if err != nil {
891 Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
892 return err
893 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530894
William Kurkianea869482019-04-09 15:16:11 -0400895 length := binary.BigEndian.Uint32(header[:4])
896 payload := make([]byte, length-4)
897 n, err := io.ReadFull(b.conn, payload)
898 if err != nil {
899 Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
900 return err
901 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530902
William Kurkianea869482019-04-09 15:16:11 -0400903 b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
904 res := &SaslHandshakeResponse{}
Abhilash S.L3b494632019-07-16 15:51:09 +0530905
William Kurkianea869482019-04-09 15:16:11 -0400906 err = versionedDecode(payload, res, 0)
907 if err != nil {
908 Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
909 return err
910 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530911
William Kurkianea869482019-04-09 15:16:11 -0400912 if res.Err != ErrNoError {
913 Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
914 return res.Err
915 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530916
917 Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
William Kurkianea869482019-04-09 15:16:11 -0400918 return nil
919}
920
Abhilash S.L3b494632019-07-16 15:51:09 +0530921// Kafka 0.10.x supported SASL PLAIN/Kerberos via KAFKA-3149 (KIP-43).
922// Kafka 1.x.x onward added a SaslAuthenticate request/response message which
923// wraps the SASL flow in the Kafka protocol, which allows for returning
924// meaningful errors on authentication failure.
William Kurkianea869482019-04-09 15:16:11 -0400925//
926// In SASL Plain, Kafka expects the auth header to be in the following format
927// Message format (from https://tools.ietf.org/html/rfc4616):
928//
929// message = [authzid] UTF8NUL authcid UTF8NUL passwd
930// authcid = 1*SAFE ; MUST accept up to 255 octets
931// authzid = 1*SAFE ; MUST accept up to 255 octets
932// passwd = 1*SAFE ; MUST accept up to 255 octets
933// UTF8NUL = %x00 ; UTF-8 encoded NUL character
934//
935// SAFE = UTF1 / UTF2 / UTF3 / UTF4
936// ;; any UTF-8 encoded Unicode character except NUL
937//
Abhilash S.L3b494632019-07-16 15:51:09 +0530938// With SASL v0 handshake and auth then:
William Kurkianea869482019-04-09 15:16:11 -0400939// When credentials are valid, Kafka returns a 4 byte array of null characters.
Abhilash S.L3b494632019-07-16 15:51:09 +0530940// When credentials are invalid, Kafka closes the connection.
941//
942// With SASL v1 handshake and auth then:
943// When credentials are invalid, Kafka replies with a SaslAuthenticate response
944// containing an error code and message detailing the authentication failure.
William Kurkianea869482019-04-09 15:16:11 -0400945func (b *Broker) sendAndReceiveSASLPlainAuth() error {
Abhilash S.L3b494632019-07-16 15:51:09 +0530946 // default to V0 to allow for backward compatability when SASL is enabled
947 // but not the handshake
William Kurkianea869482019-04-09 15:16:11 -0400948 if b.conf.Net.SASL.Handshake {
Abhilash S.L3b494632019-07-16 15:51:09 +0530949
950 handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
William Kurkianea869482019-04-09 15:16:11 -0400951 if handshakeErr != nil {
952 Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
953 return handshakeErr
954 }
955 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530956
957 if b.conf.Net.SASL.Version == SASLHandshakeV1 {
958 return b.sendAndReceiveV1SASLPlainAuth()
959 }
960 return b.sendAndReceiveV0SASLPlainAuth()
961}
962
963// sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
964func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
965
William Kurkianea869482019-04-09 15:16:11 -0400966 length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
967 authBytes := make([]byte, length+4) //4 byte length header + auth data
968 binary.BigEndian.PutUint32(authBytes, uint32(length))
969 copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
970
971 err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
972 if err != nil {
973 Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
974 return err
975 }
976
977 requestTime := time.Now()
978 bytesWritten, err := b.conn.Write(authBytes)
979 b.updateOutgoingCommunicationMetrics(bytesWritten)
980 if err != nil {
981 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
982 return err
983 }
984
985 header := make([]byte, 4)
986 n, err := io.ReadFull(b.conn, header)
987 b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
988 // If the credentials are valid, we would get a 4 byte response filled with null characters.
989 // Otherwise, the broker closes the connection and we get an EOF
990 if err != nil {
991 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
992 return err
993 }
994
995 Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
996 return nil
997}
998
Abhilash S.L3b494632019-07-16 15:51:09 +0530999// sendAndReceiveV1SASLPlainAuth flows the v1 sasl authentication using the kafka protocol
1000func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {
1001 correlationID := b.correlationID
1002
1003 requestTime := time.Now()
1004
1005 bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
1006
1007 b.updateOutgoingCommunicationMetrics(bytesWritten)
1008
1009 if err != nil {
1010 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1011 return err
1012 }
1013
1014 b.correlationID++
1015
1016 bytesRead, err := b.receiveSASLServerResponse(correlationID)
1017 b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
1018
1019 // With v1 sasl we get an error message set in the response we can return
1020 if err != nil {
1021 Logger.Printf("Error returned from broker during SASL flow %s: %s\n", b.addr, err.Error())
1022 return err
1023 }
1024
1025 return nil
1026}
1027
William Kurkianea869482019-04-09 15:16:11 -04001028// sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
1029// https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
1030func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
William Kurkianea869482019-04-09 15:16:11 -04001031 if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
1032 return err
1033 }
1034
1035 token, err := provider.Token()
William Kurkianea869482019-04-09 15:16:11 -04001036 if err != nil {
1037 return err
1038 }
1039
1040 requestTime := time.Now()
William Kurkianea869482019-04-09 15:16:11 -04001041 correlationID := b.correlationID
1042
1043 bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, correlationID)
William Kurkianea869482019-04-09 15:16:11 -04001044 if err != nil {
1045 return err
1046 }
1047
1048 b.updateOutgoingCommunicationMetrics(bytesWritten)
William Kurkianea869482019-04-09 15:16:11 -04001049 b.correlationID++
1050
Abhilash S.L3b494632019-07-16 15:51:09 +05301051 bytesRead, err := b.receiveSASLServerResponse(correlationID)
William Kurkianea869482019-04-09 15:16:11 -04001052 if err != nil {
1053 return err
1054 }
1055
1056 requestLatency := time.Since(requestTime)
1057 b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
1058
1059 return nil
1060}
1061
Abhilash S.L3b494632019-07-16 15:51:09 +05301062func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
1063 if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV1); err != nil {
1064 return err
1065 }
1066
1067 scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
1068 if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
1069 return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
1070 }
1071
1072 msg, err := scramClient.Step("")
1073 if err != nil {
1074 return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
1075
1076 }
1077
1078 for !scramClient.Done() {
1079 requestTime := time.Now()
1080 correlationID := b.correlationID
1081 bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
1082 if err != nil {
1083 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1084 return err
1085 }
1086
1087 b.updateOutgoingCommunicationMetrics(bytesWritten)
1088 b.correlationID++
1089 challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
1090 if err != nil {
1091 Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1092 return err
1093 }
1094
1095 b.updateIncomingCommunicationMetrics(len(challenge), time.Since(requestTime))
1096 msg, err = scramClient.Step(string(challenge))
1097 if err != nil {
1098 Logger.Println("SASL authentication failed", err)
1099 return err
1100 }
1101 }
1102
1103 Logger.Println("SASL authentication succeeded")
1104 return nil
1105}
1106
1107func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (int, error) {
1108 rb := &SaslAuthenticateRequest{msg}
1109 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1110 buf, err := encode(req, b.conf.MetricRegistry)
1111 if err != nil {
1112 return 0, err
1113 }
1114
1115 if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
1116 return 0, err
1117 }
1118
1119 return b.conn.Write(buf)
1120}
1121
1122func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
1123 buf := make([]byte, responseLengthSize+correlationIDSize)
1124 _, err := io.ReadFull(b.conn, buf)
1125 if err != nil {
1126 return nil, err
1127 }
1128
1129 header := responseHeader{}
1130 err = decode(buf, &header)
1131 if err != nil {
1132 return nil, err
1133 }
1134
1135 if header.correlationID != correlationID {
1136 return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
1137 }
1138
1139 buf = make([]byte, header.length-correlationIDSize)
1140 _, err = io.ReadFull(b.conn, buf)
1141 if err != nil {
1142 return nil, err
1143 }
1144
1145 res := &SaslAuthenticateResponse{}
1146 if err := versionedDecode(buf, res, 0); err != nil {
1147 return nil, err
1148 }
1149 if res.Err != ErrNoError {
1150 return nil, res.Err
1151 }
1152 return res.SaslAuthBytes, nil
1153}
1154
William Kurkianea869482019-04-09 15:16:11 -04001155// Build SASL/OAUTHBEARER initial client response as described by RFC-7628
1156// https://tools.ietf.org/html/rfc7628
1157func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
William Kurkianea869482019-04-09 15:16:11 -04001158 var ext string
1159
1160 if token.Extensions != nil && len(token.Extensions) > 0 {
1161 if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
Abhilash S.L3b494632019-07-16 15:51:09 +05301162 return []byte{}, fmt.Errorf("the extension `%s` is invalid", SASLExtKeyAuth)
William Kurkianea869482019-04-09 15:16:11 -04001163 }
1164 ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
1165 }
1166
1167 resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
1168
1169 return resp, nil
1170}
1171
1172// mapToString returns a list of key-value pairs ordered by key.
1173// keyValSep separates the key from the value. elemSep separates each pair.
1174func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
William Kurkianea869482019-04-09 15:16:11 -04001175 buf := make([]string, 0, len(extensions))
1176
1177 for k, v := range extensions {
1178 buf = append(buf, k+keyValSep+v)
1179 }
1180
1181 sort.Strings(buf)
1182
1183 return strings.Join(buf, elemSep)
1184}
1185
Abhilash S.L3b494632019-07-16 15:51:09 +05301186func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
1187 authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
1188 rb := &SaslAuthenticateRequest{authBytes}
1189 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1190 buf, err := encode(req, b.conf.MetricRegistry)
1191 if err != nil {
1192 return 0, err
1193 }
1194
1195 err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
1196 if err != nil {
1197 Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
1198 return 0, err
1199 }
1200 return b.conn.Write(buf)
1201}
1202
William Kurkianea869482019-04-09 15:16:11 -04001203func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlationID int32) (int, error) {
William Kurkianea869482019-04-09 15:16:11 -04001204 initialResp, err := buildClientInitialResponse(token)
William Kurkianea869482019-04-09 15:16:11 -04001205 if err != nil {
1206 return 0, err
1207 }
1208
1209 rb := &SaslAuthenticateRequest{initialResp}
1210
1211 req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
1212
1213 buf, err := encode(req, b.conf.MetricRegistry)
William Kurkianea869482019-04-09 15:16:11 -04001214 if err != nil {
1215 return 0, err
1216 }
1217
1218 if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
1219 return 0, err
1220 }
1221
1222 return b.conn.Write(buf)
1223}
1224
Abhilash S.L3b494632019-07-16 15:51:09 +05301225func (b *Broker) receiveSASLServerResponse(correlationID int32) (int, error) {
William Kurkianea869482019-04-09 15:16:11 -04001226
Abhilash S.L3b494632019-07-16 15:51:09 +05301227 buf := make([]byte, responseLengthSize+correlationIDSize)
William Kurkianea869482019-04-09 15:16:11 -04001228
1229 bytesRead, err := io.ReadFull(b.conn, buf)
William Kurkianea869482019-04-09 15:16:11 -04001230 if err != nil {
1231 return bytesRead, err
1232 }
1233
1234 header := responseHeader{}
1235
1236 err = decode(buf, &header)
William Kurkianea869482019-04-09 15:16:11 -04001237 if err != nil {
1238 return bytesRead, err
1239 }
1240
1241 if header.correlationID != correlationID {
1242 return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
1243 }
1244
Abhilash S.L3b494632019-07-16 15:51:09 +05301245 buf = make([]byte, header.length-correlationIDSize)
William Kurkianea869482019-04-09 15:16:11 -04001246
1247 c, err := io.ReadFull(b.conn, buf)
William Kurkianea869482019-04-09 15:16:11 -04001248 bytesRead += c
William Kurkianea869482019-04-09 15:16:11 -04001249 if err != nil {
1250 return bytesRead, err
1251 }
1252
1253 res := &SaslAuthenticateResponse{}
1254
1255 if err := versionedDecode(buf, res, 0); err != nil {
1256 return bytesRead, err
1257 }
1258
William Kurkianea869482019-04-09 15:16:11 -04001259 if res.Err != ErrNoError {
1260 return bytesRead, res.Err
1261 }
1262
1263 if len(res.SaslAuthBytes) > 0 {
1264 Logger.Printf("Received SASL auth response: %s", res.SaslAuthBytes)
1265 }
1266
1267 return bytesRead, nil
1268}
1269
1270func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
1271 b.updateRequestLatencyMetrics(requestLatency)
1272 b.responseRate.Mark(1)
Abhilash S.L3b494632019-07-16 15:51:09 +05301273
William Kurkianea869482019-04-09 15:16:11 -04001274 if b.brokerResponseRate != nil {
1275 b.brokerResponseRate.Mark(1)
1276 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301277
William Kurkianea869482019-04-09 15:16:11 -04001278 responseSize := int64(bytes)
1279 b.incomingByteRate.Mark(responseSize)
1280 if b.brokerIncomingByteRate != nil {
1281 b.brokerIncomingByteRate.Mark(responseSize)
1282 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301283
William Kurkianea869482019-04-09 15:16:11 -04001284 b.responseSize.Update(responseSize)
1285 if b.brokerResponseSize != nil {
1286 b.brokerResponseSize.Update(responseSize)
1287 }
1288}
1289
1290func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
1291 requestLatencyInMs := int64(requestLatency / time.Millisecond)
1292 b.requestLatency.Update(requestLatencyInMs)
Abhilash S.L3b494632019-07-16 15:51:09 +05301293
William Kurkianea869482019-04-09 15:16:11 -04001294 if b.brokerRequestLatency != nil {
1295 b.brokerRequestLatency.Update(requestLatencyInMs)
1296 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301297
William Kurkianea869482019-04-09 15:16:11 -04001298}
1299
1300func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
1301 b.requestRate.Mark(1)
1302 if b.brokerRequestRate != nil {
1303 b.brokerRequestRate.Mark(1)
1304 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301305
William Kurkianea869482019-04-09 15:16:11 -04001306 requestSize := int64(bytes)
1307 b.outgoingByteRate.Mark(requestSize)
1308 if b.brokerOutgoingByteRate != nil {
1309 b.brokerOutgoingByteRate.Mark(requestSize)
1310 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301311
William Kurkianea869482019-04-09 15:16:11 -04001312 b.requestSize.Update(requestSize)
1313 if b.brokerRequestSize != nil {
1314 b.brokerRequestSize.Update(requestSize)
1315 }
Abhilash S.L3b494632019-07-16 15:51:09 +05301316
1317}
1318
1319func (b *Broker) registerMetrics() {
1320 b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
1321 b.brokerRequestRate = b.registerMeter("request-rate")
1322 b.brokerRequestSize = b.registerHistogram("request-size")
1323 b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
1324 b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
1325 b.brokerResponseRate = b.registerMeter("response-rate")
1326 b.brokerResponseSize = b.registerHistogram("response-size")
1327}
1328
1329func (b *Broker) unregisterMetrics() {
1330 for _, name := range b.registeredMetrics {
1331 b.conf.MetricRegistry.Unregister(name)
1332 }
1333}
1334
1335func (b *Broker) registerMeter(name string) metrics.Meter {
1336 nameForBroker := getMetricNameForBroker(name, b)
1337 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1338 return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
1339}
1340
1341func (b *Broker) registerHistogram(name string) metrics.Histogram {
1342 nameForBroker := getMetricNameForBroker(name, b)
1343 b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
1344 return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
William Kurkianea869482019-04-09 15:16:11 -04001345}