// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15package embed
16
17import (
18 "context"
19 "crypto/tls"
20 "fmt"
21 "io/ioutil"
22 defaultLog "log"
23 "net"
24 "net/http"
25 "net/url"
26 "runtime"
27 "sort"
28 "strconv"
29 "sync"
30 "time"
31
32 "go.etcd.io/etcd/etcdserver"
33 "go.etcd.io/etcd/etcdserver/api/etcdhttp"
34 "go.etcd.io/etcd/etcdserver/api/rafthttp"
35 "go.etcd.io/etcd/etcdserver/api/v2http"
36 "go.etcd.io/etcd/etcdserver/api/v2v3"
37 "go.etcd.io/etcd/etcdserver/api/v3client"
38 "go.etcd.io/etcd/etcdserver/api/v3rpc"
39 "go.etcd.io/etcd/pkg/debugutil"
40 runtimeutil "go.etcd.io/etcd/pkg/runtime"
41 "go.etcd.io/etcd/pkg/transport"
42 "go.etcd.io/etcd/pkg/types"
43 "go.etcd.io/etcd/version"
44
45 "github.com/coreos/pkg/capnslog"
46 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
47 "github.com/soheilhy/cmux"
48 "go.uber.org/zap"
49 "google.golang.org/grpc"
50 "google.golang.org/grpc/keepalive"
51)
52
// plog is the capnslog fallback logger; it is used on every code path where a
// structured zap logger has not been configured (cfg.logger == nil).
var plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "embed")

const (
	// internal fd usage includes disk usage and transport usage.
	// To read/write snapshot, snap pkg needs 1. In normal case, wal pkg needs
	// at most 2 to read/lock/write WALs. One case that it needs to 2 is to
	// read all logs after some snapshot index, which locates at the end of
	// the second last and the head of the last. For purging, it needs to read
	// directory, so it needs 1. For fd monitor, it needs 1.
	// For transport, rafthttp builds two long-polling connections and at most
	// four temporary connections with each member. There are at most 9 members
	// in a cluster, so it should reserve 96.
	// For the safety, we set the total reserved number to 150.
	reservedInternalFDNum = 150
)
68
// Etcd contains a running etcd server and its listeners.
type Etcd struct {
	// Peers holds one raft/peer-traffic listener per configured listen-peer URL.
	Peers []*peerListener
	// Clients holds the client-facing listeners, one per distinct listen address.
	Clients []net.Listener
	// a map of contexts for the servers that serves client requests.
	sctxs            map[string]*serveCtx
	metricsListeners []net.Listener

	Server *etcdserver.EtcdServer

	cfg Config
	// stopc is closed exactly once (guarded by closeOnce) to tell the
	// error-reporting paths that the server is shutting down.
	stopc chan struct{}
	// errc receives fatal errors from the serving goroutines; it is created
	// buffered in StartEtcd so senders on dead connections do not block forever.
	errc chan error

	closeOnce sync.Once
}

// peerListener bundles a peer listener with its serve loop and a graceful
// close function. close initially just closes the listener; once serving
// starts it is overwritten to perform an http.Server shutdown (see servePeers).
type peerListener struct {
	net.Listener
	serve func() error
	close func(context.Context) error
}
91
92// StartEtcd launches the etcd server and HTTP handlers for client/server communication.
93// The returned Etcd.Server is not guaranteed to have joined the cluster. Wait
94// on the Etcd.Server.ReadyNotify() channel to know when it completes and is ready for use.
// StartEtcd launches the etcd server and HTTP handlers for client/server communication.
// The returned Etcd.Server is not guaranteed to have joined the cluster. Wait
// on the Etcd.Server.ReadyNotify() channel to know when it completes and is ready for use.
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
	if err = inCfg.Validate(); err != nil {
		return nil, err
	}
	serving := false
	e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
	cfg := &e.cfg
	// On any startup error, tear down whatever has been set up so far and
	// return a nil *Etcd to the caller.
	defer func() {
		if e == nil || err == nil {
			return
		}
		if !serving {
			// errored before starting gRPC server for serveCtx.serversC
			for _, sctx := range e.sctxs {
				close(sctx.serversC)
			}
		}
		e.Close()
		e = nil
	}()

	if e.cfg.logger != nil {
		e.cfg.logger.Info(
			"configuring peer listeners",
			zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
		)
	}
	if e.Peers, err = configurePeerListeners(cfg); err != nil {
		return e, err
	}

	if e.cfg.logger != nil {
		e.cfg.logger.Info(
			"configuring client listeners",
			zap.Strings("listen-client-urls", e.cfg.getLCURLs()),
		)
	}
	if e.sctxs, err = configureClientListeners(cfg); err != nil {
		return e, err
	}

	for _, sctx := range e.sctxs {
		e.Clients = append(e.Clients, sctx.l)
	}

	var (
		urlsmap types.URLsMap
		token   string
	)
	// A member with existing on-disk state is already initialized; only a
	// brand-new member needs the initial-cluster URL map and token.
	memberInitialized := true
	if !isMemberInitialized(cfg) {
		memberInitialized = false
		urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")
		if err != nil {
			return e, fmt.Errorf("error setting up initial cluster: %v", err)
		}
	}

	// AutoCompactionRetention defaults to "0" if not set.
	if len(cfg.AutoCompactionRetention) == 0 {
		cfg.AutoCompactionRetention = "0"
	}
	autoCompactionRetention, err := parseCompactionRetention(cfg.AutoCompactionMode, cfg.AutoCompactionRetention)
	if err != nil {
		return e, err
	}

	backendFreelistType := parseBackendFreelistType(cfg.ExperimentalBackendFreelistType)

	// Translate the embed Config into the etcdserver-level configuration.
	srvcfg := etcdserver.ServerConfig{
		Name:                       cfg.Name,
		ClientURLs:                 cfg.ACUrls,
		PeerURLs:                   cfg.APUrls,
		DataDir:                    cfg.Dir,
		DedicatedWALDir:            cfg.WalDir,
		SnapshotCount:              cfg.SnapshotCount,
		SnapshotCatchUpEntries:     cfg.SnapshotCatchUpEntries,
		MaxSnapFiles:               cfg.MaxSnapFiles,
		MaxWALFiles:                cfg.MaxWalFiles,
		InitialPeerURLsMap:         urlsmap,
		InitialClusterToken:        token,
		DiscoveryURL:               cfg.Durl,
		DiscoveryProxy:             cfg.Dproxy,
		NewCluster:                 cfg.IsNewCluster(),
		PeerTLSInfo:                cfg.PeerTLSInfo,
		TickMs:                     cfg.TickMs,
		ElectionTicks:              cfg.ElectionTicks(),
		InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
		AutoCompactionRetention:    autoCompactionRetention,
		AutoCompactionMode:         cfg.AutoCompactionMode,
		QuotaBackendBytes:          cfg.QuotaBackendBytes,
		BackendBatchLimit:          cfg.BackendBatchLimit,
		BackendFreelistType:        backendFreelistType,
		BackendBatchInterval:       cfg.BackendBatchInterval,
		MaxTxnOps:                  cfg.MaxTxnOps,
		MaxRequestBytes:            cfg.MaxRequestBytes,
		StrictReconfigCheck:        cfg.StrictReconfigCheck,
		ClientCertAuthEnabled:      cfg.ClientTLSInfo.ClientCertAuth,
		AuthToken:                  cfg.AuthToken,
		BcryptCost:                 cfg.BcryptCost,
		CORS:                       cfg.CORS,
		HostWhitelist:              cfg.HostWhitelist,
		InitialCorruptCheck:        cfg.ExperimentalInitialCorruptCheck,
		CorruptCheckTime:           cfg.ExperimentalCorruptCheckTime,
		PreVote:                    cfg.PreVote,
		Logger:                     cfg.logger,
		LoggerConfig:               cfg.loggerConfig,
		LoggerCore:                 cfg.loggerCore,
		LoggerWriteSyncer:          cfg.loggerWriteSyncer,
		Debug:                      cfg.Debug,
		ForceNewCluster:            cfg.ForceNewCluster,
		EnableGRPCGateway:          cfg.EnableGRPCGateway,
		EnableLeaseCheckpoint:      cfg.ExperimentalEnableLeaseCheckpoint,
		CompactionBatchLimit:       cfg.ExperimentalCompactionBatchLimit,
	}
	print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
	if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
		return e, err
	}

	// buffer channel so goroutines on closed connections won't wait forever
	e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))

	// newly started member ("memberInitialized==false")
	// does not need corruption check
	if memberInitialized {
		if err = e.Server.CheckInitialHashKV(); err != nil {
			// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
			// (nothing to close since rafthttp transports have not been started)
			e.Server = nil
			return e, err
		}
	}
	e.Server.Start()

	if err = e.servePeers(); err != nil {
		return e, err
	}
	if err = e.serveClients(); err != nil {
		return e, err
	}
	if err = e.serveMetrics(); err != nil {
		return e, err
	}

	if e.cfg.logger != nil {
		e.cfg.logger.Info(
			"now serving peer/client/metrics",
			zap.String("local-member-id", e.Server.ID().String()),
			zap.Strings("initial-advertise-peer-urls", e.cfg.getAPURLs()),
			zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
			zap.Strings("advertise-client-urls", e.cfg.getACURLs()),
			zap.Strings("listen-client-urls", e.cfg.getLCURLs()),
			zap.Strings("listen-metrics-urls", e.cfg.getMetricsURLs()),
		)
	}
	// serving==true tells the deferred cleanup above that the gRPC servers own
	// serveCtx.serversC from here on.
	serving = true
	return e, nil
}
254
// print logs the effective server configuration at startup. With no zap
// logger it falls back to per-line capnslog output; with a zap logger it
// emits a single structured "starting an etcd server" record.
func print(lg *zap.Logger, ec Config, sc etcdserver.ServerConfig, memberInitialized bool) {
	// TODO: remove this after dropping "capnslog"
	if lg == nil {
		plog.Infof("name = %s", ec.Name)
		if sc.ForceNewCluster {
			plog.Infof("force new cluster")
		}
		plog.Infof("data dir = %s", sc.DataDir)
		plog.Infof("member dir = %s", sc.MemberDir())
		if sc.DedicatedWALDir != "" {
			plog.Infof("dedicated WAL dir = %s", sc.DedicatedWALDir)
		}
		plog.Infof("heartbeat = %dms", sc.TickMs)
		plog.Infof("election = %dms", sc.ElectionTicks*int(sc.TickMs))
		plog.Infof("snapshot count = %d", sc.SnapshotCount)
		if len(sc.DiscoveryURL) != 0 {
			plog.Infof("discovery URL= %s", sc.DiscoveryURL)
			if len(sc.DiscoveryProxy) != 0 {
				plog.Infof("discovery proxy = %s", sc.DiscoveryProxy)
			}
		}
		plog.Infof("advertise client URLs = %s", sc.ClientURLs)
		if memberInitialized {
			plog.Infof("initial advertise peer URLs = %s", sc.PeerURLs)
			plog.Infof("initial cluster = %s", sc.InitialPeerURLsMap)
		}
	} else {
		// Sort CORS origins and host-whitelist entries so the log output is
		// deterministic (map iteration order is random).
		cors := make([]string, 0, len(ec.CORS))
		for v := range ec.CORS {
			cors = append(cors, v)
		}
		sort.Strings(cors)

		hss := make([]string, 0, len(ec.HostWhitelist))
		for v := range ec.HostWhitelist {
			hss = append(hss, v)
		}
		sort.Strings(hss)

		// Report the effective quota, substituting the default when unset.
		quota := ec.QuotaBackendBytes
		if quota == 0 {
			quota = etcdserver.DefaultQuotaBytes
		}

		lg.Info(
			"starting an etcd server",
			zap.String("etcd-version", version.Version),
			zap.String("git-sha", version.GitSHA),
			zap.String("go-version", runtime.Version()),
			zap.String("go-os", runtime.GOOS),
			zap.String("go-arch", runtime.GOARCH),
			zap.Int("max-cpu-set", runtime.GOMAXPROCS(0)),
			zap.Int("max-cpu-available", runtime.NumCPU()),
			zap.Bool("member-initialized", memberInitialized),
			zap.String("name", sc.Name),
			zap.String("data-dir", sc.DataDir),
			zap.String("wal-dir", ec.WalDir),
			zap.String("wal-dir-dedicated", sc.DedicatedWALDir),
			zap.String("member-dir", sc.MemberDir()),
			zap.Bool("force-new-cluster", sc.ForceNewCluster),
			zap.String("heartbeat-interval", fmt.Sprintf("%v", time.Duration(sc.TickMs)*time.Millisecond)),
			zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(sc.ElectionTicks*int(sc.TickMs))*time.Millisecond)),
			zap.Bool("initial-election-tick-advance", sc.InitialElectionTickAdvance),
			zap.Uint64("snapshot-count", sc.SnapshotCount),
			zap.Uint64("snapshot-catchup-entries", sc.SnapshotCatchUpEntries),
			zap.Strings("initial-advertise-peer-urls", ec.getAPURLs()),
			zap.Strings("listen-peer-urls", ec.getLPURLs()),
			zap.Strings("advertise-client-urls", ec.getACURLs()),
			zap.Strings("listen-client-urls", ec.getLCURLs()),
			zap.Strings("listen-metrics-urls", ec.getMetricsURLs()),
			zap.Strings("cors", cors),
			zap.Strings("host-whitelist", hss),
			zap.String("initial-cluster", sc.InitialPeerURLsMap.String()),
			zap.String("initial-cluster-state", ec.ClusterState),
			zap.String("initial-cluster-token", sc.InitialClusterToken),
			zap.Int64("quota-size-bytes", quota),
			zap.Bool("pre-vote", sc.PreVote),
			zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck),
			zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()),
			zap.String("auto-compaction-mode", sc.AutoCompactionMode),
			zap.Duration("auto-compaction-retention", sc.AutoCompactionRetention),
			zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()),
			zap.String("discovery-url", sc.DiscoveryURL),
			zap.String("discovery-proxy", sc.DiscoveryProxy),
		)
	}
}
342
343// Config returns the current configuration.
344func (e *Etcd) Config() Config {
345 return e.cfg
346}
347
// Close gracefully shuts down all servers/listeners.
// Client requests will be terminated with request timeout.
// After timeout, enforce remaining requests be closed immediately.
func (e *Etcd) Close() {
	fields := []zap.Field{
		zap.String("name", e.cfg.Name),
		zap.String("data-dir", e.cfg.Dir),
		zap.Strings("advertise-peer-urls", e.cfg.getAPURLs()),
		zap.Strings("advertise-client-urls", e.cfg.getACURLs()),
	}
	lg := e.GetLogger()
	if lg != nil {
		lg.Info("closing etcd server", fields...)
	}
	defer func() {
		if lg != nil {
			lg.Info("closed etcd server", fields...)
			lg.Sync()
		}
	}()

	// Signal error-reporting paths to stop; closeOnce makes Close safe to
	// call more than once.
	e.closeOnce.Do(func() { close(e.stopc) })

	// close client requests with request timeout
	timeout := 2 * time.Second
	if e.Server != nil {
		timeout = e.Server.Cfg.ReqTimeout()
	}
	// Drain each serveCtx's serversC, stopping every started gRPC/HTTP server
	// pair with a bounded graceful shutdown.
	for _, sctx := range e.sctxs {
		for ss := range sctx.serversC {
			ctx, cancel := context.WithTimeout(context.Background(), timeout)
			stopServers(ctx, ss)
			cancel()
		}
	}

	for _, sctx := range e.sctxs {
		sctx.cancel()
	}

	for i := range e.Clients {
		if e.Clients[i] != nil {
			e.Clients[i].Close()
		}
	}

	for i := range e.metricsListeners {
		e.metricsListeners[i].Close()
	}

	// close rafthttp transports
	if e.Server != nil {
		e.Server.Stop()
	}

	// close all idle connections in peer handler (wait up to 1-second)
	for i := range e.Peers {
		if e.Peers[i] != nil && e.Peers[i].close != nil {
			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			e.Peers[i].close(ctx)
			cancel()
		}
	}
}
412
// stopServers shuts down the gRPC server and http.Server that share one
// listener. Secure (TLS) servers are stopped immediately rather than
// gracefully, because grpc.Server.GracefulStop can hang with TLS-enabled etcd
// (see linked issues below); insecure servers are given a graceful stop
// bounded by ctx.
func stopServers(ctx context.Context, ss *servers) {
	shutdownNow := func() {
		// first, close the http.Server
		ss.http.Shutdown(ctx)
		// then close grpc.Server; cancels all active RPCs
		ss.grpc.Stop()
	}

	// do not grpc.Server.GracefulStop with TLS enabled etcd server
	// See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
	// and https://github.com/etcd-io/etcd/issues/8916
	if ss.secure {
		shutdownNow()
		return
	}

	ch := make(chan struct{})
	go func() {
		defer close(ch)
		// close listeners to stop accepting new connections,
		// will block on any existing transports
		ss.grpc.GracefulStop()
	}()

	// wait until all pending RPCs are finished
	select {
	case <-ch:
	case <-ctx.Done():
		// took too long, manually close open transports
		// e.g. watch streams
		shutdownNow()

		// concurrent GracefulStop should be interrupted
		<-ch
	}
}
449
450func (e *Etcd) Err() <-chan error { return e.errc }
451
452func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
453 if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil {
454 return nil, err
455 }
456 if err = cfg.PeerSelfCert(); err != nil {
457 if cfg.logger != nil {
458 cfg.logger.Fatal("failed to get peer self-signed certs", zap.Error(err))
459 } else {
460 plog.Fatalf("could not get certs (%v)", err)
461 }
462 }
463 if !cfg.PeerTLSInfo.Empty() {
464 if cfg.logger != nil {
465 cfg.logger.Info(
466 "starting with peer TLS",
467 zap.String("tls-info", fmt.Sprintf("%+v", cfg.PeerTLSInfo)),
468 zap.Strings("cipher-suites", cfg.CipherSuites),
469 )
470 } else {
471 plog.Infof("peerTLS: %s", cfg.PeerTLSInfo)
472 }
473 }
474
475 peers = make([]*peerListener, len(cfg.LPUrls))
476 defer func() {
477 if err == nil {
478 return
479 }
480 for i := range peers {
481 if peers[i] != nil && peers[i].close != nil {
482 if cfg.logger != nil {
483 cfg.logger.Warn(
484 "closing peer listener",
485 zap.String("address", cfg.LPUrls[i].String()),
486 zap.Error(err),
487 )
488 } else {
489 plog.Info("stopping listening for peers on ", cfg.LPUrls[i].String())
490 }
491 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
492 peers[i].close(ctx)
493 cancel()
494 }
495 }
496 }()
497
498 for i, u := range cfg.LPUrls {
499 if u.Scheme == "http" {
500 if !cfg.PeerTLSInfo.Empty() {
501 if cfg.logger != nil {
502 cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String()))
503 } else {
504 plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
505 }
506 }
507 if cfg.PeerTLSInfo.ClientCertAuth {
508 if cfg.logger != nil {
509 cfg.logger.Warn("scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("peer-url", u.String()))
510 } else {
511 plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
512 }
513 }
514 }
515 peers[i] = &peerListener{close: func(context.Context) error { return nil }}
516 peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo)
517 if err != nil {
518 return nil, err
519 }
520 // once serve, overwrite with 'http.Server.Shutdown'
521 peers[i].close = func(context.Context) error {
522 return peers[i].Listener.Close()
523 }
524 }
525 return peers, nil
526}
527
// configure peer handlers after rafthttp.Transport started
//
// servePeers multiplexes each peer listener (via cmux) between a gRPC server
// (HTTP/2 traffic) and an http.Server (everything else), overwrites each
// peerListener.close with a graceful stop of both, and then starts the serve
// loops in background goroutines.
func (e *Etcd) servePeers() (err error) {
	ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)
	var peerTLScfg *tls.Config
	if !e.cfg.PeerTLSInfo.Empty() {
		if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil {
			return err
		}
	}

	for _, p := range e.Peers {
		u := p.Listener.Addr().String()
		gs := v3rpc.Server(e.Server, peerTLScfg)
		m := cmux.New(p.Listener)
		// HTTP/2 connections go to the gRPC server; the rest to http.Server.
		go gs.Serve(m.Match(cmux.HTTP2()))
		srv := &http.Server{
			Handler:     grpcHandlerFunc(gs, ph),
			ReadTimeout: 5 * time.Minute,
			ErrorLog:    defaultLog.New(ioutil.Discard, "", 0), // do not log user error
		}
		go srv.Serve(m.Match(cmux.Any()))
		p.serve = func() error { return m.Serve() }
		p.close = func(ctx context.Context) error {
			// gracefully shutdown http.Server
			// close open listeners, idle connections
			// until context cancel or time-out
			if e.cfg.logger != nil {
				e.cfg.logger.Info(
					"stopping serving peer traffic",
					zap.String("address", u),
				)
			}
			stopServers(ctx, &servers{secure: peerTLScfg != nil, grpc: gs, http: srv})
			if e.cfg.logger != nil {
				e.cfg.logger.Info(
					"stopped serving peer traffic",
					zap.String("address", u),
				)
			}
			return nil
		}
	}

	// start peer servers in a goroutine
	for _, pl := range e.Peers {
		go func(l *peerListener) {
			u := l.Addr().String()
			if e.cfg.logger != nil {
				e.cfg.logger.Info(
					"serving peer traffic",
					zap.String("address", u),
				)
			} else {
				plog.Info("listening for peers on ", u)
			}
			// serve errors are routed to e.errc via errHandler
			e.errHandler(l.serve())
		}(pl)
	}
	return nil
}
588
589func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
590 if err = updateCipherSuites(&cfg.ClientTLSInfo, cfg.CipherSuites); err != nil {
591 return nil, err
592 }
593 if err = cfg.ClientSelfCert(); err != nil {
594 if cfg.logger != nil {
595 cfg.logger.Fatal("failed to get client self-signed certs", zap.Error(err))
596 } else {
597 plog.Fatalf("could not get certs (%v)", err)
598 }
599 }
600 if cfg.EnablePprof {
601 if cfg.logger != nil {
602 cfg.logger.Info("pprof is enabled", zap.String("path", debugutil.HTTPPrefixPProf))
603 } else {
604 plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
605 }
606 }
607
608 sctxs = make(map[string]*serveCtx)
609 for _, u := range cfg.LCUrls {
610 sctx := newServeCtx(cfg.logger)
611 if u.Scheme == "http" || u.Scheme == "unix" {
612 if !cfg.ClientTLSInfo.Empty() {
613 if cfg.logger != nil {
614 cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("client-url", u.String()))
615 } else {
616 plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String())
617 }
618 }
619 if cfg.ClientTLSInfo.ClientCertAuth {
620 if cfg.logger != nil {
621 cfg.logger.Warn("scheme is HTTP while --client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("client-url", u.String()))
622 } else {
623 plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
624 }
625 }
626 }
627 if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() {
628 return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPS scheme", u.String())
629 }
630
631 network := "tcp"
632 addr := u.Host
633 if u.Scheme == "unix" || u.Scheme == "unixs" {
634 network = "unix"
635 addr = u.Host + u.Path
636 }
637 sctx.network = network
638
639 sctx.secure = u.Scheme == "https" || u.Scheme == "unixs"
640 sctx.insecure = !sctx.secure
641 if oldctx := sctxs[addr]; oldctx != nil {
642 oldctx.secure = oldctx.secure || sctx.secure
643 oldctx.insecure = oldctx.insecure || sctx.insecure
644 continue
645 }
646
647 if sctx.l, err = net.Listen(network, addr); err != nil {
648 return nil, err
649 }
650 // net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking
651 // hosts that disable ipv6. So, use the address given by the user.
652 sctx.addr = addr
653
654 if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil {
655 if fdLimit <= reservedInternalFDNum {
656 if cfg.logger != nil {
657 cfg.logger.Fatal(
658 "file descriptor limit of etcd process is too low; please set higher",
659 zap.Uint64("limit", fdLimit),
660 zap.Int("recommended-limit", reservedInternalFDNum),
661 )
662 } else {
663 plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
664 }
665 }
666 sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum))
667 }
668
669 if network == "tcp" {
670 if sctx.l, err = transport.NewKeepAliveListener(sctx.l, network, nil); err != nil {
671 return nil, err
672 }
673 }
674
675 defer func() {
676 if err == nil {
677 return
678 }
679 sctx.l.Close()
680 if cfg.logger != nil {
681 cfg.logger.Warn(
682 "closing peer listener",
683 zap.String("address", u.Host),
684 zap.Error(err),
685 )
686 } else {
687 plog.Info("stopping listening for client requests on ", u.Host)
688 }
689 }()
690 for k := range cfg.UserHandlers {
691 sctx.userHandlers[k] = cfg.UserHandlers[k]
692 }
693 sctx.serviceRegister = cfg.ServiceRegister
694 if cfg.EnablePprof || cfg.Debug {
695 sctx.registerPprof()
696 }
697 if cfg.Debug {
698 sctx.registerTrace()
699 }
700 sctxs[addr] = sctx
701 }
702 return sctxs, nil
703}
704
// serveClients builds the shared HTTP handler (v2 API or basic endpoints),
// assembles gRPC keepalive options, and starts one serving goroutine per
// client serveCtx. Serve errors are routed to e.errc via errHandler.
func (e *Etcd) serveClients() (err error) {
	if !e.cfg.ClientTLSInfo.Empty() {
		if e.cfg.logger != nil {
			e.cfg.logger.Info(
				"starting with client TLS",
				zap.String("tls-info", fmt.Sprintf("%+v", e.cfg.ClientTLSInfo)),
				zap.Strings("cipher-suites", e.cfg.CipherSuites),
			)
		} else {
			plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo)
		}
	}

	// Start a client server goroutine for each listen address
	var h http.Handler
	if e.Config().EnableV2 {
		if len(e.Config().ExperimentalEnableV2V3) > 0 {
			// v2 API emulated on top of the v3 backend
			srv := v2v3.NewServer(e.cfg.logger, v3client.New(e.Server), e.cfg.ExperimentalEnableV2V3)
			h = v2http.NewClientHandler(e.GetLogger(), srv, e.Server.Cfg.ReqTimeout())
		} else {
			h = v2http.NewClientHandler(e.GetLogger(), e.Server, e.Server.Cfg.ReqTimeout())
		}
	} else {
		mux := http.NewServeMux()
		etcdhttp.HandleBasic(mux, e.Server)
		h = mux
	}

	// gRPC keepalive options are applied only when explicitly configured.
	gopts := []grpc.ServerOption{}
	if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
		gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
			MinTime:             e.cfg.GRPCKeepAliveMinTime,
			PermitWithoutStream: false,
		}))
	}
	if e.cfg.GRPCKeepAliveInterval > time.Duration(0) &&
		e.cfg.GRPCKeepAliveTimeout > time.Duration(0) {
		gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{
			Time:    e.cfg.GRPCKeepAliveInterval,
			Timeout: e.cfg.GRPCKeepAliveTimeout,
		}))
	}

	// start client servers in each goroutine
	for _, sctx := range e.sctxs {
		go func(s *serveCtx) {
			e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...))
		}(sctx)
	}
	return nil
}
756
// serveMetrics starts a dedicated HTTP server for each configured
// listen-metrics URL, serving the /metrics and /health endpoints. The
// listeners are tracked in e.metricsListeners so Close can shut them down.
func (e *Etcd) serveMetrics() (err error) {
	if e.cfg.Metrics == "extensive" {
		grpc_prometheus.EnableHandlingTimeHistogram()
	}

	if len(e.cfg.ListenMetricsUrls) > 0 {
		metricsMux := http.NewServeMux()
		etcdhttp.HandleMetricsHealth(metricsMux, e.Server)

		for _, murl := range e.cfg.ListenMetricsUrls {
			// only https metrics URLs use the client TLS configuration
			tlsInfo := &e.cfg.ClientTLSInfo
			if murl.Scheme == "http" {
				tlsInfo = nil
			}
			ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsInfo)
			if err != nil {
				return err
			}
			e.metricsListeners = append(e.metricsListeners, ml)
			// murl and ml are passed as arguments so each goroutine binds its
			// own iteration's values
			go func(u url.URL, ln net.Listener) {
				if e.cfg.logger != nil {
					e.cfg.logger.Info(
						"serving metrics",
						zap.String("address", u.String()),
					)
				} else {
					plog.Info("listening for metrics on ", u.String())
				}
				e.errHandler(http.Serve(ln, metricsMux))
			}(murl, ml)
		}
	}
	return nil
}
791
// errHandler forwards a serving goroutine's fatal error to e.errc, dropping
// it if the server is already stopping.
func (e *Etcd) errHandler(err error) {
	// fast path: if stopc is already closed, discard without blocking
	select {
	case <-e.stopc:
		return
	default:
	}
	// otherwise block until the error is delivered (errc is buffered in
	// StartEtcd) or shutdown begins
	select {
	case <-e.stopc:
	case e.errc <- err:
	}
}
803
804// GetLogger returns the logger.
805func (e *Etcd) GetLogger() *zap.Logger {
806 e.cfg.loggerMu.RLock()
807 l := e.cfg.logger
808 e.cfg.loggerMu.RUnlock()
809 return l
810}
811
812func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) {
813 h, err := strconv.Atoi(retention)
814 if err == nil {
815 switch mode {
816 case CompactorModeRevision:
817 ret = time.Duration(int64(h))
818 case CompactorModePeriodic:
819 ret = time.Duration(int64(h)) * time.Hour
820 }
821 } else {
822 // periodic compaction
823 ret, err = time.ParseDuration(retention)
824 if err != nil {
825 return 0, fmt.Errorf("error parsing CompactionRetention: %v", err)
826 }
827 }
828 return ret, nil
829}