blob: bbc1cb9c34551aa3e736d5ab95acf4c4af898fc0 [file] [log] [blame]
sslobodrd046be82019-01-16 10:02:22 -05001// Copyright 2011 Google Inc. All rights reserved.
2// Use of this source code is governed by the Apache 2.0
3// license that can be found in the LICENSE file.
4
5// +build !appengine
6
7package internal
8
9import (
10 "bytes"
11 "errors"
12 "fmt"
13 "io/ioutil"
14 "log"
15 "net"
16 "net/http"
17 "net/url"
18 "os"
19 "runtime"
20 "strconv"
21 "strings"
22 "sync"
23 "sync/atomic"
24 "time"
25
26 "github.com/golang/protobuf/proto"
27 netcontext "golang.org/x/net/context"
28
29 basepb "google.golang.org/appengine/internal/base"
30 logpb "google.golang.org/appengine/internal/log"
31 remotepb "google.golang.org/appengine/internal/remote_api"
32)
33
34const (
35 apiPath = "/rpc_http"
36 defaultTicketSuffix = "/default.20150612t184001.0"
37)
38
39var (
40 // Incoming headers.
41 ticketHeader = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
42 dapperHeader = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
43 traceHeader = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
44 curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
45 userIPHeader = http.CanonicalHeaderKey("X-AppEngine-User-IP")
46 remoteAddrHeader = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")
47
48 // Outgoing headers.
49 apiEndpointHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
50 apiEndpointHeaderValue = []string{"app-engine-apis"}
51 apiMethodHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
52 apiMethodHeaderValue = []string{"/VMRemoteAPI.CallRemoteAPI"}
53 apiDeadlineHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
54 apiContentType = http.CanonicalHeaderKey("Content-Type")
55 apiContentTypeValue = []string{"application/octet-stream"}
56 logFlushHeader = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")
57
58 apiHTTPClient = &http.Client{
59 Transport: &http.Transport{
60 Proxy: http.ProxyFromEnvironment,
61 Dial: limitDial,
62 },
63 }
64
65 defaultTicketOnce sync.Once
66 defaultTicket string
67 backgroundContextOnce sync.Once
68 backgroundContext netcontext.Context
69)
70
71func apiURL() *url.URL {
72 host, port := "appengine.googleapis.internal", "10001"
73 if h := os.Getenv("API_HOST"); h != "" {
74 host = h
75 }
76 if p := os.Getenv("API_PORT"); p != "" {
77 port = p
78 }
79 return &url.URL{
80 Scheme: "http",
81 Host: host + ":" + port,
82 Path: apiPath,
83 }
84}
85
86func handleHTTP(w http.ResponseWriter, r *http.Request) {
87 c := &context{
88 req: r,
89 outHeader: w.Header(),
90 apiURL: apiURL(),
91 }
92 r = r.WithContext(withContext(r.Context(), c))
93 c.req = r
94
95 stopFlushing := make(chan int)
96
97 // Patch up RemoteAddr so it looks reasonable.
98 if addr := r.Header.Get(userIPHeader); addr != "" {
99 r.RemoteAddr = addr
100 } else if addr = r.Header.Get(remoteAddrHeader); addr != "" {
101 r.RemoteAddr = addr
102 } else {
103 // Should not normally reach here, but pick a sensible default anyway.
104 r.RemoteAddr = "127.0.0.1"
105 }
106 // The address in the headers will most likely be of these forms:
107 // 123.123.123.123
108 // 2001:db8::1
109 // net/http.Request.RemoteAddr is specified to be in "IP:port" form.
110 if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
111 // Assume the remote address is only a host; add a default port.
112 r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
113 }
114
115 // Start goroutine responsible for flushing app logs.
116 // This is done after adding c to ctx.m (and stopped before removing it)
117 // because flushing logs requires making an API call.
118 go c.logFlusher(stopFlushing)
119
120 executeRequestSafely(c, r)
121 c.outHeader = nil // make sure header changes aren't respected any more
122
123 stopFlushing <- 1 // any logging beyond this point will be dropped
124
125 // Flush any pending logs asynchronously.
126 c.pendingLogs.Lock()
127 flushes := c.pendingLogs.flushes
128 if len(c.pendingLogs.lines) > 0 {
129 flushes++
130 }
131 c.pendingLogs.Unlock()
132 flushed := make(chan struct{})
133 go func() {
134 defer close(flushed)
135 // Force a log flush, because with very short requests we
136 // may not ever flush logs.
137 c.flushLog(true)
138 }()
139 w.Header().Set(logFlushHeader, strconv.Itoa(flushes))
140
141 // Avoid nil Write call if c.Write is never called.
142 if c.outCode != 0 {
143 w.WriteHeader(c.outCode)
144 }
145 if c.outBody != nil {
146 w.Write(c.outBody)
147 }
148 // Wait for the last flush to complete before returning,
149 // otherwise the security ticket will not be valid.
150 <-flushed
151}
152
153func executeRequestSafely(c *context, r *http.Request) {
154 defer func() {
155 if x := recover(); x != nil {
156 logf(c, 4, "%s", renderPanic(x)) // 4 == critical
157 c.outCode = 500
158 }
159 }()
160
161 http.DefaultServeMux.ServeHTTP(c, r)
162}
163
164func renderPanic(x interface{}) string {
165 buf := make([]byte, 16<<10) // 16 KB should be plenty
166 buf = buf[:runtime.Stack(buf, false)]
167
168 // Remove the first few stack frames:
169 // this func
170 // the recover closure in the caller
171 // That will root the stack trace at the site of the panic.
172 const (
173 skipStart = "internal.renderPanic"
174 skipFrames = 2
175 )
176 start := bytes.Index(buf, []byte(skipStart))
177 p := start
178 for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ {
179 p = bytes.IndexByte(buf[p+1:], '\n') + p + 1
180 if p < 0 {
181 break
182 }
183 }
184 if p >= 0 {
185 // buf[start:p+1] is the block to remove.
186 // Copy buf[p+1:] over buf[start:] and shrink buf.
187 copy(buf[start:], buf[p+1:])
188 buf = buf[:len(buf)-(p+1-start)]
189 }
190
191 // Add panic heading.
192 head := fmt.Sprintf("panic: %v\n\n", x)
193 if len(head) > len(buf) {
194 // Extremely unlikely to happen.
195 return head
196 }
197 copy(buf[len(head):], buf)
198 copy(buf, head)
199
200 return string(buf)
201}
202
203// context represents the context of an in-flight HTTP request.
204// It implements the appengine.Context and http.ResponseWriter interfaces.
205type context struct {
206 req *http.Request
207
208 outCode int
209 outHeader http.Header
210 outBody []byte
211
212 pendingLogs struct {
213 sync.Mutex
214 lines []*logpb.UserAppLogLine
215 flushes int
216 }
217
218 apiURL *url.URL
219}
220
221var contextKey = "holds a *context"
222
223// jointContext joins two contexts in a superficial way.
224// It takes values and timeouts from a base context, and only values from another context.
225type jointContext struct {
226 base netcontext.Context
227 valuesOnly netcontext.Context
228}
229
230func (c jointContext) Deadline() (time.Time, bool) {
231 return c.base.Deadline()
232}
233
234func (c jointContext) Done() <-chan struct{} {
235 return c.base.Done()
236}
237
238func (c jointContext) Err() error {
239 return c.base.Err()
240}
241
242func (c jointContext) Value(key interface{}) interface{} {
243 if val := c.base.Value(key); val != nil {
244 return val
245 }
246 return c.valuesOnly.Value(key)
247}
248
249// fromContext returns the App Engine context or nil if ctx is not
250// derived from an App Engine context.
251func fromContext(ctx netcontext.Context) *context {
252 c, _ := ctx.Value(&contextKey).(*context)
253 return c
254}
255
256func withContext(parent netcontext.Context, c *context) netcontext.Context {
257 ctx := netcontext.WithValue(parent, &contextKey, c)
258 if ns := c.req.Header.Get(curNamespaceHeader); ns != "" {
259 ctx = withNamespace(ctx, ns)
260 }
261 return ctx
262}
263
264func toContext(c *context) netcontext.Context {
265 return withContext(netcontext.Background(), c)
266}
267
268func IncomingHeaders(ctx netcontext.Context) http.Header {
269 if c := fromContext(ctx); c != nil {
270 return c.req.Header
271 }
272 return nil
273}
274
275func ReqContext(req *http.Request) netcontext.Context {
276 return req.Context()
277}
278
279func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context {
280 return jointContext{
281 base: parent,
282 valuesOnly: req.Context(),
283 }
284}
285
286// DefaultTicket returns a ticket used for background context or dev_appserver.
287func DefaultTicket() string {
288 defaultTicketOnce.Do(func() {
289 if IsDevAppServer() {
290 defaultTicket = "testapp" + defaultTicketSuffix
291 return
292 }
293 appID := partitionlessAppID()
294 escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1)
295 majVersion := VersionID(nil)
296 if i := strings.Index(majVersion, "."); i > 0 {
297 majVersion = majVersion[:i]
298 }
299 defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID())
300 })
301 return defaultTicket
302}
303
304func BackgroundContext() netcontext.Context {
305 backgroundContextOnce.Do(func() {
306 // Compute background security ticket.
307 ticket := DefaultTicket()
308
309 c := &context{
310 req: &http.Request{
311 Header: http.Header{
312 ticketHeader: []string{ticket},
313 },
314 },
315 apiURL: apiURL(),
316 }
317 backgroundContext = toContext(c)
318
319 // TODO(dsymonds): Wire up the shutdown handler to do a final flush.
320 go c.logFlusher(make(chan int))
321 })
322
323 return backgroundContext
324}
325
326// RegisterTestRequest registers the HTTP request req for testing, such that
327// any API calls are sent to the provided URL. It returns a closure to delete
328// the registration.
329// It should only be used by aetest package.
330func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) {
331 c := &context{
332 req: req,
333 apiURL: apiURL,
334 }
335 ctx := withContext(decorate(req.Context()), c)
336 req = req.WithContext(ctx)
337 c.req = req
338 return req, func() {}
339}
340
341var errTimeout = &CallError{
342 Detail: "Deadline exceeded",
343 Code: int32(remotepb.RpcError_CANCELLED),
344 Timeout: true,
345}
346
347func (c *context) Header() http.Header { return c.outHeader }
348
349// Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status
350// codes do not permit a response body (nor response entity headers such as
351// Content-Length, Content-Type, etc).
352func bodyAllowedForStatus(status int) bool {
353 switch {
354 case status >= 100 && status <= 199:
355 return false
356 case status == 204:
357 return false
358 case status == 304:
359 return false
360 }
361 return true
362}
363
364func (c *context) Write(b []byte) (int, error) {
365 if c.outCode == 0 {
366 c.WriteHeader(http.StatusOK)
367 }
368 if len(b) > 0 && !bodyAllowedForStatus(c.outCode) {
369 return 0, http.ErrBodyNotAllowed
370 }
371 c.outBody = append(c.outBody, b...)
372 return len(b), nil
373}
374
375func (c *context) WriteHeader(code int) {
376 if c.outCode != 0 {
377 logf(c, 3, "WriteHeader called multiple times on request.") // error level
378 return
379 }
380 c.outCode = code
381}
382
383func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) {
384 hreq := &http.Request{
385 Method: "POST",
386 URL: c.apiURL,
387 Header: http.Header{
388 apiEndpointHeader: apiEndpointHeaderValue,
389 apiMethodHeader: apiMethodHeaderValue,
390 apiContentType: apiContentTypeValue,
391 apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)},
392 },
393 Body: ioutil.NopCloser(bytes.NewReader(body)),
394 ContentLength: int64(len(body)),
395 Host: c.apiURL.Host,
396 }
397 if info := c.req.Header.Get(dapperHeader); info != "" {
398 hreq.Header.Set(dapperHeader, info)
399 }
400 if info := c.req.Header.Get(traceHeader); info != "" {
401 hreq.Header.Set(traceHeader, info)
402 }
403
404 tr := apiHTTPClient.Transport.(*http.Transport)
405
406 var timedOut int32 // atomic; set to 1 if timed out
407 t := time.AfterFunc(timeout, func() {
408 atomic.StoreInt32(&timedOut, 1)
409 tr.CancelRequest(hreq)
410 })
411 defer t.Stop()
412 defer func() {
413 // Check if timeout was exceeded.
414 if atomic.LoadInt32(&timedOut) != 0 {
415 err = errTimeout
416 }
417 }()
418
419 hresp, err := apiHTTPClient.Do(hreq)
420 if err != nil {
421 return nil, &CallError{
422 Detail: fmt.Sprintf("service bridge HTTP failed: %v", err),
423 Code: int32(remotepb.RpcError_UNKNOWN),
424 }
425 }
426 defer hresp.Body.Close()
427 hrespBody, err := ioutil.ReadAll(hresp.Body)
428 if hresp.StatusCode != 200 {
429 return nil, &CallError{
430 Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody),
431 Code: int32(remotepb.RpcError_UNKNOWN),
432 }
433 }
434 if err != nil {
435 return nil, &CallError{
436 Detail: fmt.Sprintf("service bridge response bad: %v", err),
437 Code: int32(remotepb.RpcError_UNKNOWN),
438 }
439 }
440 return hrespBody, nil
441}
442
443func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error {
444 if ns := NamespaceFromContext(ctx); ns != "" {
445 if fn, ok := NamespaceMods[service]; ok {
446 fn(in, ns)
447 }
448 }
449
450 if f, ctx, ok := callOverrideFromContext(ctx); ok {
451 return f(ctx, service, method, in, out)
452 }
453
454 // Handle already-done contexts quickly.
455 select {
456 case <-ctx.Done():
457 return ctx.Err()
458 default:
459 }
460
461 c := fromContext(ctx)
462 if c == nil {
463 // Give a good error message rather than a panic lower down.
464 return errNotAppEngineContext
465 }
466
467 // Apply transaction modifications if we're in a transaction.
468 if t := transactionFromContext(ctx); t != nil {
469 if t.finished {
470 return errors.New("transaction context has expired")
471 }
472 applyTransaction(in, &t.transaction)
473 }
474
475 // Default RPC timeout is 60s.
476 timeout := 60 * time.Second
477 if deadline, ok := ctx.Deadline(); ok {
478 timeout = deadline.Sub(time.Now())
479 }
480
481 data, err := proto.Marshal(in)
482 if err != nil {
483 return err
484 }
485
486 ticket := c.req.Header.Get(ticketHeader)
487 // Use a test ticket under test environment.
488 if ticket == "" {
489 if appid := ctx.Value(&appIDOverrideKey); appid != nil {
490 ticket = appid.(string) + defaultTicketSuffix
491 }
492 }
493 // Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver.
494 if ticket == "" {
495 ticket = DefaultTicket()
496 }
497 req := &remotepb.Request{
498 ServiceName: &service,
499 Method: &method,
500 Request: data,
501 RequestId: &ticket,
502 }
503 hreqBody, err := proto.Marshal(req)
504 if err != nil {
505 return err
506 }
507
508 hrespBody, err := c.post(hreqBody, timeout)
509 if err != nil {
510 return err
511 }
512
513 res := &remotepb.Response{}
514 if err := proto.Unmarshal(hrespBody, res); err != nil {
515 return err
516 }
517 if res.RpcError != nil {
518 ce := &CallError{
519 Detail: res.RpcError.GetDetail(),
520 Code: *res.RpcError.Code,
521 }
522 switch remotepb.RpcError_ErrorCode(ce.Code) {
523 case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED:
524 ce.Timeout = true
525 }
526 return ce
527 }
528 if res.ApplicationError != nil {
529 return &APIError{
530 Service: *req.ServiceName,
531 Detail: res.ApplicationError.GetDetail(),
532 Code: *res.ApplicationError.Code,
533 }
534 }
535 if res.Exception != nil || res.JavaException != nil {
536 // This shouldn't happen, but let's be defensive.
537 return &CallError{
538 Detail: "service bridge returned exception",
539 Code: int32(remotepb.RpcError_UNKNOWN),
540 }
541 }
542 return proto.Unmarshal(res.Response, out)
543}
544
545func (c *context) Request() *http.Request {
546 return c.req
547}
548
549func (c *context) addLogLine(ll *logpb.UserAppLogLine) {
550 // Truncate long log lines.
551 // TODO(dsymonds): Check if this is still necessary.
552 const lim = 8 << 10
553 if len(*ll.Message) > lim {
554 suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
555 ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
556 }
557
558 c.pendingLogs.Lock()
559 c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
560 c.pendingLogs.Unlock()
561}
562
563var logLevelName = map[int64]string{
564 0: "DEBUG",
565 1: "INFO",
566 2: "WARNING",
567 3: "ERROR",
568 4: "CRITICAL",
569}
570
571func logf(c *context, level int64, format string, args ...interface{}) {
572 if c == nil {
573 panic("not an App Engine context")
574 }
575 s := fmt.Sprintf(format, args...)
576 s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
577 c.addLogLine(&logpb.UserAppLogLine{
578 TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
579 Level: &level,
580 Message: &s,
581 })
582 // Only duplicate log to stderr if not running on App Engine second generation
583 if !IsSecondGen() {
584 log.Print(logLevelName[level] + ": " + s)
585 }
586}
587
588// flushLog attempts to flush any pending logs to the appserver.
589// It should not be called concurrently.
590func (c *context) flushLog(force bool) (flushed bool) {
591 c.pendingLogs.Lock()
592 // Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
593 n, rem := 0, 30<<20
594 for ; n < len(c.pendingLogs.lines); n++ {
595 ll := c.pendingLogs.lines[n]
596 // Each log line will require about 3 bytes of overhead.
597 nb := proto.Size(ll) + 3
598 if nb > rem {
599 break
600 }
601 rem -= nb
602 }
603 lines := c.pendingLogs.lines[:n]
604 c.pendingLogs.lines = c.pendingLogs.lines[n:]
605 c.pendingLogs.Unlock()
606
607 if len(lines) == 0 && !force {
608 // Nothing to flush.
609 return false
610 }
611
612 rescueLogs := false
613 defer func() {
614 if rescueLogs {
615 c.pendingLogs.Lock()
616 c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
617 c.pendingLogs.Unlock()
618 }
619 }()
620
621 buf, err := proto.Marshal(&logpb.UserAppLogGroup{
622 LogLine: lines,
623 })
624 if err != nil {
625 log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
626 rescueLogs = true
627 return false
628 }
629
630 req := &logpb.FlushRequest{
631 Logs: buf,
632 }
633 res := &basepb.VoidProto{}
634 c.pendingLogs.Lock()
635 c.pendingLogs.flushes++
636 c.pendingLogs.Unlock()
637 if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
638 log.Printf("internal.flushLog: Flush RPC: %v", err)
639 rescueLogs = true
640 return false
641 }
642 return true
643}
644
645const (
646 // Log flushing parameters.
647 flushInterval = 1 * time.Second
648 forceFlushInterval = 60 * time.Second
649)
650
651func (c *context) logFlusher(stop <-chan int) {
652 lastFlush := time.Now()
653 tick := time.NewTicker(flushInterval)
654 for {
655 select {
656 case <-stop:
657 // Request finished.
658 tick.Stop()
659 return
660 case <-tick.C:
661 force := time.Now().Sub(lastFlush) > forceFlushInterval
662 if c.flushLog(force) {
663 lastFlush = time.Now()
664 }
665 }
666 }
667}
668
669func ContextForTesting(req *http.Request) netcontext.Context {
670 return toContext(&context{req: req})
671}