blob: ab8b70196f8239cb42ec9b8b114ba20b7863d977 [file] [log] [blame]
Scott Baker8461e152019-10-01 14:44:30 -07001package sarama
2
3import (
4 "encoding/binary"
khenaidood948f772021-08-11 17:49:24 -04005 "errors"
Scott Baker8461e152019-10-01 14:44:30 -07006 "fmt"
Scott Baker8461e152019-10-01 14:44:30 -07007 "io"
khenaidood948f772021-08-11 17:49:24 -04008 "math"
Scott Baker8461e152019-10-01 14:44:30 -07009 "strings"
10 "time"
khenaidood948f772021-08-11 17:49:24 -040011
12 "github.com/jcmturner/gofork/encoding/asn1"
13 "github.com/jcmturner/gokrb5/v8/asn1tools"
14 "github.com/jcmturner/gokrb5/v8/gssapi"
15 "github.com/jcmturner/gokrb5/v8/iana/chksumtype"
16 "github.com/jcmturner/gokrb5/v8/iana/keyusage"
17 "github.com/jcmturner/gokrb5/v8/messages"
18 "github.com/jcmturner/gokrb5/v8/types"
Scott Baker8461e152019-10-01 14:44:30 -070019)
20
21const (
22 TOK_ID_KRB_AP_REQ = 256
23 GSS_API_GENERIC_TAG = 0x60
24 KRB5_USER_AUTH = 1
25 KRB5_KEYTAB_AUTH = 2
26 GSS_API_INITIAL = 1
27 GSS_API_VERIFY = 2
28 GSS_API_FINISH = 3
29)
30
31type GSSAPIConfig struct {
32 AuthType int
33 KeyTabPath string
34 KerberosConfigPath string
35 ServiceName string
36 Username string
37 Password string
38 Realm string
khenaidood948f772021-08-11 17:49:24 -040039 DisablePAFXFAST bool
Scott Baker8461e152019-10-01 14:44:30 -070040}
41
42type GSSAPIKerberosAuth struct {
43 Config *GSSAPIConfig
44 ticket messages.Ticket
45 encKey types.EncryptionKey
46 NewKerberosClientFunc func(config *GSSAPIConfig) (KerberosClient, error)
47 step int
48}
49
50type KerberosClient interface {
51 Login() error
52 GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error)
53 Domain() string
54 CName() types.PrincipalName
55 Destroy()
56}
57
khenaidood948f772021-08-11 17:49:24 -040058// writePackage appends length in big endian before the payload, and sends it to kafka
Scott Baker8461e152019-10-01 14:44:30 -070059func (krbAuth *GSSAPIKerberosAuth) writePackage(broker *Broker, payload []byte) (int, error) {
khenaidood948f772021-08-11 17:49:24 -040060 length := uint64(len(payload))
61 size := length + 4 // 4 byte length header + payload
62 if size > math.MaxInt32 {
63 return 0, errors.New("payload too large, will overflow int32")
64 }
65 finalPackage := make([]byte, size)
Scott Baker8461e152019-10-01 14:44:30 -070066 copy(finalPackage[4:], payload)
67 binary.BigEndian.PutUint32(finalPackage, uint32(length))
68 bytes, err := broker.conn.Write(finalPackage)
69 if err != nil {
70 return bytes, err
71 }
72 return bytes, nil
73}
74
khenaidood948f772021-08-11 17:49:24 -040075// readPackage reads payload length (4 bytes) and then reads the payload into []byte
Scott Baker8461e152019-10-01 14:44:30 -070076func (krbAuth *GSSAPIKerberosAuth) readPackage(broker *Broker) ([]byte, int, error) {
77 bytesRead := 0
78 lengthInBytes := make([]byte, 4)
79 bytes, err := io.ReadFull(broker.conn, lengthInBytes)
80 if err != nil {
81 return nil, bytesRead, err
82 }
83 bytesRead += bytes
84 payloadLength := binary.BigEndian.Uint32(lengthInBytes)
85 payloadBytes := make([]byte, payloadLength) // buffer for read..
86 bytes, err = io.ReadFull(broker.conn, payloadBytes) // read bytes
87 if err != nil {
88 return payloadBytes, bytesRead, err
89 }
90 bytesRead += bytes
91 return payloadBytes, bytesRead, nil
92}
93
94func (krbAuth *GSSAPIKerberosAuth) newAuthenticatorChecksum() []byte {
95 a := make([]byte, 24)
96 flags := []int{gssapi.ContextFlagInteg, gssapi.ContextFlagConf}
97 binary.LittleEndian.PutUint32(a[:4], 16)
98 for _, i := range flags {
99 f := binary.LittleEndian.Uint32(a[20:24])
100 f |= uint32(i)
101 binary.LittleEndian.PutUint32(a[20:24], f)
102 }
103 return a
104}
105
106/*
107*
108* Construct Kerberos AP_REQ package, conforming to RFC-4120
109* https://tools.ietf.org/html/rfc4120#page-84
110*
111 */
112func (krbAuth *GSSAPIKerberosAuth) createKrb5Token(
113 domain string, cname types.PrincipalName,
114 ticket messages.Ticket,
115 sessionKey types.EncryptionKey) ([]byte, error) {
116 auth, err := types.NewAuthenticator(domain, cname)
117 if err != nil {
118 return nil, err
119 }
120 auth.Cksum = types.Checksum{
121 CksumType: chksumtype.GSSAPI,
122 Checksum: krbAuth.newAuthenticatorChecksum(),
123 }
124 APReq, err := messages.NewAPReq(
125 ticket,
126 sessionKey,
127 auth,
128 )
129 if err != nil {
130 return nil, err
131 }
132 aprBytes := make([]byte, 2)
133 binary.BigEndian.PutUint16(aprBytes, TOK_ID_KRB_AP_REQ)
134 tb, err := APReq.Marshal()
135 if err != nil {
136 return nil, err
137 }
138 aprBytes = append(aprBytes, tb...)
139 return aprBytes, nil
140}
141
142/*
143*
144* Append the GSS-API header to the payload, conforming to RFC-2743
145* Section 3.1, Mechanism-Independent Token Format
146*
147* https://tools.ietf.org/html/rfc2743#page-81
148*
149* GSSAPIHeader + <specific mechanism payload>
150*
151 */
152func (krbAuth *GSSAPIKerberosAuth) appendGSSAPIHeader(payload []byte) ([]byte, error) {
khenaidood948f772021-08-11 17:49:24 -0400153 oidBytes, err := asn1.Marshal(gssapi.OIDKRB5.OID())
Scott Baker8461e152019-10-01 14:44:30 -0700154 if err != nil {
155 return nil, err
156 }
157 tkoLengthBytes := asn1tools.MarshalLengthBytes(len(oidBytes) + len(payload))
158 GSSHeader := append([]byte{GSS_API_GENERIC_TAG}, tkoLengthBytes...)
159 GSSHeader = append(GSSHeader, oidBytes...)
160 GSSPackage := append(GSSHeader, payload...)
161 return GSSPackage, nil
162}
163
164func (krbAuth *GSSAPIKerberosAuth) initSecContext(bytes []byte, kerberosClient KerberosClient) ([]byte, error) {
165 switch krbAuth.step {
166 case GSS_API_INITIAL:
167 aprBytes, err := krbAuth.createKrb5Token(
168 kerberosClient.Domain(),
169 kerberosClient.CName(),
170 krbAuth.ticket,
171 krbAuth.encKey)
172 if err != nil {
173 return nil, err
174 }
175 krbAuth.step = GSS_API_VERIFY
176 return krbAuth.appendGSSAPIHeader(aprBytes)
177 case GSS_API_VERIFY:
178 wrapTokenReq := gssapi.WrapToken{}
179 if err := wrapTokenReq.Unmarshal(bytes, true); err != nil {
180 return nil, err
181 }
182 // Validate response.
183 isValid, err := wrapTokenReq.Verify(krbAuth.encKey, keyusage.GSSAPI_ACCEPTOR_SEAL)
184 if !isValid {
185 return nil, err
186 }
187
188 wrapTokenResponse, err := gssapi.NewInitiatorWrapToken(wrapTokenReq.Payload, krbAuth.encKey)
189 if err != nil {
190 return nil, err
191 }
192 krbAuth.step = GSS_API_FINISH
193 return wrapTokenResponse.Marshal()
194 }
195 return nil, nil
196}
197
198/* This does the handshake for authorization */
199func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error {
Scott Baker8461e152019-10-01 14:44:30 -0700200 kerberosClient, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
201 if err != nil {
202 Logger.Printf("Kerberos client error: %s", err)
203 return err
204 }
205
206 err = kerberosClient.Login()
207 if err != nil {
208 Logger.Printf("Kerberos client error: %s", err)
209 return err
210 }
211 // Construct SPN using serviceName and host
212 // SPN format: <SERVICE>/<FQDN>
213
214 host := strings.SplitN(broker.addr, ":", 2)[0] // Strip port part
215 spn := fmt.Sprintf("%s/%s", broker.conf.Net.SASL.GSSAPI.ServiceName, host)
216
217 ticket, encKey, err := kerberosClient.GetServiceTicket(spn)
Scott Baker8461e152019-10-01 14:44:30 -0700218 if err != nil {
219 Logger.Printf("Error getting Kerberos service ticket : %s", err)
220 return err
221 }
222 krbAuth.ticket = ticket
223 krbAuth.encKey = encKey
224 krbAuth.step = GSS_API_INITIAL
225 var receivedBytes []byte = nil
226 defer kerberosClient.Destroy()
227 for {
228 packBytes, err := krbAuth.initSecContext(receivedBytes, kerberosClient)
229 if err != nil {
230 Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
231 return err
232 }
233 requestTime := time.Now()
234 bytesWritten, err := krbAuth.writePackage(broker, packBytes)
235 if err != nil {
236 Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
237 return err
238 }
239 broker.updateOutgoingCommunicationMetrics(bytesWritten)
240 if krbAuth.step == GSS_API_VERIFY {
khenaidood948f772021-08-11 17:49:24 -0400241 bytesRead := 0
Scott Baker8461e152019-10-01 14:44:30 -0700242 receivedBytes, bytesRead, err = krbAuth.readPackage(broker)
243 requestLatency := time.Since(requestTime)
244 broker.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
245 if err != nil {
246 Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
247 return err
248 }
249 } else if krbAuth.step == GSS_API_FINISH {
250 return nil
251 }
252 }
253}