blob: 2ecff3aaea94c66890c860e516018c81e7e2ce01 [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package httpproxy
16
17import (
18 "bytes"
19 "context"
20 "fmt"
21 "io"
22 "io/ioutil"
23 "net"
24 "net/http"
25 "net/url"
26 "strings"
27 "sync/atomic"
28 "time"
29
30 "github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
31 "github.com/coreos/pkg/capnslog"
32)
33
34var (
35 plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "proxy/httpproxy")
36
37 // Hop-by-hop headers. These are removed when sent to the backend.
38 // http://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html
39 // This list of headers borrowed from stdlib httputil.ReverseProxy
40 singleHopHeaders = []string{
41 "Connection",
42 "Keep-Alive",
43 "Proxy-Authenticate",
44 "Proxy-Authorization",
45 "Te", // canonicalized version of "TE"
46 "Trailers",
47 "Transfer-Encoding",
48 "Upgrade",
49 }
50)
51
52func removeSingleHopHeaders(hdrs *http.Header) {
53 for _, h := range singleHopHeaders {
54 hdrs.Del(h)
55 }
56}
57
// reverseProxy relays incoming HTTP requests (see ServeHTTP) to one of the
// endpoints offered by its director.
type reverseProxy struct {
	// director supplies the candidate backend endpoints for each request.
	director *director
	// transport performs the round trip to the selected endpoint.
	transport http.RoundTripper
}
62
63func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request) {
64 reportIncomingRequest(clientreq)
65 proxyreq := new(http.Request)
66 *proxyreq = *clientreq
67 startTime := time.Now()
68
69 var (
70 proxybody []byte
71 err error
72 )
73
74 if clientreq.Body != nil {
75 proxybody, err = ioutil.ReadAll(clientreq.Body)
76 if err != nil {
77 msg := fmt.Sprintf("failed to read request body: %v", err)
78 plog.Println(msg)
79 e := httptypes.NewHTTPError(http.StatusInternalServerError, "httpproxy: "+msg)
80 if we := e.WriteTo(rw); we != nil {
81 plog.Debugf("error writing HTTPError (%v) to %s", we, clientreq.RemoteAddr)
82 }
83 return
84 }
85 }
86
87 // deep-copy the headers, as these will be modified below
88 proxyreq.Header = make(http.Header)
89 copyHeader(proxyreq.Header, clientreq.Header)
90
91 normalizeRequest(proxyreq)
92 removeSingleHopHeaders(&proxyreq.Header)
93 maybeSetForwardedFor(proxyreq)
94
95 endpoints := p.director.endpoints()
96 if len(endpoints) == 0 {
97 msg := "zero endpoints currently available"
98 reportRequestDropped(clientreq, zeroEndpoints)
99
100 // TODO: limit the rate of the error logging.
101 plog.Println(msg)
102 e := httptypes.NewHTTPError(http.StatusServiceUnavailable, "httpproxy: "+msg)
103 if we := e.WriteTo(rw); we != nil {
104 plog.Debugf("error writing HTTPError (%v) to %s", we, clientreq.RemoteAddr)
105 }
106 return
107 }
108
109 var requestClosed int32
110 completeCh := make(chan bool, 1)
111 closeNotifier, ok := rw.(http.CloseNotifier)
112 ctx, cancel := context.WithCancel(context.Background())
113 proxyreq = proxyreq.WithContext(ctx)
114 defer cancel()
115 if ok {
116 closeCh := closeNotifier.CloseNotify()
117 go func() {
118 select {
119 case <-closeCh:
120 atomic.StoreInt32(&requestClosed, 1)
121 plog.Printf("client %v closed request prematurely", clientreq.RemoteAddr)
122 cancel()
123 case <-completeCh:
124 }
125 }()
126
127 defer func() {
128 completeCh <- true
129 }()
130 }
131
132 var res *http.Response
133
134 for _, ep := range endpoints {
135 if proxybody != nil {
136 proxyreq.Body = ioutil.NopCloser(bytes.NewBuffer(proxybody))
137 }
138 redirectRequest(proxyreq, ep.URL)
139
140 res, err = p.transport.RoundTrip(proxyreq)
141 if atomic.LoadInt32(&requestClosed) == 1 {
142 return
143 }
144 if err != nil {
145 reportRequestDropped(clientreq, failedSendingRequest)
146 plog.Printf("failed to direct request to %s: %v", ep.URL.String(), err)
147 ep.Failed()
148 continue
149 }
150
151 break
152 }
153
154 if res == nil {
155 // TODO: limit the rate of the error logging.
156 msg := fmt.Sprintf("unable to get response from %d endpoint(s)", len(endpoints))
157 reportRequestDropped(clientreq, failedGettingResponse)
158 plog.Println(msg)
159 e := httptypes.NewHTTPError(http.StatusBadGateway, "httpproxy: "+msg)
160 if we := e.WriteTo(rw); we != nil {
161 plog.Debugf("error writing HTTPError (%v) to %s", we, clientreq.RemoteAddr)
162 }
163 return
164 }
165
166 defer res.Body.Close()
167 reportRequestHandled(clientreq, res, startTime)
168 removeSingleHopHeaders(&res.Header)
169 copyHeader(rw.Header(), res.Header)
170
171 rw.WriteHeader(res.StatusCode)
172 io.Copy(rw, res.Body)
173}
174
// copyHeader appends every value of every field in src to dst. Values
// already present in dst are kept; src is not modified.
func copyHeader(dst, src http.Header) {
	for name := range src {
		values := src[name]
		for i := range values {
			dst.Add(name, values[i])
		}
	}
}
182
// redirectRequest points req at the server identified by loc by overwriting
// the scheme and host of req.URL. Path and query are left as they are.
func redirectRequest(req *http.Request, loc url.URL) {
	target := req.URL
	target.Scheme, target.Host = loc.Scheme, loc.Host
}
187
// normalizeRequest rewrites the protocol fields of req to HTTP/1.1 and
// clears the Close flag.
func normalizeRequest(req *http.Request) {
	const major, minor = 1, 1

	req.ProtoMajor, req.ProtoMinor = major, minor
	req.Proto = "HTTP/1.1"
	req.Close = false
}
194
// maybeSetForwardedFor records the client's IP in the X-Forwarded-For header
// of req. It does nothing when req.RemoteAddr is not a host:port pair.
func maybeSetForwardedFor(req *http.Request) {
	host, _, splitErr := net.SplitHostPort(req.RemoteAddr)
	if splitErr != nil {
		return
	}

	forwarded := host
	// When an earlier proxy already set the header, keep its entries and
	// append ours, collapsing repeated headers into one comma+space list.
	if earlier, present := req.Header["X-Forwarded-For"]; present {
		hops := strings.Join(earlier, ", ")
		forwarded = hops + ", " + host
	}
	req.Header.Set("X-Forwarded-For", forwarded)
}