blob: 62b8b57805b66ddee41135d7417778b0dca726d7 [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package embed
16
17import (
18 "context"
khenaidooab1f7bd2019-11-14 14:00:27 -050019 "io/ioutil"
20 defaultLog "log"
21 "net"
22 "net/http"
23 "strings"
24
khenaidood948f772021-08-11 17:49:24 -040025 "github.com/coreos/etcd/etcdserver"
26 "github.com/coreos/etcd/etcdserver/api/v3client"
27 "github.com/coreos/etcd/etcdserver/api/v3election"
28 "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb"
29 v3electiongw "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb/gw"
30 "github.com/coreos/etcd/etcdserver/api/v3lock"
31 "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
32 v3lockgw "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb/gw"
33 "github.com/coreos/etcd/etcdserver/api/v3rpc"
34 etcdservergw "github.com/coreos/etcd/etcdserver/etcdserverpb/gw"
35 "github.com/coreos/etcd/pkg/debugutil"
36 "github.com/coreos/etcd/pkg/transport"
khenaidooab1f7bd2019-11-14 14:00:27 -050037
38 gw "github.com/grpc-ecosystem/grpc-gateway/runtime"
39 "github.com/soheilhy/cmux"
40 "github.com/tmc/grpc-websocket-proxy/wsproxy"
khenaidooab1f7bd2019-11-14 14:00:27 -050041 "golang.org/x/net/trace"
42 "google.golang.org/grpc"
khenaidood948f772021-08-11 17:49:24 -040043 "google.golang.org/grpc/credentials"
khenaidooab1f7bd2019-11-14 14:00:27 -050044)
45
46type serveCtx struct {
khenaidooab1f7bd2019-11-14 14:00:27 -050047 l net.Listener
48 addr string
khenaidooab1f7bd2019-11-14 14:00:27 -050049 secure bool
50 insecure bool
51
52 ctx context.Context
53 cancel context.CancelFunc
54
55 userHandlers map[string]http.Handler
56 serviceRegister func(*grpc.Server)
57 serversC chan *servers
58}
59
60type servers struct {
61 secure bool
62 grpc *grpc.Server
63 http *http.Server
64}
65
khenaidood948f772021-08-11 17:49:24 -040066func newServeCtx() *serveCtx {
khenaidooab1f7bd2019-11-14 14:00:27 -050067 ctx, cancel := context.WithCancel(context.Background())
khenaidood948f772021-08-11 17:49:24 -040068 return &serveCtx{ctx: ctx, cancel: cancel, userHandlers: make(map[string]http.Handler),
69 serversC: make(chan *servers, 2), // in case sctx.insecure,sctx.secure true
khenaidooab1f7bd2019-11-14 14:00:27 -050070 }
71}
72
73// serve accepts incoming connections on the listener l,
74// creating a new service goroutine for each. The service goroutines
75// read requests and then call handler to reply to them.
76func (sctx *serveCtx) serve(
77 s *etcdserver.EtcdServer,
78 tlsinfo *transport.TLSInfo,
79 handler http.Handler,
80 errHandler func(error),
81 gopts ...grpc.ServerOption) (err error) {
82 logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
83 <-s.ReadyNotify()
khenaidood948f772021-08-11 17:49:24 -040084 plog.Info("ready to serve client requests")
khenaidooab1f7bd2019-11-14 14:00:27 -050085
86 m := cmux.New(sctx.l)
87 v3c := v3client.New(s)
88 servElection := v3election.NewElectionServer(v3c)
89 servLock := v3lock.NewLockServer(v3c)
90
91 var gs *grpc.Server
92 defer func() {
93 if err != nil && gs != nil {
94 gs.Stop()
95 }
96 }()
97
98 if sctx.insecure {
99 gs = v3rpc.Server(s, nil, gopts...)
100 v3electionpb.RegisterElectionServer(gs, servElection)
101 v3lockpb.RegisterLockServer(gs, servLock)
102 if sctx.serviceRegister != nil {
103 sctx.serviceRegister(gs)
104 }
105 grpcl := m.Match(cmux.HTTP2())
106 go func() { errHandler(gs.Serve(grpcl)) }()
107
108 var gwmux *gw.ServeMux
khenaidood948f772021-08-11 17:49:24 -0400109 gwmux, err = sctx.registerGateway([]grpc.DialOption{grpc.WithInsecure()})
110 if err != nil {
111 return err
khenaidooab1f7bd2019-11-14 14:00:27 -0500112 }
113
114 httpmux := sctx.createMux(gwmux, handler)
115
116 srvhttp := &http.Server{
khenaidood948f772021-08-11 17:49:24 -0400117 Handler: wrapMux(httpmux),
khenaidooab1f7bd2019-11-14 14:00:27 -0500118 ErrorLog: logger, // do not log user error
119 }
120 httpl := m.Match(cmux.HTTP1())
121 go func() { errHandler(srvhttp.Serve(httpl)) }()
122
123 sctx.serversC <- &servers{grpc: gs, http: srvhttp}
khenaidood948f772021-08-11 17:49:24 -0400124 plog.Noticef("serving insecure client requests on %s, this is strongly discouraged!", sctx.l.Addr().String())
khenaidooab1f7bd2019-11-14 14:00:27 -0500125 }
126
127 if sctx.secure {
128 tlscfg, tlsErr := tlsinfo.ServerConfig()
129 if tlsErr != nil {
130 return tlsErr
131 }
132 gs = v3rpc.Server(s, tlscfg, gopts...)
133 v3electionpb.RegisterElectionServer(gs, servElection)
134 v3lockpb.RegisterLockServer(gs, servLock)
135 if sctx.serviceRegister != nil {
136 sctx.serviceRegister(gs)
137 }
138 handler = grpcHandlerFunc(gs, handler)
139
khenaidood948f772021-08-11 17:49:24 -0400140 dtls := tlscfg.Clone()
141 // trust local server
142 dtls.InsecureSkipVerify = true
143 creds := credentials.NewTLS(dtls)
144 opts := []grpc.DialOption{grpc.WithTransportCredentials(creds)}
khenaidooab1f7bd2019-11-14 14:00:27 -0500145 var gwmux *gw.ServeMux
khenaidood948f772021-08-11 17:49:24 -0400146 gwmux, err = sctx.registerGateway(opts)
147 if err != nil {
148 return err
khenaidooab1f7bd2019-11-14 14:00:27 -0500149 }
150
151 var tlsl net.Listener
152 tlsl, err = transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
153 if err != nil {
154 return err
155 }
156 // TODO: add debug flag; enable logging when debug flag is set
157 httpmux := sctx.createMux(gwmux, handler)
158
159 srv := &http.Server{
khenaidood948f772021-08-11 17:49:24 -0400160 Handler: wrapMux(httpmux),
khenaidooab1f7bd2019-11-14 14:00:27 -0500161 TLSConfig: tlscfg,
162 ErrorLog: logger, // do not log user error
163 }
164 go func() { errHandler(srv.Serve(tlsl)) }()
165
166 sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
khenaidood948f772021-08-11 17:49:24 -0400167 plog.Infof("serving client requests on %s", sctx.l.Addr().String())
khenaidooab1f7bd2019-11-14 14:00:27 -0500168 }
169
170 close(sctx.serversC)
171 return m.Serve()
172}
173
174// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
175// connections or otherHandler otherwise. Given in gRPC docs.
176func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
177 if otherHandler == nil {
178 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
179 grpcServer.ServeHTTP(w, r)
180 })
181 }
182 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
183 if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
184 grpcServer.ServeHTTP(w, r)
185 } else {
186 otherHandler.ServeHTTP(w, r)
187 }
188 })
189}
190
191type registerHandlerFunc func(context.Context, *gw.ServeMux, *grpc.ClientConn) error
192
193func (sctx *serveCtx) registerGateway(opts []grpc.DialOption) (*gw.ServeMux, error) {
194 ctx := sctx.ctx
khenaidood948f772021-08-11 17:49:24 -0400195 conn, err := grpc.DialContext(ctx, sctx.addr, opts...)
khenaidooab1f7bd2019-11-14 14:00:27 -0500196 if err != nil {
197 return nil, err
198 }
199 gwmux := gw.NewServeMux()
200
201 handlers := []registerHandlerFunc{
202 etcdservergw.RegisterKVHandler,
203 etcdservergw.RegisterWatchHandler,
204 etcdservergw.RegisterLeaseHandler,
205 etcdservergw.RegisterClusterHandler,
206 etcdservergw.RegisterMaintenanceHandler,
207 etcdservergw.RegisterAuthHandler,
208 v3lockgw.RegisterLockHandler,
209 v3electiongw.RegisterElectionHandler,
210 }
211 for _, h := range handlers {
212 if err := h(ctx, gwmux, conn); err != nil {
213 return nil, err
214 }
215 }
216 go func() {
217 <-ctx.Done()
218 if cerr := conn.Close(); cerr != nil {
khenaidood948f772021-08-11 17:49:24 -0400219 plog.Warningf("failed to close conn to %s: %v", sctx.l.Addr().String(), cerr)
khenaidooab1f7bd2019-11-14 14:00:27 -0500220 }
221 }()
222
223 return gwmux, nil
224}
225
226func (sctx *serveCtx) createMux(gwmux *gw.ServeMux, handler http.Handler) *http.ServeMux {
227 httpmux := http.NewServeMux()
228 for path, h := range sctx.userHandlers {
229 httpmux.Handle(path, h)
230 }
231
khenaidood948f772021-08-11 17:49:24 -0400232 httpmux.Handle(
233 "/v3beta/",
234 wsproxy.WebsocketProxy(
235 gwmux,
236 wsproxy.WithRequestMutator(
237 // Default to the POST method for streams
238 func(incoming *http.Request, outgoing *http.Request) *http.Request {
239 outgoing.Method = "POST"
240 return outgoing
241 },
khenaidooab1f7bd2019-11-14 14:00:27 -0500242 ),
khenaidood948f772021-08-11 17:49:24 -0400243 ),
244 )
khenaidooab1f7bd2019-11-14 14:00:27 -0500245 if handler != nil {
246 httpmux.Handle("/", handler)
247 }
248 return httpmux
249}
250
khenaidood948f772021-08-11 17:49:24 -0400251// wraps HTTP multiplexer to mute requests to /v3alpha
252// TODO: deprecate this in 3.4 release
253func wrapMux(mux *http.ServeMux) http.Handler { return &v3alphaMutator{mux: mux} }
khenaidooab1f7bd2019-11-14 14:00:27 -0500254
khenaidood948f772021-08-11 17:49:24 -0400255type v3alphaMutator struct {
khenaidooab1f7bd2019-11-14 14:00:27 -0500256 mux *http.ServeMux
257}
258
khenaidood948f772021-08-11 17:49:24 -0400259func (m *v3alphaMutator) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
260 if req != nil && req.URL != nil && strings.HasPrefix(req.URL.Path, "/v3alpha/") {
261 req.URL.Path = strings.Replace(req.URL.Path, "/v3alpha/", "/v3beta/", 1)
khenaidooab1f7bd2019-11-14 14:00:27 -0500262 }
khenaidood948f772021-08-11 17:49:24 -0400263 m.mux.ServeHTTP(rw, req)
khenaidooab1f7bd2019-11-14 14:00:27 -0500264}
265
266func (sctx *serveCtx) registerUserHandler(s string, h http.Handler) {
267 if sctx.userHandlers[s] != nil {
khenaidood948f772021-08-11 17:49:24 -0400268 plog.Warningf("path %s already registered by user handler", s)
khenaidooab1f7bd2019-11-14 14:00:27 -0500269 return
270 }
271 sctx.userHandlers[s] = h
272}
273
274func (sctx *serveCtx) registerPprof() {
275 for p, h := range debugutil.PProfHandlers() {
276 sctx.registerUserHandler(p, h)
277 }
278}
279
280func (sctx *serveCtx) registerTrace() {
281 reqf := func(w http.ResponseWriter, r *http.Request) { trace.Render(w, r, true) }
282 sctx.registerUserHandler("/debug/requests", http.HandlerFunc(reqf))
283 evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) }
284 sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf))
285}