blob: d656e179c59a23abf0cffe59ca48b1ecc66de677 [file] [log] [blame]
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package embed
16
17import (
18 "context"
19 "crypto/tls"
20 "fmt"
21 "io/ioutil"
22 defaultLog "log"
23 "net"
24 "net/http"
25 "net/url"
khenaidooab1f7bd2019-11-14 14:00:27 -050026 "strconv"
27 "sync"
28 "time"
29
khenaidood948f772021-08-11 17:49:24 -040030 "github.com/coreos/etcd/compactor"
31 "github.com/coreos/etcd/etcdserver"
32 "github.com/coreos/etcd/etcdserver/api/etcdhttp"
33 "github.com/coreos/etcd/etcdserver/api/v2http"
34 "github.com/coreos/etcd/etcdserver/api/v2v3"
35 "github.com/coreos/etcd/etcdserver/api/v3client"
36 "github.com/coreos/etcd/etcdserver/api/v3rpc"
37 "github.com/coreos/etcd/pkg/cors"
38 "github.com/coreos/etcd/pkg/debugutil"
39 runtimeutil "github.com/coreos/etcd/pkg/runtime"
40 "github.com/coreos/etcd/pkg/transport"
41 "github.com/coreos/etcd/pkg/types"
42 "github.com/coreos/etcd/rafthttp"
khenaidooab1f7bd2019-11-14 14:00:27 -050043
44 "github.com/coreos/pkg/capnslog"
45 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
46 "github.com/soheilhy/cmux"
khenaidooab1f7bd2019-11-14 14:00:27 -050047 "google.golang.org/grpc"
48 "google.golang.org/grpc/keepalive"
49)
50
khenaidood948f772021-08-11 17:49:24 -040051var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed")
khenaidooab1f7bd2019-11-14 14:00:27 -050052
// reservedInternalFDNum is the number of file descriptors kept back for
// etcd's internal use (disk plus transport) when limiting client listeners.
//
// Disk: the snap pkg needs 1 to read/write a snapshot. The wal pkg normally
// needs at most 2 to read/lock/write WALs; it needs 2 when reading all logs
// after a snapshot index that sits at the end of the second-to-last WAL and
// the head of the last one. Purging needs 1 to read the directory, and the
// fd monitor needs 1.
//
// Transport: rafthttp builds two long-polling connections and at most four
// temporary connections with each member. With at most 9 members in a
// cluster, it should reserve 96.
//
// For safety, the total reserved number is set to 150.
const reservedInternalFDNum = 150
66
67// Etcd contains a running etcd server and its listeners.
68type Etcd struct {
69 Peers []*peerListener
70 Clients []net.Listener
71 // a map of contexts for the servers that serves client requests.
72 sctxs map[string]*serveCtx
73 metricsListeners []net.Listener
74
75 Server *etcdserver.EtcdServer
76
77 cfg Config
78 stopc chan struct{}
79 errc chan error
80
81 closeOnce sync.Once
82}
83
// peerListener pairs a peer net.Listener with the functions used to serve
// and to shut down whatever is running on top of it.
type peerListener struct {
	net.Listener
	// serve runs the listener's serving loop and returns its error.
	serve func() error
	// close shuts the listener down, bounded by the given context.
	close func(context.Context) error
}
89
90// StartEtcd launches the etcd server and HTTP handlers for client/server communication.
91// The returned Etcd.Server is not guaranteed to have joined the cluster. Wait
92// on the Etcd.Server.ReadyNotify() channel to know when it completes and is ready for use.
93func StartEtcd(inCfg *Config) (e *Etcd, err error) {
94 if err = inCfg.Validate(); err != nil {
95 return nil, err
96 }
97 serving := false
98 e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
99 cfg := &e.cfg
100 defer func() {
101 if e == nil || err == nil {
102 return
103 }
104 if !serving {
105 // errored before starting gRPC server for serveCtx.serversC
106 for _, sctx := range e.sctxs {
107 close(sctx.serversC)
108 }
109 }
110 e.Close()
111 e = nil
112 }()
113
khenaidood948f772021-08-11 17:49:24 -0400114 if e.Peers, err = startPeerListeners(cfg); err != nil {
khenaidooab1f7bd2019-11-14 14:00:27 -0500115 return e, err
116 }
khenaidood948f772021-08-11 17:49:24 -0400117 if e.sctxs, err = startClientListeners(cfg); err != nil {
khenaidooab1f7bd2019-11-14 14:00:27 -0500118 return e, err
119 }
khenaidooab1f7bd2019-11-14 14:00:27 -0500120 for _, sctx := range e.sctxs {
121 e.Clients = append(e.Clients, sctx.l)
122 }
123
124 var (
125 urlsmap types.URLsMap
126 token string
127 )
khenaidood948f772021-08-11 17:49:24 -0400128
khenaidooab1f7bd2019-11-14 14:00:27 -0500129 memberInitialized := true
130 if !isMemberInitialized(cfg) {
131 memberInitialized = false
132 urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")
133 if err != nil {
134 return e, fmt.Errorf("error setting up initial cluster: %v", err)
135 }
136 }
137
138 // AutoCompactionRetention defaults to "0" if not set.
139 if len(cfg.AutoCompactionRetention) == 0 {
140 cfg.AutoCompactionRetention = "0"
141 }
142 autoCompactionRetention, err := parseCompactionRetention(cfg.AutoCompactionMode, cfg.AutoCompactionRetention)
143 if err != nil {
144 return e, err
145 }
146
khenaidooab1f7bd2019-11-14 14:00:27 -0500147 srvcfg := etcdserver.ServerConfig{
148 Name: cfg.Name,
149 ClientURLs: cfg.ACUrls,
150 PeerURLs: cfg.APUrls,
151 DataDir: cfg.Dir,
152 DedicatedWALDir: cfg.WalDir,
khenaidood948f772021-08-11 17:49:24 -0400153 SnapCount: cfg.SnapCount,
khenaidooab1f7bd2019-11-14 14:00:27 -0500154 MaxSnapFiles: cfg.MaxSnapFiles,
155 MaxWALFiles: cfg.MaxWalFiles,
156 InitialPeerURLsMap: urlsmap,
157 InitialClusterToken: token,
158 DiscoveryURL: cfg.Durl,
159 DiscoveryProxy: cfg.Dproxy,
160 NewCluster: cfg.IsNewCluster(),
khenaidood948f772021-08-11 17:49:24 -0400161 ForceNewCluster: cfg.ForceNewCluster,
khenaidooab1f7bd2019-11-14 14:00:27 -0500162 PeerTLSInfo: cfg.PeerTLSInfo,
163 TickMs: cfg.TickMs,
164 ElectionTicks: cfg.ElectionTicks(),
165 InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
166 AutoCompactionRetention: autoCompactionRetention,
167 AutoCompactionMode: cfg.AutoCompactionMode,
168 QuotaBackendBytes: cfg.QuotaBackendBytes,
khenaidooab1f7bd2019-11-14 14:00:27 -0500169 MaxTxnOps: cfg.MaxTxnOps,
170 MaxRequestBytes: cfg.MaxRequestBytes,
171 StrictReconfigCheck: cfg.StrictReconfigCheck,
172 ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
173 AuthToken: cfg.AuthToken,
khenaidood948f772021-08-11 17:49:24 -0400174 TokenTTL: cfg.AuthTokenTTL,
khenaidooab1f7bd2019-11-14 14:00:27 -0500175 InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
176 CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
khenaidooab1f7bd2019-11-14 14:00:27 -0500177 Debug: cfg.Debug,
khenaidooab1f7bd2019-11-14 14:00:27 -0500178 }
khenaidood948f772021-08-11 17:49:24 -0400179
khenaidooab1f7bd2019-11-14 14:00:27 -0500180 if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
181 return e, err
182 }
183
184 // buffer channel so goroutines on closed connections won't wait forever
185 e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
186
187 // newly started member ("memberInitialized==false")
188 // does not need corruption check
189 if memberInitialized {
190 if err = e.Server.CheckInitialHashKV(); err != nil {
191 // set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
192 // (nothing to close since rafthttp transports have not been started)
193 e.Server = nil
194 return e, err
195 }
196 }
197 e.Server.Start()
198
199 if err = e.servePeers(); err != nil {
200 return e, err
201 }
202 if err = e.serveClients(); err != nil {
203 return e, err
204 }
205 if err = e.serveMetrics(); err != nil {
206 return e, err
207 }
208
khenaidooab1f7bd2019-11-14 14:00:27 -0500209 serving = true
210 return e, nil
211}
212
khenaidooab1f7bd2019-11-14 14:00:27 -0500213// Config returns the current configuration.
214func (e *Etcd) Config() Config {
215 return e.cfg
216}
217
218// Close gracefully shuts down all servers/listeners.
219// Client requests will be terminated with request timeout.
220// After timeout, enforce remaning requests be closed immediately.
221func (e *Etcd) Close() {
khenaidooab1f7bd2019-11-14 14:00:27 -0500222 e.closeOnce.Do(func() { close(e.stopc) })
223
224 // close client requests with request timeout
225 timeout := 2 * time.Second
226 if e.Server != nil {
227 timeout = e.Server.Cfg.ReqTimeout()
228 }
229 for _, sctx := range e.sctxs {
230 for ss := range sctx.serversC {
231 ctx, cancel := context.WithTimeout(context.Background(), timeout)
232 stopServers(ctx, ss)
233 cancel()
234 }
235 }
236
237 for _, sctx := range e.sctxs {
238 sctx.cancel()
239 }
240
241 for i := range e.Clients {
242 if e.Clients[i] != nil {
243 e.Clients[i].Close()
244 }
245 }
246
247 for i := range e.metricsListeners {
248 e.metricsListeners[i].Close()
249 }
250
251 // close rafthttp transports
252 if e.Server != nil {
253 e.Server.Stop()
254 }
255
256 // close all idle connections in peer handler (wait up to 1-second)
257 for i := range e.Peers {
258 if e.Peers[i] != nil && e.Peers[i].close != nil {
259 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
260 e.Peers[i].close(ctx)
261 cancel()
262 }
263 }
264}
265
266func stopServers(ctx context.Context, ss *servers) {
267 shutdownNow := func() {
268 // first, close the http.Server
269 ss.http.Shutdown(ctx)
270 // then close grpc.Server; cancels all active RPCs
271 ss.grpc.Stop()
272 }
273
274 // do not grpc.Server.GracefulStop with TLS enabled etcd server
275 // See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
khenaidood948f772021-08-11 17:49:24 -0400276 // and https://github.com/coreos/etcd/issues/8916
khenaidooab1f7bd2019-11-14 14:00:27 -0500277 if ss.secure {
278 shutdownNow()
279 return
280 }
281
282 ch := make(chan struct{})
283 go func() {
284 defer close(ch)
285 // close listeners to stop accepting new connections,
286 // will block on any existing transports
287 ss.grpc.GracefulStop()
288 }()
289
290 // wait until all pending RPCs are finished
291 select {
292 case <-ch:
293 case <-ctx.Done():
294 // took too long, manually close open transports
295 // e.g. watch streams
296 shutdownNow()
297
298 // concurrent GracefulStop should be interrupted
299 <-ch
300 }
301}
302
303func (e *Etcd) Err() <-chan error { return e.errc }
304
khenaidood948f772021-08-11 17:49:24 -0400305func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
khenaidooab1f7bd2019-11-14 14:00:27 -0500306 if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil {
307 return nil, err
308 }
309 if err = cfg.PeerSelfCert(); err != nil {
khenaidood948f772021-08-11 17:49:24 -0400310 plog.Fatalf("could not get certs (%v)", err)
khenaidooab1f7bd2019-11-14 14:00:27 -0500311 }
312 if !cfg.PeerTLSInfo.Empty() {
khenaidood948f772021-08-11 17:49:24 -0400313 plog.Infof("peerTLS: %s", cfg.PeerTLSInfo)
khenaidooab1f7bd2019-11-14 14:00:27 -0500314 }
315
316 peers = make([]*peerListener, len(cfg.LPUrls))
317 defer func() {
318 if err == nil {
319 return
320 }
321 for i := range peers {
322 if peers[i] != nil && peers[i].close != nil {
khenaidood948f772021-08-11 17:49:24 -0400323 plog.Info("stopping listening for peers on ", cfg.LPUrls[i].String())
khenaidooab1f7bd2019-11-14 14:00:27 -0500324 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
325 peers[i].close(ctx)
326 cancel()
327 }
328 }
329 }()
330
331 for i, u := range cfg.LPUrls {
332 if u.Scheme == "http" {
333 if !cfg.PeerTLSInfo.Empty() {
khenaidood948f772021-08-11 17:49:24 -0400334 plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
khenaidooab1f7bd2019-11-14 14:00:27 -0500335 }
336 if cfg.PeerTLSInfo.ClientCertAuth {
khenaidood948f772021-08-11 17:49:24 -0400337 plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
khenaidooab1f7bd2019-11-14 14:00:27 -0500338 }
339 }
340 peers[i] = &peerListener{close: func(context.Context) error { return nil }}
341 peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo)
342 if err != nil {
343 return nil, err
344 }
345 // once serve, overwrite with 'http.Server.Shutdown'
346 peers[i].close = func(context.Context) error {
347 return peers[i].Listener.Close()
348 }
khenaidood948f772021-08-11 17:49:24 -0400349 plog.Info("listening for peers on ", u.String())
khenaidooab1f7bd2019-11-14 14:00:27 -0500350 }
351 return peers, nil
352}
353
354// configure peer handlers after rafthttp.Transport started
355func (e *Etcd) servePeers() (err error) {
khenaidood948f772021-08-11 17:49:24 -0400356 ph := etcdhttp.NewPeerHandler(e.Server)
khenaidooab1f7bd2019-11-14 14:00:27 -0500357 var peerTLScfg *tls.Config
358 if !e.cfg.PeerTLSInfo.Empty() {
359 if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil {
360 return err
361 }
362 }
363
364 for _, p := range e.Peers {
khenaidooab1f7bd2019-11-14 14:00:27 -0500365 gs := v3rpc.Server(e.Server, peerTLScfg)
366 m := cmux.New(p.Listener)
367 go gs.Serve(m.Match(cmux.HTTP2()))
368 srv := &http.Server{
369 Handler: grpcHandlerFunc(gs, ph),
370 ReadTimeout: 5 * time.Minute,
371 ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
372 }
373 go srv.Serve(m.Match(cmux.Any()))
374 p.serve = func() error { return m.Serve() }
375 p.close = func(ctx context.Context) error {
376 // gracefully shutdown http.Server
377 // close open listeners, idle connections
378 // until context cancel or time-out
khenaidooab1f7bd2019-11-14 14:00:27 -0500379 stopServers(ctx, &servers{secure: peerTLScfg != nil, grpc: gs, http: srv})
khenaidooab1f7bd2019-11-14 14:00:27 -0500380 return nil
381 }
382 }
383
384 // start peer servers in a goroutine
385 for _, pl := range e.Peers {
386 go func(l *peerListener) {
khenaidooab1f7bd2019-11-14 14:00:27 -0500387 e.errHandler(l.serve())
388 }(pl)
389 }
390 return nil
391}
392
khenaidood948f772021-08-11 17:49:24 -0400393func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
khenaidooab1f7bd2019-11-14 14:00:27 -0500394 if err = updateCipherSuites(&cfg.ClientTLSInfo, cfg.CipherSuites); err != nil {
395 return nil, err
396 }
397 if err = cfg.ClientSelfCert(); err != nil {
khenaidood948f772021-08-11 17:49:24 -0400398 plog.Fatalf("could not get certs (%v)", err)
khenaidooab1f7bd2019-11-14 14:00:27 -0500399 }
400 if cfg.EnablePprof {
khenaidood948f772021-08-11 17:49:24 -0400401 plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
khenaidooab1f7bd2019-11-14 14:00:27 -0500402 }
403
404 sctxs = make(map[string]*serveCtx)
405 for _, u := range cfg.LCUrls {
khenaidood948f772021-08-11 17:49:24 -0400406 sctx := newServeCtx()
407
khenaidooab1f7bd2019-11-14 14:00:27 -0500408 if u.Scheme == "http" || u.Scheme == "unix" {
409 if !cfg.ClientTLSInfo.Empty() {
khenaidood948f772021-08-11 17:49:24 -0400410 plog.Warningf("The scheme of client url %s is HTTP while peer key/cert files are presented. Ignored key/cert files.", u.String())
khenaidooab1f7bd2019-11-14 14:00:27 -0500411 }
412 if cfg.ClientTLSInfo.ClientCertAuth {
khenaidood948f772021-08-11 17:49:24 -0400413 plog.Warningf("The scheme of client url %s is HTTP while client cert auth (--client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
khenaidooab1f7bd2019-11-14 14:00:27 -0500414 }
415 }
416 if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() {
khenaidood948f772021-08-11 17:49:24 -0400417 return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPs scheme", u.String())
khenaidooab1f7bd2019-11-14 14:00:27 -0500418 }
419
khenaidood948f772021-08-11 17:49:24 -0400420 proto := "tcp"
khenaidooab1f7bd2019-11-14 14:00:27 -0500421 addr := u.Host
422 if u.Scheme == "unix" || u.Scheme == "unixs" {
khenaidood948f772021-08-11 17:49:24 -0400423 proto = "unix"
khenaidooab1f7bd2019-11-14 14:00:27 -0500424 addr = u.Host + u.Path
425 }
khenaidooab1f7bd2019-11-14 14:00:27 -0500426
427 sctx.secure = u.Scheme == "https" || u.Scheme == "unixs"
428 sctx.insecure = !sctx.secure
429 if oldctx := sctxs[addr]; oldctx != nil {
430 oldctx.secure = oldctx.secure || sctx.secure
431 oldctx.insecure = oldctx.insecure || sctx.insecure
432 continue
433 }
434
khenaidood948f772021-08-11 17:49:24 -0400435 if sctx.l, err = net.Listen(proto, addr); err != nil {
khenaidooab1f7bd2019-11-14 14:00:27 -0500436 return nil, err
437 }
438 // net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking
439 // hosts that disable ipv6. So, use the address given by the user.
440 sctx.addr = addr
441
442 if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil {
443 if fdLimit <= reservedInternalFDNum {
khenaidood948f772021-08-11 17:49:24 -0400444 plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
khenaidooab1f7bd2019-11-14 14:00:27 -0500445 }
446 sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum))
447 }
448
khenaidood948f772021-08-11 17:49:24 -0400449 if proto == "tcp" {
450 if sctx.l, err = transport.NewKeepAliveListener(sctx.l, "tcp", nil); err != nil {
khenaidooab1f7bd2019-11-14 14:00:27 -0500451 return nil, err
452 }
453 }
454
khenaidood948f772021-08-11 17:49:24 -0400455 plog.Info("listening for client requests on ", u.Host)
khenaidooab1f7bd2019-11-14 14:00:27 -0500456 defer func() {
khenaidood948f772021-08-11 17:49:24 -0400457 if err != nil {
458 sctx.l.Close()
khenaidooab1f7bd2019-11-14 14:00:27 -0500459 plog.Info("stopping listening for client requests on ", u.Host)
460 }
461 }()
462 for k := range cfg.UserHandlers {
463 sctx.userHandlers[k] = cfg.UserHandlers[k]
464 }
465 sctx.serviceRegister = cfg.ServiceRegister
466 if cfg.EnablePprof || cfg.Debug {
467 sctx.registerPprof()
468 }
469 if cfg.Debug {
470 sctx.registerTrace()
471 }
472 sctxs[addr] = sctx
473 }
474 return sctxs, nil
475}
476
477func (e *Etcd) serveClients() (err error) {
478 if !e.cfg.ClientTLSInfo.Empty() {
khenaidood948f772021-08-11 17:49:24 -0400479 plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo)
480 }
481
482 if e.cfg.CorsInfo.String() != "" {
483 plog.Infof("cors = %s", e.cfg.CorsInfo)
khenaidooab1f7bd2019-11-14 14:00:27 -0500484 }
485
486 // Start a client server goroutine for each listen address
487 var h http.Handler
488 if e.Config().EnableV2 {
489 if len(e.Config().ExperimentalEnableV2V3) > 0 {
khenaidood948f772021-08-11 17:49:24 -0400490 srv := v2v3.NewServer(v3client.New(e.Server), e.cfg.ExperimentalEnableV2V3)
491 h = v2http.NewClientHandler(srv, e.Server.Cfg.ReqTimeout())
khenaidooab1f7bd2019-11-14 14:00:27 -0500492 } else {
khenaidood948f772021-08-11 17:49:24 -0400493 h = v2http.NewClientHandler(e.Server, e.Server.Cfg.ReqTimeout())
khenaidooab1f7bd2019-11-14 14:00:27 -0500494 }
495 } else {
496 mux := http.NewServeMux()
497 etcdhttp.HandleBasic(mux, e.Server)
498 h = mux
499 }
khenaidood948f772021-08-11 17:49:24 -0400500 h = http.Handler(&cors.CORSHandler{Handler: h, Info: e.cfg.CorsInfo})
khenaidooab1f7bd2019-11-14 14:00:27 -0500501
502 gopts := []grpc.ServerOption{}
503 if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) {
504 gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
505 MinTime: e.cfg.GRPCKeepAliveMinTime,
506 PermitWithoutStream: false,
507 }))
508 }
509 if e.cfg.GRPCKeepAliveInterval > time.Duration(0) &&
510 e.cfg.GRPCKeepAliveTimeout > time.Duration(0) {
511 gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{
512 Time: e.cfg.GRPCKeepAliveInterval,
513 Timeout: e.cfg.GRPCKeepAliveTimeout,
514 }))
515 }
516
khenaidood948f772021-08-11 17:49:24 -0400517 // start client servers in a goroutine
khenaidooab1f7bd2019-11-14 14:00:27 -0500518 for _, sctx := range e.sctxs {
519 go func(s *serveCtx) {
520 e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...))
521 }(sctx)
522 }
523 return nil
524}
525
526func (e *Etcd) serveMetrics() (err error) {
527 if e.cfg.Metrics == "extensive" {
528 grpc_prometheus.EnableHandlingTimeHistogram()
529 }
530
531 if len(e.cfg.ListenMetricsUrls) > 0 {
532 metricsMux := http.NewServeMux()
533 etcdhttp.HandleMetricsHealth(metricsMux, e.Server)
534
535 for _, murl := range e.cfg.ListenMetricsUrls {
536 tlsInfo := &e.cfg.ClientTLSInfo
537 if murl.Scheme == "http" {
538 tlsInfo = nil
539 }
540 ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsInfo)
541 if err != nil {
542 return err
543 }
544 e.metricsListeners = append(e.metricsListeners, ml)
545 go func(u url.URL, ln net.Listener) {
khenaidood948f772021-08-11 17:49:24 -0400546 plog.Info("listening for metrics on ", u.String())
khenaidooab1f7bd2019-11-14 14:00:27 -0500547 e.errHandler(http.Serve(ln, metricsMux))
548 }(murl, ml)
549 }
550 }
551 return nil
552}
553
554func (e *Etcd) errHandler(err error) {
555 select {
556 case <-e.stopc:
557 return
558 default:
559 }
560 select {
561 case <-e.stopc:
562 case e.errc <- err:
563 }
564}
565
khenaidooab1f7bd2019-11-14 14:00:27 -0500566func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) {
567 h, err := strconv.Atoi(retention)
khenaidood948f772021-08-11 17:49:24 -0400568 if err == nil && h >= 0 {
khenaidooab1f7bd2019-11-14 14:00:27 -0500569 switch mode {
khenaidood948f772021-08-11 17:49:24 -0400570 case compactor.ModeRevision:
khenaidooab1f7bd2019-11-14 14:00:27 -0500571 ret = time.Duration(int64(h))
khenaidood948f772021-08-11 17:49:24 -0400572 case compactor.ModePeriodic:
khenaidooab1f7bd2019-11-14 14:00:27 -0500573 ret = time.Duration(int64(h)) * time.Hour
574 }
575 } else {
576 // periodic compaction
577 ret, err = time.ParseDuration(retention)
578 if err != nil {
579 return 0, fmt.Errorf("error parsing CompactionRetention: %v", err)
580 }
581 }
582 return ret, nil
583}