blob: 6a8e39febd03507c1a41ef5f45be3aeb4a8383f5 [file] [log] [blame]
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package etcdmain
16
17import (
18 "context"
19 "fmt"
20 "io/ioutil"
21 "math"
22 "net"
23 "net/http"
24 "net/url"
25 "os"
26 "path/filepath"
27 "time"
28
29 "github.com/coreos/etcd/clientv3"
30 "github.com/coreos/etcd/clientv3/leasing"
31 "github.com/coreos/etcd/clientv3/namespace"
32 "github.com/coreos/etcd/clientv3/ordering"
33 "github.com/coreos/etcd/etcdserver/api/etcdhttp"
34 "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
35 "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
36 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
37 "github.com/coreos/etcd/pkg/debugutil"
38 "github.com/coreos/etcd/pkg/transport"
39 "github.com/coreos/etcd/proxy/grpcproxy"
40
41 "github.com/coreos/pkg/capnslog"
42 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
43 "github.com/soheilhy/cmux"
44 "github.com/spf13/cobra"
45 "google.golang.org/grpc"
46 "google.golang.org/grpc/grpclog"
47)
48
// Settings for "grpc-proxy start". Each variable is bound to a command-line
// flag in newGRPCProxyStartCommand.
var (
	// addresses the proxy serves on
	grpcProxyListenAddr, grpcProxyMetricsListenAddr string

	// how the upstream etcd cluster is located
	grpcProxyEndpoints []string
	// DNS domain for SRV-based discovery
	grpcProxyDNSCluster string
	// accept insecure SRV records during discovery
	grpcProxyInsecureDiscovery bool

	// directory for persistent data (auto-TLS certificates are generated under it)
	grpcProxyDataDir string

	// per-call message size limits, in bytes
	grpcMaxCallSendMsgSize, grpcMaxCallRecvMsgSize int

	// tls for connecting to etcd
	grpcProxyCA, grpcProxyCert, grpcProxyKey string
	// skip verification of etcd server certificates
	grpcProxyInsecureSkipTLSVerify bool

	// tls for clients connecting to proxy
	grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey string
	// serve TLS with generated certificates
	grpcProxyListenAutoTLS bool
	// client certificate revocation list file
	grpcProxyListenCRL string

	// registration of this proxy under a shared resolver prefix
	grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix string
	// registration TTL, in seconds
	grpcProxyResolverTTL int

	// key prefix applied to all proxied requests, and leasing metadata prefix
	grpcProxyNamespace, grpcProxyLeasing string

	// optional features: pprof endpoints, serializable-read ordering, debug logging
	grpcProxyEnablePprof, grpcProxyEnableOrdering, grpcProxyDebug bool
)
86
// defaultGRPCMaxCallSendMsgSize is the default for the "max-send-bytes" flag:
// 1.5 MiB, i.e. 1572864 bytes.
const defaultGRPCMaxCallSendMsgSize = 1.5 * (1 << 20)
88
89func init() {
90 rootCmd.AddCommand(newGRPCProxyCommand())
91}
92
93// newGRPCProxyCommand returns the cobra command for "grpc-proxy".
94func newGRPCProxyCommand() *cobra.Command {
95 lpc := &cobra.Command{
96 Use: "grpc-proxy <subcommand>",
97 Short: "grpc-proxy related command",
98 }
99 lpc.AddCommand(newGRPCProxyStartCommand())
100
101 return lpc
102}
103
104func newGRPCProxyStartCommand() *cobra.Command {
105 cmd := cobra.Command{
106 Use: "start",
107 Short: "start the grpc proxy",
108 Run: startGRPCProxy,
109 }
110
111 cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
112 cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
113 cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for /metrics requests on an additional interface")
114 cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
115 cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
116 cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
117 cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
118 cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
119 cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests")
120 cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`)
121 cmd.Flags().StringVar(&grpcProxyDataDir, "data-dir", "default.proxy", "Data directory for persistent data")
122 cmd.Flags().IntVar(&grpcMaxCallSendMsgSize, "max-send-bytes", defaultGRPCMaxCallSendMsgSize, "message send limits in bytes (default value is 1.5 MiB)")
123 cmd.Flags().IntVar(&grpcMaxCallRecvMsgSize, "max-recv-bytes", math.MaxInt32, "message receive limits in bytes (default value is math.MaxInt32)")
124
125 // client TLS for connecting to server
126 cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
127 cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
128 cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
129 cmd.Flags().BoolVar(&grpcProxyInsecureSkipTLSVerify, "insecure-skip-tls-verify", false, "skip authentication of etcd server TLS certificates")
130
131 // client TLS for connecting to proxy
132 cmd.Flags().StringVar(&grpcProxyListenCert, "cert-file", "", "identify secure connections to the proxy using this TLS certificate file")
133 cmd.Flags().StringVar(&grpcProxyListenKey, "key-file", "", "identify secure connections to the proxy using this TLS key file")
134 cmd.Flags().StringVar(&grpcProxyListenCA, "trusted-ca-file", "", "verify certificates of TLS-enabled secure proxy using this CA bundle")
135 cmd.Flags().BoolVar(&grpcProxyListenAutoTLS, "auto-tls", false, "proxy TLS using generated certificates")
136 cmd.Flags().StringVar(&grpcProxyListenCRL, "client-crl-file", "", "proxy client certificate revocation list file.")
137
138 // experimental flags
139 cmd.Flags().BoolVar(&grpcProxyEnableOrdering, "experimental-serializable-ordering", false, "Ensure serializable reads have monotonically increasing store revisions across endpoints.")
140 cmd.Flags().StringVar(&grpcProxyLeasing, "experimental-leasing-prefix", "", "leasing metadata prefix for disconnected linearized reads.")
141
142 cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")
143
144 return &cmd
145}
146
147func startGRPCProxy(cmd *cobra.Command, args []string) {
148 checkArgs()
149
150 capnslog.SetGlobalLogLevel(capnslog.INFO)
151 if grpcProxyDebug {
152 capnslog.SetGlobalLogLevel(capnslog.DEBUG)
153 grpc.EnableTracing = true
154 // enable info, warning, error
155 grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
156 } else {
157 // only discard info
158 grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
159 }
160
161 tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
162 if tlsinfo == nil && grpcProxyListenAutoTLS {
163 host := []string{"https://" + grpcProxyListenAddr}
164 dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy")
165 autoTLS, err := transport.SelfCert(dir, host)
166 if err != nil {
167 plog.Fatal(err)
168 }
169 tlsinfo = &autoTLS
170 }
171 if tlsinfo != nil {
172 plog.Infof("ServerTLS: %s", tlsinfo)
173 }
174 m := mustListenCMux(tlsinfo)
175
176 grpcl := m.Match(cmux.HTTP2())
177 defer func() {
178 grpcl.Close()
179 plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
180 }()
181
182 client := mustNewClient()
183
184 srvhttp, httpl := mustHTTPListener(m, tlsinfo, client)
185 errc := make(chan error)
186 go func() { errc <- newGRPCProxyServer(client).Serve(grpcl) }()
187 go func() { errc <- srvhttp.Serve(httpl) }()
188 go func() { errc <- m.Serve() }()
189 if len(grpcProxyMetricsListenAddr) > 0 {
190 mhttpl := mustMetricsListener(tlsinfo)
191 go func() {
192 mux := http.NewServeMux()
193 etcdhttp.HandlePrometheus(mux)
194 grpcproxy.HandleHealth(mux, client)
195 plog.Fatal(http.Serve(mhttpl, mux))
196 }()
197 }
198
199 // grpc-proxy is initialized, ready to serve
200 notifySystemd()
201
202 fmt.Fprintln(os.Stderr, <-errc)
203 os.Exit(1)
204}
205
206func checkArgs() {
207 if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
208 fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL))
209 os.Exit(1)
210 }
211 if grpcProxyResolverPrefix == "" && grpcProxyResolverTTL > 0 {
212 fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-prefix %q", grpcProxyResolverPrefix))
213 os.Exit(1)
214 }
215 if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL > 0 && grpcProxyAdvertiseClientURL == "" {
216 fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL))
217 os.Exit(1)
218 }
219}
220
221func mustNewClient() *clientv3.Client {
222 srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery)
223 eps := srvs.Endpoints
224 if len(eps) == 0 {
225 eps = grpcProxyEndpoints
226 }
227 cfg, err := newClientCfg(eps)
228 if err != nil {
229 fmt.Fprintln(os.Stderr, err)
230 os.Exit(1)
231 }
232 cfg.DialOptions = append(cfg.DialOptions,
233 grpc.WithUnaryInterceptor(grpcproxy.AuthUnaryClientInterceptor))
234 cfg.DialOptions = append(cfg.DialOptions,
235 grpc.WithStreamInterceptor(grpcproxy.AuthStreamClientInterceptor))
236 client, err := clientv3.New(*cfg)
237 if err != nil {
238 fmt.Fprintln(os.Stderr, err)
239 os.Exit(1)
240 }
241 return client
242}
243
244func newClientCfg(eps []string) (*clientv3.Config, error) {
245 // set tls if any one tls option set
246 cfg := clientv3.Config{
247 Endpoints: eps,
248 DialTimeout: 5 * time.Second,
249 }
250
251 if grpcMaxCallSendMsgSize > 0 {
252 cfg.MaxCallSendMsgSize = grpcMaxCallSendMsgSize
253 }
254 if grpcMaxCallRecvMsgSize > 0 {
255 cfg.MaxCallRecvMsgSize = grpcMaxCallRecvMsgSize
256 }
257
258 tls := newTLS(grpcProxyCA, grpcProxyCert, grpcProxyKey)
259 if tls == nil && grpcProxyInsecureSkipTLSVerify {
260 tls = &transport.TLSInfo{}
261 }
262 if tls != nil {
263 clientTLS, err := tls.ClientConfig()
264 if err != nil {
265 return nil, err
266 }
267 clientTLS.InsecureSkipVerify = grpcProxyInsecureSkipTLSVerify
268 cfg.TLS = clientTLS
269 plog.Infof("ClientTLS: %s", tls)
270 }
271 return &cfg, nil
272}
273
274func newTLS(ca, cert, key string) *transport.TLSInfo {
275 if ca == "" && cert == "" && key == "" {
276 return nil
277 }
278 return &transport.TLSInfo{CAFile: ca, CertFile: cert, KeyFile: key}
279}
280
281func mustListenCMux(tlsinfo *transport.TLSInfo) cmux.CMux {
282 l, err := net.Listen("tcp", grpcProxyListenAddr)
283 if err != nil {
284 fmt.Fprintln(os.Stderr, err)
285 os.Exit(1)
286 }
287
288 if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil {
289 fmt.Fprintln(os.Stderr, err)
290 os.Exit(1)
291 }
292 if tlsinfo != nil {
293 tlsinfo.CRLFile = grpcProxyListenCRL
294 if l, err = transport.NewTLSListener(l, tlsinfo); err != nil {
295 plog.Fatal(err)
296 }
297 }
298
299 plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
300 return cmux.New(l)
301}
302
303func newGRPCProxyServer(client *clientv3.Client) *grpc.Server {
304 if grpcProxyEnableOrdering {
305 vf := ordering.NewOrderViolationSwitchEndpointClosure(*client)
306 client.KV = ordering.NewKV(client.KV, vf)
307 plog.Infof("waiting for linearized read from cluster to recover ordering")
308 for {
309 _, err := client.KV.Get(context.TODO(), "_", clientv3.WithKeysOnly())
310 if err == nil {
311 break
312 }
313 plog.Warningf("ordering recovery failed, retrying in 1s (%v)", err)
314 time.Sleep(time.Second)
315 }
316 }
317
318 if len(grpcProxyNamespace) > 0 {
319 client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
320 client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
321 client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace)
322 }
323
324 if len(grpcProxyLeasing) > 0 {
325 client.KV, _, _ = leasing.NewKV(client, grpcProxyLeasing)
326 }
327
328 kvp, _ := grpcproxy.NewKvProxy(client)
329 watchp, _ := grpcproxy.NewWatchProxy(client)
330 if grpcProxyResolverPrefix != "" {
331 grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
332 }
333 clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
334 leasep, _ := grpcproxy.NewLeaseProxy(client)
335 mainp := grpcproxy.NewMaintenanceProxy(client)
336 authp := grpcproxy.NewAuthProxy(client)
337 electionp := grpcproxy.NewElectionProxy(client)
338 lockp := grpcproxy.NewLockProxy(client)
339
340 server := grpc.NewServer(
341 grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
342 grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
343 grpc.MaxConcurrentStreams(math.MaxUint32),
344 )
345
346 pb.RegisterKVServer(server, kvp)
347 pb.RegisterWatchServer(server, watchp)
348 pb.RegisterClusterServer(server, clusterp)
349 pb.RegisterLeaseServer(server, leasep)
350 pb.RegisterMaintenanceServer(server, mainp)
351 pb.RegisterAuthServer(server, authp)
352 v3electionpb.RegisterElectionServer(server, electionp)
353 v3lockpb.RegisterLockServer(server, lockp)
354
355 // set zero values for metrics registered for this grpc server
356 grpc_prometheus.Register(server)
357
358 return server
359}
360
361func mustHTTPListener(m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client) (*http.Server, net.Listener) {
362 httpmux := http.NewServeMux()
363 httpmux.HandleFunc("/", http.NotFound)
364 etcdhttp.HandlePrometheus(httpmux)
365 grpcproxy.HandleHealth(httpmux, c)
366 if grpcProxyEnablePprof {
367 for p, h := range debugutil.PProfHandlers() {
368 httpmux.Handle(p, h)
369 }
370 plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
371 }
372 srvhttp := &http.Server{Handler: httpmux}
373
374 if tlsinfo == nil {
375 return srvhttp, m.Match(cmux.HTTP1())
376 }
377
378 srvTLS, err := tlsinfo.ServerConfig()
379 if err != nil {
380 plog.Fatalf("could not setup TLS (%v)", err)
381 }
382 srvhttp.TLSConfig = srvTLS
383 return srvhttp, m.Match(cmux.Any())
384}
385
386func mustMetricsListener(tlsinfo *transport.TLSInfo) net.Listener {
387 murl, err := url.Parse(grpcProxyMetricsListenAddr)
388 if err != nil {
389 fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr)
390 os.Exit(1)
391 }
392 ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsinfo)
393 if err != nil {
394 fmt.Fprintln(os.Stderr, err)
395 os.Exit(1)
396 }
397 plog.Info("grpc-proxy: listening for metrics on ", murl.String())
398 return ml
399}