blob: bd848a7137e35495454e4f4dca9dc4040711ae64 [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package embed
16
17import (
18 "context"
19 "crypto/tls"
20 "fmt"
21 "io/ioutil"
22 defaultLog "log"
23 "net"
24 "net/http"
25 "net/url"
26 "strconv"
27 "sync"
28 "time"
29
30 "github.com/coreos/etcd/compactor"
31 "github.com/coreos/etcd/etcdserver"
32 "github.com/coreos/etcd/etcdserver/api/etcdhttp"
33 "github.com/coreos/etcd/etcdserver/api/v2http"
34 "github.com/coreos/etcd/etcdserver/api/v2v3"
35 "github.com/coreos/etcd/etcdserver/api/v3client"
36 "github.com/coreos/etcd/etcdserver/api/v3rpc"
37 "github.com/coreos/etcd/pkg/cors"
38 "github.com/coreos/etcd/pkg/debugutil"
39 runtimeutil "github.com/coreos/etcd/pkg/runtime"
40 "github.com/coreos/etcd/pkg/transport"
41 "github.com/coreos/etcd/pkg/types"
42 "github.com/coreos/etcd/rafthttp"
43
44 "github.com/coreos/pkg/capnslog"
45 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
46 "github.com/soheilhy/cmux"
47 "google.golang.org/grpc"
48 "google.golang.org/grpc/keepalive"
49)
50
// plog is the package-level logger for the embed package.
var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed")

const (
	// internal fd usage includes disk usage and transport usage.
	// To read/write snapshot, snap pkg needs 1. In normal case, wal pkg needs
	// at most 2 to read/lock/write WALs. One case that it needs to 2 is to
	// read all logs after some snapshot index, which locates at the end of
	// the second last and the head of the last. For purging, it needs to read
	// directory, so it needs 1. For fd monitor, it needs 1.
	// For transport, rafthttp builds two long-polling connections and at most
	// four temporary connections with each member. There are at most 9 members
	// in a cluster, so it should reserve 96.
	// For the safety, we set the total reserved number to 150.
	reservedInternalFDNum = 150
)
66
// Etcd contains a running etcd server and its listeners.
type Etcd struct {
	// Peers serve raft/peer traffic, one entry per listen-peer URL.
	Peers []*peerListener
	// Clients accept client requests, one entry per serve context.
	Clients []net.Listener
	// a map of contexts for the servers that serves client requests.
	sctxs map[string]*serveCtx
	// metricsListeners serve the metrics/health endpoints, if configured.
	metricsListeners []net.Listener

	Server *etcdserver.EtcdServer

	// cfg is a private copy of the configuration passed to StartEtcd.
	cfg Config
	// stopc is closed (once) to signal shutdown to background goroutines.
	stopc chan struct{}
	// errc receives fatal errors from serving goroutines; see Err().
	errc chan error

	// closeOnce guards the close(stopc) in Close so Close is idempotent.
	closeOnce sync.Once
}
83
// peerListener wraps a peer-traffic listener together with hooks to
// start serving on it and to shut it down within a context deadline.
type peerListener struct {
	net.Listener
	// serve blocks serving peer traffic on the listener.
	serve func() error
	// close releases the listener; once serving, it is overwritten to
	// gracefully shut down the peer servers within the given context.
	close func(context.Context) error
}
89
// StartEtcd launches the etcd server and HTTP handlers for client/server communication.
// The returned Etcd.Server is not guaranteed to have joined the cluster. Wait
// on the Etcd.Server.ReadyNotify() channel to know when it completes and is ready for use.
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
	if err = inCfg.Validate(); err != nil {
		return nil, err
	}
	serving := false
	e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
	cfg := &e.cfg
	// On any error below, tear down whatever has been started so far and
	// hand the caller back a nil *Etcd.
	defer func() {
		if e == nil || err == nil {
			return
		}
		if !serving {
			// errored before starting gRPC server for serveCtx.serversC
			for _, sctx := range e.sctxs {
				close(sctx.serversC)
			}
		}
		e.Close()
		e = nil
	}()

	if e.Peers, err = startPeerListeners(cfg); err != nil {
		return e, err
	}
	if e.sctxs, err = startClientListeners(cfg); err != nil {
		return e, err
	}
	for _, sctx := range e.sctxs {
		e.Clients = append(e.Clients, sctx.l)
	}

	var (
		urlsmap types.URLsMap
		token   string
	)

	// Only a not-yet-initialized member needs the initial-cluster URL map
	// and token; an initialized member recovers membership from its data dir.
	memberInitialized := true
	if !isMemberInitialized(cfg) {
		memberInitialized = false
		urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")
		if err != nil {
			return e, fmt.Errorf("error setting up initial cluster: %v", err)
		}
	}

	// AutoCompactionRetention defaults to "0" if not set.
	if len(cfg.AutoCompactionRetention) == 0 {
		cfg.AutoCompactionRetention = "0"
	}
	autoCompactionRetention, err := parseCompactionRetention(cfg.AutoCompactionMode, cfg.AutoCompactionRetention)
	if err != nil {
		return e, err
	}

	srvcfg := etcdserver.ServerConfig{
		Name:                       cfg.Name,
		ClientURLs:                 cfg.ACUrls,
		PeerURLs:                   cfg.APUrls,
		DataDir:                    cfg.Dir,
		DedicatedWALDir:            cfg.WalDir,
		SnapCount:                  cfg.SnapCount,
		MaxSnapFiles:               cfg.MaxSnapFiles,
		MaxWALFiles:                cfg.MaxWalFiles,
		InitialPeerURLsMap:         urlsmap,
		InitialClusterToken:        token,
		DiscoveryURL:               cfg.Durl,
		DiscoveryProxy:             cfg.Dproxy,
		NewCluster:                 cfg.IsNewCluster(),
		ForceNewCluster:            cfg.ForceNewCluster,
		PeerTLSInfo:                cfg.PeerTLSInfo,
		TickMs:                     cfg.TickMs,
		ElectionTicks:              cfg.ElectionTicks(),
		InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
		AutoCompactionRetention:    autoCompactionRetention,
		AutoCompactionMode:         cfg.AutoCompactionMode,
		QuotaBackendBytes:          cfg.QuotaBackendBytes,
		MaxTxnOps:                  cfg.MaxTxnOps,
		MaxRequestBytes:            cfg.MaxRequestBytes,
		StrictReconfigCheck:        cfg.StrictReconfigCheck,
		ClientCertAuthEnabled:      cfg.ClientTLSInfo.ClientCertAuth,
		AuthToken:                  cfg.AuthToken,
		InitialCorruptCheck:        cfg.ExperimentalInitialCorruptCheck,
		CorruptCheckTime:           cfg.ExperimentalCorruptCheckTime,
		Debug:                      cfg.Debug,
	}

	if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
		return e, err
	}

	// buffer channel so goroutines on closed connections won't wait forever
	e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))

	// newly started member ("memberInitialized==false")
	// does not need corruption check
	if memberInitialized {
		if err = e.Server.CheckInitialHashKV(); err != nil {
			// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
			// (nothing to close since rafthttp transports have not been started)
			e.Server = nil
			return e, err
		}
	}
	e.Server.Start()

	// wire up the network frontends only after the server has started
	if err = e.servePeers(); err != nil {
		return e, err
	}
	if err = e.serveClients(); err != nil {
		return e, err
	}
	if err = e.serveMetrics(); err != nil {
		return e, err
	}

	serving = true
	return e, nil
}
211
212// Config returns the current configuration.
213func (e *Etcd) Config() Config {
214 return e.cfg
215}
216
// Close gracefully shuts down all servers/listeners.
// Client requests will be terminated with request timeout.
// After timeout, enforce remaining requests be closed immediately.
func (e *Etcd) Close() {
	// idempotent: only the first Close actually closes the stop channel
	e.closeOnce.Do(func() { close(e.stopc) })

	// close client requests with request timeout
	timeout := 2 * time.Second
	if e.Server != nil {
		timeout = e.Server.Cfg.ReqTimeout()
	}
	// drain serversC so every client server that was started gets a
	// bounded graceful stop
	for _, sctx := range e.sctxs {
		for ss := range sctx.serversC {
			ctx, cancel := context.WithTimeout(context.Background(), timeout)
			stopServers(ctx, ss)
			cancel()
		}
	}

	for _, sctx := range e.sctxs {
		sctx.cancel()
	}

	for i := range e.Clients {
		if e.Clients[i] != nil {
			e.Clients[i].Close()
		}
	}

	for i := range e.metricsListeners {
		e.metricsListeners[i].Close()
	}

	// close rafthttp transports
	if e.Server != nil {
		e.Server.Stop()
	}

	// close all idle connections in peer handler (wait up to 1-second)
	for i := range e.Peers {
		if e.Peers[i] != nil && e.Peers[i].close != nil {
			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			e.Peers[i].close(ctx)
			cancel()
		}
	}
}
264
// stopServers shuts down a gRPC/HTTP server pair. Insecure servers are
// stopped gracefully, bounded by ctx; TLS-enabled servers are shut down
// immediately (see the linked issues for why GracefulStop is skipped).
func stopServers(ctx context.Context, ss *servers) {
	shutdownNow := func() {
		// first, close the http.Server
		ss.http.Shutdown(ctx)
		// then close grpc.Server; cancels all active RPCs
		ss.grpc.Stop()
	}

	// do not grpc.Server.GracefulStop with TLS enabled etcd server
	// See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
	// and https://github.com/coreos/etcd/issues/8916
	if ss.secure {
		shutdownNow()
		return
	}

	// ch is closed when GracefulStop returns
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		// close listeners to stop accepting new connections,
		// will block on any existing transports
		ss.grpc.GracefulStop()
	}()

	// wait until all pending RPCs are finished
	select {
	case <-ch:
	case <-ctx.Done():
		// took too long, manually close open transports
		// e.g. watch streams
		shutdownNow()

		// concurrent GracefulStop should be interrupted
		<-ch
	}
}
301
302func (e *Etcd) Err() <-chan error { return e.errc }
303
304func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
305 if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil {
306 return nil, err
307 }
308 if err = cfg.PeerSelfCert(); err != nil {
309 plog.Fatalf("could not get certs (%v)", err)
310 }
311 if !cfg.PeerTLSInfo.Empty() {
312 plog.Infof("peerTLS: %s", cfg.PeerTLSInfo)
313 }
314
315 peers = make([]*peerListener, len(cfg.LPUrls))
316 defer func() {
317 if err == nil {
318 return
319 }
320 for i := range peers {
321 if peers[i] != nil && peers[i].close != nil {
322 plog.Info("stopping listening for peers on ", cfg.LPUrls[i].String())
323 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
324 peers[i].close(ctx)
325 cancel()
326 }
327 }
328 }()
329
330 for i, u := range cfg.LPUrls {
331 if u.Scheme == "http" {
332 if !cfg.PeerTLSInfo.Empty() {
333 plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
334 }
335 if cfg.PeerTLSInfo.ClientCertAuth {
336 plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
337 }
338 }
339 peers[i] = &peerListener{close: func(context.Context) error { return nil }}
340 peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo)
341 if err != nil {
342 return nil, err
343 }
344 // once serve, overwrite with 'http.Server.Shutdown'
345 peers[i].close = func(context.Context) error {
346 return peers[i].Listener.Close()
347 }
348 plog.Info("listening for peers on ", u.String())
349 }
350 return peers, nil
351}
352
// configure peer handlers after rafthttp.Transport started
func (e *Etcd) servePeers() (err error) {
	ph := etcdhttp.NewPeerHandler(e.Server)
	var peerTLScfg *tls.Config
	if !e.cfg.PeerTLSInfo.Empty() {
		if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil {
			return err
		}
	}

	for _, p := range e.Peers {
		gs := v3rpc.Server(e.Server, peerTLScfg)
		// cmux multiplexes the single peer listener: HTTP/2 connections go
		// to the gRPC server, everything else to the peer http.Server.
		m := cmux.New(p.Listener)
		go gs.Serve(m.Match(cmux.HTTP2()))
		srv := &http.Server{
			Handler:     grpcHandlerFunc(gs, ph),
			ReadTimeout: 5 * time.Minute,
			ErrorLog:    defaultLog.New(ioutil.Discard, "", 0), // do not log user error
		}
		go srv.Serve(m.Match(cmux.Any()))
		p.serve = func() error { return m.Serve() }
		// replace the raw-listener close installed by startPeerListeners
		p.close = func(ctx context.Context) error {
			// gracefully shutdown http.Server
			// close open listeners, idle connections
			// until context cancel or time-out
			stopServers(ctx, &servers{secure: peerTLScfg != nil, grpc: gs, http: srv})
			return nil
		}
	}

	// start peer servers in a goroutine
	for _, pl := range e.Peers {
		go func(l *peerListener) {
			e.errHandler(l.serve())
		}(pl)
	}
	return nil
}
391
392func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
393 if err = updateCipherSuites(&cfg.ClientTLSInfo, cfg.CipherSuites); err != nil {
394 return nil, err
395 }
396 if err = cfg.ClientSelfCert(); err != nil {
397 plog.Fatalf("could not get certs (%v)", err)
398 }
399 if cfg.EnablePprof {
400 plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
401 }
402
403 sctxs = make(map[string]*serveCtx)
404 for _, u := range cfg.LCUrls {
405 sctx := newServeCtx()
406
407 if u.Scheme == "http" || u.Scheme == "unix" {
408 if !cfg.ClientTLSInfo.Empty() {
409 plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String())
410 }
411 if cfg.ClientTLSInfo.ClientCertAuth {
412 plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
413 }
414 }
415 if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() {
416 return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String())
417 }
418
419 proto := "tcp"
420 addr := u.Host
421 if u.Scheme == "unix" || u.Scheme == "unixs" {
422 proto = "unix"
423 addr = u.Host + u.Path
424 }
425
426 sctx.secure = u.Scheme == "https" || u.Scheme == "unixs"
427 sctx.insecure = !sctx.secure
428 if oldctx := sctxs[addr]; oldctx != nil {
429 oldctx.secure = oldctx.secure || sctx.secure
430 oldctx.insecure = oldctx.insecure || sctx.insecure
431 continue
432 }
433
434 if sctx.l, err = net.Listen(proto, addr); err != nil {
435 return nil, err
436 }
437 // net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking
438 // hosts that disable ipv6. So, use the address given by the user.
439 sctx.addr = addr
440
441 if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil {
442 if fdLimit <= reservedInternalFDNum {
443 plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
444 }
445 sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum))
446 }
447
448 if proto == "tcp" {
449 if sctx.l, err = transport.NewKeepAliveListener(sctx.l, "tcp", nil); err != nil {
450 return nil, err
451 }
452 }
453
454 plog.Info("listening for client requests on ", u.Host)
455 defer func() {
456 if err != nil {
457 sctx.l.Close()
458 plog.Info("stopping listening for client requests on ", u.Host)
459 }
460 }()
461 for k := range cfg.UserHandlers {
462 sctx.userHandlers[k] = cfg.UserHandlers[k]
463 }
464 sctx.serviceRegister = cfg.ServiceRegister
465 if cfg.EnablePprof || cfg.Debug {
466 sctx.registerPprof()
467 }
468 if cfg.Debug {
469 sctx.registerTrace()
470 }
471 sctxs[addr] = sctx
472 }
473 return sctxs, nil
474}
475
// serveClients builds the shared client HTTP handler and gRPC options,
// then starts one serving goroutine per client serve context.
func (e *Etcd) serveClients() (err error) {
	if !e.cfg.ClientTLSInfo.Empty() {
		plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo)
	}

	if e.cfg.CorsInfo.String() != "" {
		plog.Infof("cors = %s", e.cfg.CorsInfo)
	}

	// Start a client server goroutine for each listen address
	var h http.Handler
	if e.Config().EnableV2 {
		if len(e.Config().ExperimentalEnableV2V3) > 0 {
			// serve the v2 API on top of the v3 backend via the v2v3 shim
			srv := v2v3.NewServer(v3client.New(e.Server), e.cfg.ExperimentalEnableV2V3)
			h = v2http.NewClientHandler(srv, e.Server.Cfg.ReqTimeout())
		} else {
			h = v2http.NewClientHandler(e.Server, e.Server.Cfg.ReqTimeout())
		}
	} else {
		// v2 disabled: expose only the basic etcdhttp endpoints
		mux := http.NewServeMux()
		etcdhttp.HandleBasic(mux, e.Server)
		h = mux
	}
	// wrap whichever handler was chosen with CORS filtering
	h = http.Handler(&cors.CORSHandler{Handler: h, Info: e.cfg.CorsInfo})

	// translate keepalive settings into gRPC server options
	gopts := []grpc.ServerOption{}
	if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
		gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
			MinTime:             e.cfg.GRPCKeepAliveMinTime,
			PermitWithoutStream: false,
		}))
	}
	if e.cfg.GRPCKeepAliveInterval > time.Duration(0) &&
		e.cfg.GRPCKeepAliveTimeout > time.Duration(0) {
		gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{
			Time:    e.cfg.GRPCKeepAliveInterval,
			Timeout: e.cfg.GRPCKeepAliveTimeout,
		}))
	}

	// start client servers in a goroutine
	for _, sctx := range e.sctxs {
		go func(s *serveCtx) {
			e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...))
		}(sctx)
	}
	return nil
}
524
// serveMetrics starts an HTTP server on each configured metrics URL
// (ListenMetricsUrls) exposing the metrics/health handlers.
func (e *Etcd) serveMetrics() (err error) {
	if e.cfg.Metrics == "extensive" {
		grpc_prometheus.EnableHandlingTimeHistogram()
	}

	if len(e.cfg.ListenMetricsUrls) > 0 {
		metricsMux := http.NewServeMux()
		etcdhttp.HandleMetricsHealth(metricsMux, e.Server)

		for _, murl := range e.cfg.ListenMetricsUrls {
			// reuse the client TLS config for https metrics URLs
			tlsInfo := &e.cfg.ClientTLSInfo
			if murl.Scheme == "http" {
				tlsInfo = nil
			}
			ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsInfo)
			if err != nil {
				return err
			}
			e.metricsListeners = append(e.metricsListeners, ml)
			// murl/ml are passed as arguments so each goroutine gets its
			// own copies rather than the shared loop variables
			go func(u url.URL, ln net.Listener) {
				plog.Info("listening for metrics on ", u.String())
				e.errHandler(http.Serve(ln, metricsMux))
			}(murl, ml)
		}
	}
	return nil
}
552
// errHandler forwards a serving error to e.errc so it surfaces via Err();
// errors are dropped once shutdown has been requested.
func (e *Etcd) errHandler(err error) {
	// non-blocking check: if already stopping, discard the error
	select {
	case <-e.stopc:
		return
	default:
	}
	// block until the error is consumed or shutdown begins
	select {
	case <-e.stopc:
	case e.errc <- err:
	}
}
564
565func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) {
566 h, err := strconv.Atoi(retention)
567 if err == nil {
568 switch mode {
569 case compactor.ModeRevision:
570 ret = time.Duration(int64(h))
571 case compactor.ModePeriodic:
572 ret = time.Duration(int64(h)) * time.Hour
573 }
574 } else {
575 // periodic compaction
576 ret, err = time.ParseDuration(retention)
577 if err != nil {
578 return 0, fmt.Errorf("error parsing CompactionRetention: %v", err)
579 }
580 }
581 return ret, nil
582}