blob: a6ec19e14cfc4c7d7f69d8b28d0cab8665232bb3 [file] [log] [blame]
Zack Williamse940c7a2019-08-21 14:25:39 -07001// Copyright 2011 Google Inc. All rights reserved.
2// Use of this source code is governed by the Apache 2.0
3// license that can be found in the LICENSE file.
4
5// +build !appengine
6
7package internal
8
9import (
10 "bytes"
11 "errors"
12 "fmt"
13 "io/ioutil"
14 "log"
15 "net"
16 "net/http"
17 "net/url"
18 "os"
19 "runtime"
20 "strconv"
21 "strings"
22 "sync"
23 "sync/atomic"
24 "time"
25
26 "github.com/golang/protobuf/proto"
27 netcontext "golang.org/x/net/context"
28
29 basepb "google.golang.org/appengine/internal/base"
30 logpb "google.golang.org/appengine/internal/log"
31 remotepb "google.golang.org/appengine/internal/remote_api"
32)
33
const (
	// apiPath is the URL path component that apiURL puts on the API
	// endpoint URL.
	apiPath = "/rpc_http"
	// defaultTicketSuffix is appended to an app ID to build a ticket when no
	// request ticket is available (see DefaultTicket and Call).
	defaultTicketSuffix = "/default.20150612t184001.0"
)
38
var (
	// Incoming headers.
	// These are read from the inbound request by handleHTTP, withContext,
	// post and Call.
	ticketHeader       = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
	dapperHeader       = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
	traceHeader        = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
	curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
	userIPHeader       = http.CanonicalHeaderKey("X-AppEngine-User-IP")
	remoteAddrHeader   = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")
	devRequestIdHeader = http.CanonicalHeaderKey("X-Appengine-Dev-Request-Id")

	// Outgoing headers.
	// The api* names and values are set on every API request built by post;
	// logFlushHeader is set on the response by handleHTTP.
	apiEndpointHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
	apiEndpointHeaderValue = []string{"app-engine-apis"}
	apiMethodHeader        = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
	apiMethodHeaderValue   = []string{"/VMRemoteAPI.CallRemoteAPI"}
	apiDeadlineHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
	apiContentType         = http.CanonicalHeaderKey("Content-Type")
	apiContentTypeValue    = []string{"application/octet-stream"}
	logFlushHeader         = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")

	// apiHTTPClient is the shared client used by post for API requests.
	// post type-asserts its Transport to *http.Transport in order to cancel
	// timed-out requests, so the Transport must stay an *http.Transport.
	// limitDial is defined elsewhere in the package.
	apiHTTPClient = &http.Client{
		Transport: &http.Transport{
			Proxy: http.ProxyFromEnvironment,
			Dial:  limitDial,
		},
	}

	// defaultTicketOnce guards the one-time computation of defaultTicket
	// in DefaultTicket.
	defaultTicketOnce sync.Once
	defaultTicket     string
	// backgroundContextOnce guards the one-time construction of
	// backgroundContext in BackgroundContext.
	backgroundContextOnce sync.Once
	backgroundContext     netcontext.Context
)
71
72func apiURL() *url.URL {
73 host, port := "appengine.googleapis.internal", "10001"
74 if h := os.Getenv("API_HOST"); h != "" {
75 host = h
76 }
77 if p := os.Getenv("API_PORT"); p != "" {
78 port = p
79 }
80 return &url.URL{
81 Scheme: "http",
82 Host: host + ":" + port,
83 Path: apiPath,
84 }
85}
86
// handleHTTP serves one incoming request. It wraps the request in a
// *context, rewrites r.RemoteAddr from the App Engine headers, dispatches
// the request to http.DefaultServeMux through executeRequestSafely (with the
// *context acting as the ResponseWriter), and finally copies the buffered
// status code and body to w while a last log flush runs in the background.
func handleHTTP(w http.ResponseWriter, r *http.Request) {
	c := &context{
		req:       r,
		outHeader: w.Header(),
		apiURL:    apiURL(),
	}
	// Attach c to the request's context, then point c at the derived request
	// so both refer to each other.
	r = r.WithContext(withContext(r.Context(), c))
	c.req = r

	stopFlushing := make(chan int)

	// Patch up RemoteAddr so it looks reasonable.
	if addr := r.Header.Get(userIPHeader); addr != "" {
		r.RemoteAddr = addr
	} else if addr = r.Header.Get(remoteAddrHeader); addr != "" {
		r.RemoteAddr = addr
	} else {
		// Should not normally reach here, but pick a sensible default anyway.
		r.RemoteAddr = "127.0.0.1"
	}
	// The address in the headers will most likely be of these forms:
	//	123.123.123.123
	//	2001:db8::1
	// net/http.Request.RemoteAddr is specified to be in "IP:port" form.
	if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
		// Assume the remote address is only a host; add a default port.
		r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
	}

	// Start goroutine responsible for flushing app logs.
	// This is done after adding c to ctx.m (and stopped before removing it)
	// because flushing logs requires making an API call.
	// NOTE(review): no ctx.m is visible in this file; the sentence above may
	// be stale and refer to an older registration scheme - confirm.
	go c.logFlusher(stopFlushing)

	executeRequestSafely(c, r)
	c.outHeader = nil // make sure header changes aren't respected any more

	// Unbuffered send: blocks until logFlusher receives it and exits.
	stopFlushing <- 1 // any logging beyond this point will be dropped

	// Flush any pending logs asynchronously.
	// The reported flush count includes the final flush started below when
	// there are still lines pending.
	c.pendingLogs.Lock()
	flushes := c.pendingLogs.flushes
	if len(c.pendingLogs.lines) > 0 {
		flushes++
	}
	c.pendingLogs.Unlock()
	flushed := make(chan struct{})
	go func() {
		defer close(flushed)
		// Force a log flush, because with very short requests we
		// may not ever flush logs.
		c.flushLog(true)
	}()
	w.Header().Set(logFlushHeader, strconv.Itoa(flushes))

	// Avoid nil Write call if c.Write is never called.
	if c.outCode != 0 {
		w.WriteHeader(c.outCode)
	}
	if c.outBody != nil {
		w.Write(c.outBody)
	}
	// Wait for the last flush to complete before returning,
	// otherwise the security ticket will not be valid.
	<-flushed
}
153
154func executeRequestSafely(c *context, r *http.Request) {
155 defer func() {
156 if x := recover(); x != nil {
157 logf(c, 4, "%s", renderPanic(x)) // 4 == critical
158 c.outCode = 500
159 }
160 }()
161
162 http.DefaultServeMux.ServeHTTP(c, r)
163}
164
// renderPanic formats the recovered panic value x together with the current
// goroutine's stack trace as "panic: <x>\n\n<stack>".
//
// It is meant to be called from the deferred recover closure of the function
// that ran the panicking code. The frames of renderPanic itself and of that
// closure are removed so the trace is rooted at the site of the panic. If
// renderPanic's frame cannot be located in the trace, the trace is returned
// unmodified rather than sliced with an invalid index.
func renderPanic(x interface{}) string {
	buf := make([]byte, 16<<10) // 16 KB should be plenty
	buf = buf[:runtime.Stack(buf, false)]

	// Remove the first few stack frames:
	//   this func
	//   the recover closure in the caller
	// That will root the stack trace at the site of the panic.
	const (
		skipStart  = "internal.renderPanic"
		skipFrames = 2
	)
	if at := bytes.Index(buf, []byte(skipStart)); at >= 0 {
		// The function name is printed with its import path in front, so
		// the match sits mid-line; cut from the beginning of that line.
		start := bytes.LastIndexByte(buf[:at], '\n') + 1

		// Each frame occupies two lines (function, then file:line).
		end := start
		for i := 0; i < skipFrames*2; i++ {
			nl := bytes.IndexByte(buf[end:], '\n')
			if nl < 0 {
				// Trace was truncated; drop only the complete lines found.
				break
			}
			end += nl + 1
		}
		// buf[start:end] is the block to remove.
		buf = append(buf[:start], buf[end:]...)
	}

	// Prepend the panic heading without sacrificing the tail of the trace.
	return fmt.Sprintf("panic: %v\n\n", x) + string(buf)
}
203
// context represents the context of an in-flight HTTP request.
// It implements the appengine.Context and http.ResponseWriter interfaces.
type context struct {
	// req is the request this context belongs to; its headers supply the
	// API ticket and trace information used by Call and post.
	req *http.Request

	// Buffered response state. Header, WriteHeader and Write operate on
	// these fields; handleHTTP copies outCode and outBody to the real
	// ResponseWriter once the handler has returned.
	outCode   int
	outHeader http.Header
	outBody   []byte

	// pendingLogs holds log lines not yet sent to the log service and the
	// number of flush attempts made so far. The embedded mutex guards both.
	pendingLogs struct {
		sync.Mutex
		lines   []*logpb.UserAppLogLine
		flushes int
	}

	// apiURL is the endpoint that post sends API requests to.
	apiURL *url.URL
}
221
// contextKey's address (&contextKey) is the key under which withContext
// stores, and fromContext looks up, the *context value.
var contextKey = "holds a *context"
223
224// jointContext joins two contexts in a superficial way.
225// It takes values and timeouts from a base context, and only values from another context.
226type jointContext struct {
227 base netcontext.Context
228 valuesOnly netcontext.Context
229}
230
231func (c jointContext) Deadline() (time.Time, bool) {
232 return c.base.Deadline()
233}
234
235func (c jointContext) Done() <-chan struct{} {
236 return c.base.Done()
237}
238
239func (c jointContext) Err() error {
240 return c.base.Err()
241}
242
243func (c jointContext) Value(key interface{}) interface{} {
244 if val := c.base.Value(key); val != nil {
245 return val
246 }
247 return c.valuesOnly.Value(key)
248}
249
250// fromContext returns the App Engine context or nil if ctx is not
251// derived from an App Engine context.
252func fromContext(ctx netcontext.Context) *context {
253 c, _ := ctx.Value(&contextKey).(*context)
254 return c
255}
256
257func withContext(parent netcontext.Context, c *context) netcontext.Context {
258 ctx := netcontext.WithValue(parent, &contextKey, c)
259 if ns := c.req.Header.Get(curNamespaceHeader); ns != "" {
260 ctx = withNamespace(ctx, ns)
261 }
262 return ctx
263}
264
265func toContext(c *context) netcontext.Context {
266 return withContext(netcontext.Background(), c)
267}
268
269func IncomingHeaders(ctx netcontext.Context) http.Header {
270 if c := fromContext(ctx); c != nil {
271 return c.req.Header
272 }
273 return nil
274}
275
276func ReqContext(req *http.Request) netcontext.Context {
277 return req.Context()
278}
279
280func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context {
281 return jointContext{
282 base: parent,
283 valuesOnly: req.Context(),
284 }
285}
286
287// DefaultTicket returns a ticket used for background context or dev_appserver.
288func DefaultTicket() string {
289 defaultTicketOnce.Do(func() {
290 if IsDevAppServer() {
291 defaultTicket = "testapp" + defaultTicketSuffix
292 return
293 }
294 appID := partitionlessAppID()
295 escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1)
296 majVersion := VersionID(nil)
297 if i := strings.Index(majVersion, "."); i > 0 {
298 majVersion = majVersion[:i]
299 }
300 defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID())
301 })
302 return defaultTicket
303}
304
305func BackgroundContext() netcontext.Context {
306 backgroundContextOnce.Do(func() {
307 // Compute background security ticket.
308 ticket := DefaultTicket()
309
310 c := &context{
311 req: &http.Request{
312 Header: http.Header{
313 ticketHeader: []string{ticket},
314 },
315 },
316 apiURL: apiURL(),
317 }
318 backgroundContext = toContext(c)
319
320 // TODO(dsymonds): Wire up the shutdown handler to do a final flush.
321 go c.logFlusher(make(chan int))
322 })
323
324 return backgroundContext
325}
326
327// RegisterTestRequest registers the HTTP request req for testing, such that
328// any API calls are sent to the provided URL. It returns a closure to delete
329// the registration.
330// It should only be used by aetest package.
331func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) {
332 c := &context{
333 req: req,
334 apiURL: apiURL,
335 }
336 ctx := withContext(decorate(req.Context()), c)
337 req = req.WithContext(ctx)
338 c.req = req
339 return req, func() {}
340}
341
// errTimeout is the error post returns when its timeout timer fired before
// the API request completed.
var errTimeout = &CallError{
	Detail:  "Deadline exceeded",
	Code:    int32(remotepb.RpcError_CANCELLED),
	Timeout: true,
}
347
348func (c *context) Header() http.Header { return c.outHeader }
349
// bodyAllowedForStatus reports whether a response with the given status
// code may carry a body. Informational (1xx), 204 and 304 responses may not
// (nor may they carry entity headers such as Content-Length or Content-Type).
// Copied from $GOROOT/src/pkg/net/http/transfer.go.
func bodyAllowedForStatus(status int) bool {
	informational := status >= 100 && status <= 199
	if informational || status == 204 || status == 304 {
		return false
	}
	return true
}
364
365func (c *context) Write(b []byte) (int, error) {
366 if c.outCode == 0 {
367 c.WriteHeader(http.StatusOK)
368 }
369 if len(b) > 0 && !bodyAllowedForStatus(c.outCode) {
370 return 0, http.ErrBodyNotAllowed
371 }
372 c.outBody = append(c.outBody, b...)
373 return len(b), nil
374}
375
376func (c *context) WriteHeader(code int) {
377 if c.outCode != 0 {
378 logf(c, 3, "WriteHeader called multiple times on request.") // error level
379 return
380 }
381 c.outCode = code
382}
383
// post sends body to c.apiURL as an HTTP POST and returns the response body.
//
// The timeout is advertised to the server in the deadline header and is also
// enforced locally: a timer cancels the request on the transport once it
// fires. Whenever that timer has fired, the returned error is errTimeout,
// regardless of what the request itself returned. Transport failures,
// non-200 statuses and body read failures are reported as *CallError with
// code RpcError_UNKNOWN.
func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) {
	hreq := &http.Request{
		Method: "POST",
		URL:    c.apiURL,
		Header: http.Header{
			apiEndpointHeader: apiEndpointHeaderValue,
			apiMethodHeader:   apiMethodHeaderValue,
			apiContentType:    apiContentTypeValue,
			apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)},
		},
		Body:          ioutil.NopCloser(bytes.NewReader(body)),
		ContentLength: int64(len(body)),
		Host:          c.apiURL.Host,
	}
	// Propagate trace headers from the incoming request, when present.
	if info := c.req.Header.Get(dapperHeader); info != "" {
		hreq.Header.Set(dapperHeader, info)
	}
	if info := c.req.Header.Get(traceHeader); info != "" {
		hreq.Header.Set(traceHeader, info)
	}

	// Panics if apiHTTPClient.Transport is not an *http.Transport.
	tr := apiHTTPClient.Transport.(*http.Transport)

	var timedOut int32 // atomic; set to 1 if timed out
	t := time.AfterFunc(timeout, func() {
		atomic.StoreInt32(&timedOut, 1)
		tr.CancelRequest(hreq)
	})
	defer t.Stop()
	// This deferred function overwrites the named result err, so it takes
	// precedence over any error returned below.
	defer func() {
		// Check if timeout was exceeded.
		if atomic.LoadInt32(&timedOut) != 0 {
			err = errTimeout
		}
	}()

	hresp, err := apiHTTPClient.Do(hreq)
	if err != nil {
		return nil, &CallError{
			Detail: fmt.Sprintf("service bridge HTTP failed: %v", err),
			Code:   int32(remotepb.RpcError_UNKNOWN),
		}
	}
	defer hresp.Body.Close()
	hrespBody, err := ioutil.ReadAll(hresp.Body)
	// The status is checked before the read error so that a non-200 reply is
	// reported with whatever part of the body could be read.
	if hresp.StatusCode != 200 {
		return nil, &CallError{
			Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody),
			Code:   int32(remotepb.RpcError_UNKNOWN),
		}
	}
	if err != nil {
		return nil, &CallError{
			Detail: fmt.Sprintf("service bridge response bad: %v", err),
			Code:   int32(remotepb.RpcError_UNKNOWN),
		}
	}
	return hrespBody, nil
}
443
// Call performs the API call service.method with request message in and
// decodes the reply into out.
//
// Steps, in order: apply the namespace modifier registered for the service
// (if ctx has a namespace); delegate to a call override if ctx carries one;
// fail fast on an already-done ctx; apply transaction state; marshal the
// request, wrap it in a remotepb.Request with the chosen ticket and send it
// with post; finally map the remotepb.Response to an error or to out.
//
// It returns errNotAppEngineContext when ctx is not derived from an App
// Engine context, *CallError for RPC-level failures (Timeout set for
// CANCELLED and DEADLINE_EXCEEDED) and *APIError for application errors.
func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error {
	if ns := NamespaceFromContext(ctx); ns != "" {
		if fn, ok := NamespaceMods[service]; ok {
			fn(in, ns)
		}
	}

	// Note: ctx is shadowed here with the context returned by the override
	// lookup, for the duration of the if statement only.
	if f, ctx, ok := callOverrideFromContext(ctx); ok {
		return f(ctx, service, method, in, out)
	}

	// Handle already-done contexts quickly.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	c := fromContext(ctx)
	if c == nil {
		// Give a good error message rather than a panic lower down.
		return errNotAppEngineContext
	}

	// Apply transaction modifications if we're in a transaction.
	if t := transactionFromContext(ctx); t != nil {
		if t.finished {
			return errors.New("transaction context has expired")
		}
		applyTransaction(in, &t.transaction)
	}

	// Default RPC timeout is 60s.
	// A deadline on ctx replaces it with the time remaining until then.
	timeout := 60 * time.Second
	if deadline, ok := ctx.Deadline(); ok {
		timeout = deadline.Sub(time.Now())
	}

	data, err := proto.Marshal(in)
	if err != nil {
		return err
	}

	// Ticket precedence: request header, then app ID override from ctx,
	// then the default ticket; under dev_appserver a dev request ID header
	// overrides all of them.
	ticket := c.req.Header.Get(ticketHeader)
	// Use a test ticket under test environment.
	if ticket == "" {
		if appid := ctx.Value(&appIDOverrideKey); appid != nil {
			ticket = appid.(string) + defaultTicketSuffix
		}
	}
	// Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver.
	if ticket == "" {
		ticket = DefaultTicket()
	}
	if dri := c.req.Header.Get(devRequestIdHeader); IsDevAppServer() && dri != "" {
		ticket = dri
	}
	req := &remotepb.Request{
		ServiceName: &service,
		Method:      &method,
		Request:     data,
		RequestId:   &ticket,
	}
	hreqBody, err := proto.Marshal(req)
	if err != nil {
		return err
	}

	hrespBody, err := c.post(hreqBody, timeout)
	if err != nil {
		return err
	}

	res := &remotepb.Response{}
	if err := proto.Unmarshal(hrespBody, res); err != nil {
		return err
	}
	if res.RpcError != nil {
		ce := &CallError{
			Detail: res.RpcError.GetDetail(),
			Code:   *res.RpcError.Code,
		}
		switch remotepb.RpcError_ErrorCode(ce.Code) {
		case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED:
			ce.Timeout = true
		}
		return ce
	}
	if res.ApplicationError != nil {
		return &APIError{
			Service: *req.ServiceName,
			Detail:  res.ApplicationError.GetDetail(),
			Code:    *res.ApplicationError.Code,
		}
	}
	if res.Exception != nil || res.JavaException != nil {
		// This shouldn't happen, but let's be defensive.
		return &CallError{
			Detail: "service bridge returned exception",
			Code:   int32(remotepb.RpcError_UNKNOWN),
		}
	}
	return proto.Unmarshal(res.Response, out)
}
548
549func (c *context) Request() *http.Request {
550 return c.req
551}
552
553func (c *context) addLogLine(ll *logpb.UserAppLogLine) {
554 // Truncate long log lines.
555 // TODO(dsymonds): Check if this is still necessary.
556 const lim = 8 << 10
557 if len(*ll.Message) > lim {
558 suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
559 ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
560 }
561
562 c.pendingLogs.Lock()
563 c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
564 c.pendingLogs.Unlock()
565}
566
// logLevelName maps the numeric log levels accepted by logf to the names
// used as a prefix when logf mirrors a line through the standard logger.
var logLevelName = map[int64]string{
	0: "DEBUG",
	1: "INFO",
	2: "WARNING",
	3: "ERROR",
	4: "CRITICAL",
}
574
575func logf(c *context, level int64, format string, args ...interface{}) {
576 if c == nil {
577 panic("not an App Engine context")
578 }
579 s := fmt.Sprintf(format, args...)
580 s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
581 c.addLogLine(&logpb.UserAppLogLine{
582 TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
583 Level: &level,
584 Message: &s,
585 })
586 // Only duplicate log to stderr if not running on App Engine second generation
587 if !IsSecondGen() {
588 log.Print(logLevelName[level] + ": " + s)
589 }
590}
591
// flushLog attempts to flush any pending logs to the appserver.
// It should not be called concurrently.
//
// When force is true a Flush RPC is issued even if no lines are pending.
// The result is true only if the Flush RPC succeeded. If marshaling or the
// RPC fails, the lines taken for this flush are put back at the front of
// the pending queue so a later flush can retry them.
func (c *context) flushLog(force bool) (flushed bool) {
	c.pendingLogs.Lock()
	// Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
	n, rem := 0, 30<<20
	for ; n < len(c.pendingLogs.lines); n++ {
		ll := c.pendingLogs.lines[n]
		// Each log line will require about 3 bytes of overhead.
		nb := proto.Size(ll) + 3
		if nb > rem {
			break
		}
		rem -= nb
	}
	// Take the first n lines for this flush and leave the rest pending.
	lines := c.pendingLogs.lines[:n]
	c.pendingLogs.lines = c.pendingLogs.lines[n:]
	c.pendingLogs.Unlock()

	if len(lines) == 0 && !force {
		// Nothing to flush.
		return false
	}

	// rescueLogs is set on failure paths; the deferred function then puts the
	// taken lines back in front of whatever has been queued in the meantime.
	rescueLogs := false
	defer func() {
		if rescueLogs {
			c.pendingLogs.Lock()
			c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
			c.pendingLogs.Unlock()
		}
	}()

	buf, err := proto.Marshal(&logpb.UserAppLogGroup{
		LogLine: lines,
	})
	if err != nil {
		log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
		rescueLogs = true
		return false
	}

	req := &logpb.FlushRequest{
		Logs: buf,
	}
	res := &basepb.VoidProto{}
	// The flush counter is incremented before the RPC, so it counts attempts
	// rather than successes.
	c.pendingLogs.Lock()
	c.pendingLogs.flushes++
	c.pendingLogs.Unlock()
	if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
		log.Printf("internal.flushLog: Flush RPC: %v", err)
		rescueLogs = true
		return false
	}
	return true
}
648
const (
	// Log flushing parameters.

	// flushInterval is how often logFlusher attempts a flush.
	flushInterval = 1 * time.Second
	// forceFlushInterval is the time since the last successful flush after
	// which logFlusher forces a flush even when no lines are pending.
	forceFlushInterval = 60 * time.Second
)
654
655func (c *context) logFlusher(stop <-chan int) {
656 lastFlush := time.Now()
657 tick := time.NewTicker(flushInterval)
658 for {
659 select {
660 case <-stop:
661 // Request finished.
662 tick.Stop()
663 return
664 case <-tick.C:
665 force := time.Now().Sub(lastFlush) > forceFlushInterval
666 if c.flushLog(force) {
667 lastFlush = time.Now()
668 }
669 }
670 }
671}
672
673func ContextForTesting(req *http.Request) netcontext.Context {
674 return toContext(&context{req: req})
675}