blob: a3b20c46c38f1471558c15c1c3d1e8ac50612bdf [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package embed
16
17import (
18 "context"
19 "fmt"
20 "io/ioutil"
21 defaultLog "log"
22 "net"
23 "net/http"
24 "strings"
25
26 "go.etcd.io/etcd/clientv3/credentials"
27 "go.etcd.io/etcd/etcdserver"
28 "go.etcd.io/etcd/etcdserver/api/v3client"
29 "go.etcd.io/etcd/etcdserver/api/v3election"
30 "go.etcd.io/etcd/etcdserver/api/v3election/v3electionpb"
31 v3electiongw "go.etcd.io/etcd/etcdserver/api/v3election/v3electionpb/gw"
32 "go.etcd.io/etcd/etcdserver/api/v3lock"
33 "go.etcd.io/etcd/etcdserver/api/v3lock/v3lockpb"
34 v3lockgw "go.etcd.io/etcd/etcdserver/api/v3lock/v3lockpb/gw"
35 "go.etcd.io/etcd/etcdserver/api/v3rpc"
36 etcdservergw "go.etcd.io/etcd/etcdserver/etcdserverpb/gw"
37 "go.etcd.io/etcd/pkg/debugutil"
38 "go.etcd.io/etcd/pkg/httputil"
39 "go.etcd.io/etcd/pkg/transport"
40
41 gw "github.com/grpc-ecosystem/grpc-gateway/runtime"
42 "github.com/soheilhy/cmux"
43 "github.com/tmc/grpc-websocket-proxy/wsproxy"
44 "go.uber.org/zap"
45 "golang.org/x/net/trace"
46 "google.golang.org/grpc"
47)
48
49type serveCtx struct {
50 lg *zap.Logger
51 l net.Listener
52 addr string
53 network string
54 secure bool
55 insecure bool
56
57 ctx context.Context
58 cancel context.CancelFunc
59
60 userHandlers map[string]http.Handler
61 serviceRegister func(*grpc.Server)
62 serversC chan *servers
63}
64
// servers is one gRPC/HTTP server pair started by serve and published on
// serveCtx.serversC.
type servers struct {
	// secure is set to true by serve for the pair created on the TLS path.
	secure bool
	// grpc is the gRPC server built by v3rpc.Server for this pair.
	grpc *grpc.Server
	// http is the HTTP server wrapping the access controller for this pair.
	http *http.Server
}
70
71func newServeCtx(lg *zap.Logger) *serveCtx {
72 ctx, cancel := context.WithCancel(context.Background())
73 return &serveCtx{
74 lg: lg,
75 ctx: ctx,
76 cancel: cancel,
77 userHandlers: make(map[string]http.Handler),
78 serversC: make(chan *servers, 2), // in case sctx.insecure,sctx.secure true
79 }
80}
81
82// serve accepts incoming connections on the listener l,
83// creating a new service goroutine for each. The service goroutines
84// read requests and then call handler to reply to them.
85func (sctx *serveCtx) serve(
86 s *etcdserver.EtcdServer,
87 tlsinfo *transport.TLSInfo,
88 handler http.Handler,
89 errHandler func(error),
90 gopts ...grpc.ServerOption) (err error) {
91 logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
92 <-s.ReadyNotify()
93
94 if sctx.lg == nil {
95 plog.Info("ready to serve client requests")
96 }
97
98 m := cmux.New(sctx.l)
99 v3c := v3client.New(s)
100 servElection := v3election.NewElectionServer(v3c)
101 servLock := v3lock.NewLockServer(v3c)
102
103 var gs *grpc.Server
104 defer func() {
105 if err != nil && gs != nil {
106 gs.Stop()
107 }
108 }()
109
110 if sctx.insecure {
111 gs = v3rpc.Server(s, nil, gopts...)
112 v3electionpb.RegisterElectionServer(gs, servElection)
113 v3lockpb.RegisterLockServer(gs, servLock)
114 if sctx.serviceRegister != nil {
115 sctx.serviceRegister(gs)
116 }
117 grpcl := m.Match(cmux.HTTP2())
118 go func() { errHandler(gs.Serve(grpcl)) }()
119
120 var gwmux *gw.ServeMux
121 if s.Cfg.EnableGRPCGateway {
122 gwmux, err = sctx.registerGateway([]grpc.DialOption{grpc.WithInsecure()})
123 if err != nil {
124 return err
125 }
126 }
127
128 httpmux := sctx.createMux(gwmux, handler)
129
130 srvhttp := &http.Server{
131 Handler: createAccessController(sctx.lg, s, httpmux),
132 ErrorLog: logger, // do not log user error
133 }
134 httpl := m.Match(cmux.HTTP1())
135 go func() { errHandler(srvhttp.Serve(httpl)) }()
136
137 sctx.serversC <- &servers{grpc: gs, http: srvhttp}
138 if sctx.lg != nil {
139 sctx.lg.Info(
140 "serving client traffic insecurely; this is strongly discouraged!",
141 zap.String("address", sctx.l.Addr().String()),
142 )
143 } else {
144 plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String())
145 }
146 }
147
148 if sctx.secure {
149 tlscfg, tlsErr := tlsinfo.ServerConfig()
150 if tlsErr != nil {
151 return tlsErr
152 }
153 gs = v3rpc.Server(s, tlscfg, gopts...)
154 v3electionpb.RegisterElectionServer(gs, servElection)
155 v3lockpb.RegisterLockServer(gs, servLock)
156 if sctx.serviceRegister != nil {
157 sctx.serviceRegister(gs)
158 }
159 handler = grpcHandlerFunc(gs, handler)
160
161 var gwmux *gw.ServeMux
162 if s.Cfg.EnableGRPCGateway {
163 dtls := tlscfg.Clone()
164 // trust local server
165 dtls.InsecureSkipVerify = true
166 bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls})
167 opts := []grpc.DialOption{grpc.WithTransportCredentials(bundle.TransportCredentials())}
168 gwmux, err = sctx.registerGateway(opts)
169 if err != nil {
170 return err
171 }
172 }
173
174 var tlsl net.Listener
175 tlsl, err = transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
176 if err != nil {
177 return err
178 }
179 // TODO: add debug flag; enable logging when debug flag is set
180 httpmux := sctx.createMux(gwmux, handler)
181
182 srv := &http.Server{
183 Handler: createAccessController(sctx.lg, s, httpmux),
184 TLSConfig: tlscfg,
185 ErrorLog: logger, // do not log user error
186 }
187 go func() { errHandler(srv.Serve(tlsl)) }()
188
189 sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
190 if sctx.lg != nil {
191 sctx.lg.Info(
192 "serving client traffic securely",
193 zap.String("address", sctx.l.Addr().String()),
194 )
195 } else {
196 plog.Infof("serving client requests on %s", sctx.l.Addr().String())
197 }
198 }
199
200 close(sctx.serversC)
201 return m.Serve()
202}
203
204// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
205// connections or otherHandler otherwise. Given in gRPC docs.
206func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
207 if otherHandler == nil {
208 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
209 grpcServer.ServeHTTP(w, r)
210 })
211 }
212 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
213 if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
214 grpcServer.ServeHTTP(w, r)
215 } else {
216 otherHandler.ServeHTTP(w, r)
217 }
218 })
219}
220
// registerHandlerFunc is the signature of the gateway registration functions
// that registerGateway calls with its context, gateway mux and client
// connection.
type registerHandlerFunc func(context.Context, *gw.ServeMux, *grpc.ClientConn) error
222
223func (sctx *serveCtx) registerGateway(opts []grpc.DialOption) (*gw.ServeMux, error) {
224 ctx := sctx.ctx
225
226 addr := sctx.addr
227 if network := sctx.network; network == "unix" {
228 // explicitly define unix network for gRPC socket support
229 addr = fmt.Sprintf("%s://%s", network, addr)
230 }
231
232 conn, err := grpc.DialContext(ctx, addr, opts...)
233 if err != nil {
234 return nil, err
235 }
236 gwmux := gw.NewServeMux()
237
238 handlers := []registerHandlerFunc{
239 etcdservergw.RegisterKVHandler,
240 etcdservergw.RegisterWatchHandler,
241 etcdservergw.RegisterLeaseHandler,
242 etcdservergw.RegisterClusterHandler,
243 etcdservergw.RegisterMaintenanceHandler,
244 etcdservergw.RegisterAuthHandler,
245 v3lockgw.RegisterLockHandler,
246 v3electiongw.RegisterElectionHandler,
247 }
248 for _, h := range handlers {
249 if err := h(ctx, gwmux, conn); err != nil {
250 return nil, err
251 }
252 }
253 go func() {
254 <-ctx.Done()
255 if cerr := conn.Close(); cerr != nil {
256 if sctx.lg != nil {
257 sctx.lg.Warn(
258 "failed to close connection",
259 zap.String("address", sctx.l.Addr().String()),
260 zap.Error(cerr),
261 )
262 } else {
263 plog.Warningf("failed to close conn to %s: %v", sctx.l.Addr().String(), cerr)
264 }
265 }
266 }()
267
268 return gwmux, nil
269}
270
271func (sctx *serveCtx) createMux(gwmux *gw.ServeMux, handler http.Handler) *http.ServeMux {
272 httpmux := http.NewServeMux()
273 for path, h := range sctx.userHandlers {
274 httpmux.Handle(path, h)
275 }
276
277 if gwmux != nil {
278 httpmux.Handle(
279 "/v3/",
280 wsproxy.WebsocketProxy(
281 gwmux,
282 wsproxy.WithRequestMutator(
283 // Default to the POST method for streams
284 func(_ *http.Request, outgoing *http.Request) *http.Request {
285 outgoing.Method = "POST"
286 return outgoing
287 },
288 ),
289 ),
290 )
291 }
292 if handler != nil {
293 httpmux.Handle("/", handler)
294 }
295 return httpmux
296}
297
298// createAccessController wraps HTTP multiplexer:
299// - mutate gRPC gateway request paths
300// - check hostname whitelist
301// client HTTP requests goes here first
302func createAccessController(lg *zap.Logger, s *etcdserver.EtcdServer, mux *http.ServeMux) http.Handler {
303 return &accessController{lg: lg, s: s, mux: mux}
304}
305
// accessController is the http.Handler returned by createAccessController;
// its ServeHTTP filters requests before passing them to mux.
type accessController struct {
	// lg is the structured logger; ServeHTTP falls back to plog when it is nil.
	lg *zap.Logger
	// s supplies the host whitelist, CORS policy, config flags and auth store
	// consulted by ServeHTTP.
	s *etcdserver.EtcdServer
	// mux receives every request that passes the checks.
	mux *http.ServeMux
}
311
312func (ac *accessController) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
313 // redirect for backward compatibilities
314 if req != nil && req.URL != nil && strings.HasPrefix(req.URL.Path, "/v3beta/") {
315 req.URL.Path = strings.Replace(req.URL.Path, "/v3beta/", "/v3/", 1)
316 }
317
318 if req.TLS == nil { // check origin if client connection is not secure
319 host := httputil.GetHostname(req)
320 if !ac.s.AccessController.IsHostWhitelisted(host) {
321 if ac.lg != nil {
322 ac.lg.Warn(
323 "rejecting HTTP request to prevent DNS rebinding attacks",
324 zap.String("host", host),
325 )
326 } else {
327 plog.Warningf("rejecting HTTP request from %q to prevent DNS rebinding attacks", host)
328 }
329 // TODO: use Go's "http.StatusMisdirectedRequest" (421)
330 // https://github.com/golang/go/commit/4b8a7eafef039af1834ef9bfa879257c4a72b7b5
331 http.Error(rw, errCVE20185702(host), 421)
332 return
333 }
334 } else if ac.s.Cfg.ClientCertAuthEnabled && ac.s.Cfg.EnableGRPCGateway &&
335 ac.s.AuthStore().IsAuthEnabled() && strings.HasPrefix(req.URL.Path, "/v3/") {
336 for _, chains := range req.TLS.VerifiedChains {
337 if len(chains) < 1 {
338 continue
339 }
340 if len(chains[0].Subject.CommonName) != 0 {
341 http.Error(rw, "CommonName of client sending a request against gateway will be ignored and not used as expected", 400)
342 return
343 }
344 }
345 }
346
347 // Write CORS header.
348 if ac.s.AccessController.OriginAllowed("*") {
349 addCORSHeader(rw, "*")
350 } else if origin := req.Header.Get("Origin"); ac.s.OriginAllowed(origin) {
351 addCORSHeader(rw, origin)
352 }
353
354 if req.Method == "OPTIONS" {
355 rw.WriteHeader(http.StatusOK)
356 return
357 }
358
359 ac.mux.ServeHTTP(rw, req)
360}
361
// addCORSHeader writes the CORS response headers for the given origin onto w.
//
// The headers are set rather than appended: a response must carry a single
// Access-Control-Allow-Origin value, so calling this more than once for the
// same response (or after another layer already wrote CORS headers) must not
// produce duplicate header lines.
func addCORSHeader(w http.ResponseWriter, origin string) {
	hdr := w.Header()
	hdr.Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
	hdr.Set("Access-Control-Allow-Origin", origin)
	hdr.Set("Access-Control-Allow-Headers", "accept, content-type, authorization")
}
368
369// https://github.com/transmission/transmission/pull/468
370func errCVE20185702(host string) string {
371 return fmt.Sprintf(`
372etcd received your request, but the Host header was unrecognized.
373
374To fix this, choose one of the following options:
375- Enable TLS, then any HTTPS request will be allowed.
376- Add the hostname you want to use to the whitelist in settings.
377 - e.g. etcd --host-whitelist %q
378
379This requirement has been added to help prevent "DNS Rebinding" attacks (CVE-2018-5702).
380`, host)
381}
382
383// WrapCORS wraps existing handler with CORS.
384// TODO: deprecate this after v2 proxy deprecate
385func WrapCORS(cors map[string]struct{}, h http.Handler) http.Handler {
386 return &corsHandler{
387 ac: &etcdserver.AccessController{CORS: cors},
388 h: h,
389 }
390}
391
// corsHandler is the http.Handler built by WrapCORS: it adds CORS headers
// according to ac and then delegates to h.
type corsHandler struct {
	// ac decides which origins are allowed.
	ac *etcdserver.AccessController
	// h handles every non-OPTIONS request.
	h http.Handler
}
396
397func (ch *corsHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
398 if ch.ac.OriginAllowed("*") {
399 addCORSHeader(rw, "*")
400 } else if origin := req.Header.Get("Origin"); ch.ac.OriginAllowed(origin) {
401 addCORSHeader(rw, origin)
402 }
403
404 if req.Method == "OPTIONS" {
405 rw.WriteHeader(http.StatusOK)
406 return
407 }
408
409 ch.h.ServeHTTP(rw, req)
410}
411
412func (sctx *serveCtx) registerUserHandler(s string, h http.Handler) {
413 if sctx.userHandlers[s] != nil {
414 if sctx.lg != nil {
415 sctx.lg.Warn("path is already registered by user handler", zap.String("path", s))
416 } else {
417 plog.Warningf("path %s already registered by user handler", s)
418 }
419 return
420 }
421 sctx.userHandlers[s] = h
422}
423
424func (sctx *serveCtx) registerPprof() {
425 for p, h := range debugutil.PProfHandlers() {
426 sctx.registerUserHandler(p, h)
427 }
428}
429
430func (sctx *serveCtx) registerTrace() {
431 reqf := func(w http.ResponseWriter, r *http.Request) { trace.Render(w, r, true) }
432 sctx.registerUserHandler("/debug/requests", http.HandlerFunc(reqf))
433 evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) }
434 sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf))
435}