Zack Williams | e940c7a | 2019-08-21 14:25:39 -0700 | [diff] [blame] | 1 | // Copyright 2011 Google Inc. All rights reserved. |
| 2 | // Use of this source code is governed by the Apache 2.0 |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
| 5 | // +build !appengine |
| 6 | |
| 7 | package internal |
| 8 | |
| 9 | import ( |
| 10 | "bytes" |
| 11 | "errors" |
| 12 | "fmt" |
| 13 | "io/ioutil" |
| 14 | "log" |
| 15 | "net" |
| 16 | "net/http" |
| 17 | "net/url" |
| 18 | "os" |
| 19 | "runtime" |
| 20 | "strconv" |
| 21 | "strings" |
| 22 | "sync" |
| 23 | "sync/atomic" |
| 24 | "time" |
| 25 | |
| 26 | "github.com/golang/protobuf/proto" |
| 27 | netcontext "golang.org/x/net/context" |
| 28 | |
| 29 | basepb "google.golang.org/appengine/internal/base" |
| 30 | logpb "google.golang.org/appengine/internal/log" |
| 31 | remotepb "google.golang.org/appengine/internal/remote_api" |
| 32 | ) |
| 33 | |
const (
	// apiPath is the URL path of the API endpoint that apiURL points at.
	apiPath = "/rpc_http"
	// defaultTicketSuffix is appended to an app ID to form the ticket used
	// under dev_appserver and when an app ID override is present.
	defaultTicketSuffix = "/default.20150612t184001.0"
)
| 38 | |
var (
	// Incoming headers.
	ticketHeader       = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
	dapperHeader       = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
	traceHeader        = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
	curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
	userIPHeader       = http.CanonicalHeaderKey("X-AppEngine-User-IP")
	remoteAddrHeader   = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")
	devRequestIdHeader = http.CanonicalHeaderKey("X-Appengine-Dev-Request-Id")

	// Outgoing headers.
	// The *Value slices are shared by every outgoing API request, so they
	// must be treated as read-only.
	apiEndpointHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
	apiEndpointHeaderValue = []string{"app-engine-apis"}
	apiMethodHeader        = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
	apiMethodHeaderValue   = []string{"/VMRemoteAPI.CallRemoteAPI"}
	apiDeadlineHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
	apiContentType         = http.CanonicalHeaderKey("Content-Type")
	apiContentTypeValue    = []string{"application/octet-stream"}
	logFlushHeader         = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")

	// apiHTTPClient is the shared client used for all API calls. Its
	// Transport must remain an *http.Transport: post type-asserts it in
	// order to cancel timed-out requests.
	apiHTTPClient = &http.Client{
		Transport: &http.Transport{
			Proxy: http.ProxyFromEnvironment,
			// limitDial is defined elsewhere in the package; presumably
			// it bounds concurrent dials (verify against its definition).
			Dial: limitDial,
		},
	}

	// defaultTicket is computed once by DefaultTicket.
	defaultTicketOnce sync.Once
	defaultTicket     string
	// backgroundContext is built once by BackgroundContext.
	backgroundContextOnce sync.Once
	backgroundContext     netcontext.Context
)
| 71 | |
| 72 | func apiURL() *url.URL { |
| 73 | host, port := "appengine.googleapis.internal", "10001" |
| 74 | if h := os.Getenv("API_HOST"); h != "" { |
| 75 | host = h |
| 76 | } |
| 77 | if p := os.Getenv("API_PORT"); p != "" { |
| 78 | port = p |
| 79 | } |
| 80 | return &url.URL{ |
| 81 | Scheme: "http", |
| 82 | Host: host + ":" + port, |
| 83 | Path: apiPath, |
| 84 | } |
| 85 | } |
| 86 | |
| 87 | func handleHTTP(w http.ResponseWriter, r *http.Request) { |
| 88 | c := &context{ |
| 89 | req: r, |
| 90 | outHeader: w.Header(), |
| 91 | apiURL: apiURL(), |
| 92 | } |
| 93 | r = r.WithContext(withContext(r.Context(), c)) |
| 94 | c.req = r |
| 95 | |
| 96 | stopFlushing := make(chan int) |
| 97 | |
| 98 | // Patch up RemoteAddr so it looks reasonable. |
| 99 | if addr := r.Header.Get(userIPHeader); addr != "" { |
| 100 | r.RemoteAddr = addr |
| 101 | } else if addr = r.Header.Get(remoteAddrHeader); addr != "" { |
| 102 | r.RemoteAddr = addr |
| 103 | } else { |
| 104 | // Should not normally reach here, but pick a sensible default anyway. |
| 105 | r.RemoteAddr = "127.0.0.1" |
| 106 | } |
| 107 | // The address in the headers will most likely be of these forms: |
| 108 | // 123.123.123.123 |
| 109 | // 2001:db8::1 |
| 110 | // net/http.Request.RemoteAddr is specified to be in "IP:port" form. |
| 111 | if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil { |
| 112 | // Assume the remote address is only a host; add a default port. |
| 113 | r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80") |
| 114 | } |
| 115 | |
| 116 | // Start goroutine responsible for flushing app logs. |
| 117 | // This is done after adding c to ctx.m (and stopped before removing it) |
| 118 | // because flushing logs requires making an API call. |
| 119 | go c.logFlusher(stopFlushing) |
| 120 | |
| 121 | executeRequestSafely(c, r) |
| 122 | c.outHeader = nil // make sure header changes aren't respected any more |
| 123 | |
| 124 | stopFlushing <- 1 // any logging beyond this point will be dropped |
| 125 | |
| 126 | // Flush any pending logs asynchronously. |
| 127 | c.pendingLogs.Lock() |
| 128 | flushes := c.pendingLogs.flushes |
| 129 | if len(c.pendingLogs.lines) > 0 { |
| 130 | flushes++ |
| 131 | } |
| 132 | c.pendingLogs.Unlock() |
| 133 | flushed := make(chan struct{}) |
| 134 | go func() { |
| 135 | defer close(flushed) |
| 136 | // Force a log flush, because with very short requests we |
| 137 | // may not ever flush logs. |
| 138 | c.flushLog(true) |
| 139 | }() |
| 140 | w.Header().Set(logFlushHeader, strconv.Itoa(flushes)) |
| 141 | |
| 142 | // Avoid nil Write call if c.Write is never called. |
| 143 | if c.outCode != 0 { |
| 144 | w.WriteHeader(c.outCode) |
| 145 | } |
| 146 | if c.outBody != nil { |
| 147 | w.Write(c.outBody) |
| 148 | } |
| 149 | // Wait for the last flush to complete before returning, |
| 150 | // otherwise the security ticket will not be valid. |
| 151 | <-flushed |
| 152 | } |
| 153 | |
| 154 | func executeRequestSafely(c *context, r *http.Request) { |
| 155 | defer func() { |
| 156 | if x := recover(); x != nil { |
| 157 | logf(c, 4, "%s", renderPanic(x)) // 4 == critical |
| 158 | c.outCode = 500 |
| 159 | } |
| 160 | }() |
| 161 | |
| 162 | http.DefaultServeMux.ServeHTTP(c, r) |
| 163 | } |
| 164 | |
// renderPanic formats the recovered panic value x together with the current
// goroutine's stack trace as "panic: <x>\n\n<stack>".
//
// It is meant to be called from the deferred recover closure of the function
// that ran the panicking code. The frames belonging to renderPanic itself and
// to that closure are cut out of the trace so it is rooted at the panic site.
//
// If the "internal.renderPanic" marker cannot be found in the stack dump (for
// example because the dump was truncated, or the function lives in a
// differently named package), the trace is returned unmodified instead of
// slicing with a negative index. The heading is prepended by concatenation so
// the tail of the trace is never cut off to make room for it.
func renderPanic(x interface{}) string {
	buf := make([]byte, 16<<10) // 16 KB should be plenty
	buf = buf[:runtime.Stack(buf, false)]

	// Remove the first few stack frames:
	//   this func
	//   the recover closure in the caller
	// Each frame occupies two lines in the dump.
	const (
		skipStart  = "internal.renderPanic"
		skipFrames = 2
	)
	if start := bytes.Index(buf, []byte(skipStart)); start >= 0 {
		// end is advanced to just past the newline terminating each
		// skipped line; buf[start:end] is the block to remove.
		end := start
		for i := 0; i < skipFrames*2 && end < len(buf); i++ {
			nl := bytes.IndexByte(buf[end:], '\n')
			if nl < 0 {
				// Dump ends mid-line: drop only the complete lines found.
				break
			}
			end += nl + 1
		}
		buf = append(buf[:start], buf[end:]...)
	}

	// Add panic heading.
	return fmt.Sprintf("panic: %v\n\n", x) + string(buf)
}
| 203 | |
// context represents the context of an in-flight HTTP request.
// It implements the appengine.Context and http.ResponseWriter interfaces.
type context struct {
	// req is the incoming request; its headers supply the API ticket and
	// tracing information used for outgoing API calls.
	req *http.Request

	// Buffered response state. outCode stays 0 until WriteHeader (or the
	// first Write) is called; outHeader is what Header returns; outBody
	// accumulates everything passed to Write.
	outCode   int
	outHeader http.Header
	outBody   []byte

	// pendingLogs holds log lines not yet sent to the log service, and the
	// number of flush attempts made so far. The embedded mutex guards both
	// fields.
	pendingLogs struct {
		sync.Mutex
		lines   []*logpb.UserAppLogLine
		flushes int
	}

	// apiURL is the endpoint that API calls for this context are posted to.
	apiURL *url.URL
}
| 221 | |
// contextKey's address (&contextKey) is the key under which a *context is
// stored in, and looked up from, a netcontext.Context. The string value is
// only descriptive.
var contextKey = "holds a *context"
| 223 | |
| 224 | // jointContext joins two contexts in a superficial way. |
| 225 | // It takes values and timeouts from a base context, and only values from another context. |
| 226 | type jointContext struct { |
| 227 | base netcontext.Context |
| 228 | valuesOnly netcontext.Context |
| 229 | } |
| 230 | |
| 231 | func (c jointContext) Deadline() (time.Time, bool) { |
| 232 | return c.base.Deadline() |
| 233 | } |
| 234 | |
| 235 | func (c jointContext) Done() <-chan struct{} { |
| 236 | return c.base.Done() |
| 237 | } |
| 238 | |
| 239 | func (c jointContext) Err() error { |
| 240 | return c.base.Err() |
| 241 | } |
| 242 | |
| 243 | func (c jointContext) Value(key interface{}) interface{} { |
| 244 | if val := c.base.Value(key); val != nil { |
| 245 | return val |
| 246 | } |
| 247 | return c.valuesOnly.Value(key) |
| 248 | } |
| 249 | |
| 250 | // fromContext returns the App Engine context or nil if ctx is not |
| 251 | // derived from an App Engine context. |
| 252 | func fromContext(ctx netcontext.Context) *context { |
| 253 | c, _ := ctx.Value(&contextKey).(*context) |
| 254 | return c |
| 255 | } |
| 256 | |
| 257 | func withContext(parent netcontext.Context, c *context) netcontext.Context { |
| 258 | ctx := netcontext.WithValue(parent, &contextKey, c) |
| 259 | if ns := c.req.Header.Get(curNamespaceHeader); ns != "" { |
| 260 | ctx = withNamespace(ctx, ns) |
| 261 | } |
| 262 | return ctx |
| 263 | } |
| 264 | |
| 265 | func toContext(c *context) netcontext.Context { |
| 266 | return withContext(netcontext.Background(), c) |
| 267 | } |
| 268 | |
| 269 | func IncomingHeaders(ctx netcontext.Context) http.Header { |
| 270 | if c := fromContext(ctx); c != nil { |
| 271 | return c.req.Header |
| 272 | } |
| 273 | return nil |
| 274 | } |
| 275 | |
| 276 | func ReqContext(req *http.Request) netcontext.Context { |
| 277 | return req.Context() |
| 278 | } |
| 279 | |
| 280 | func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context { |
| 281 | return jointContext{ |
| 282 | base: parent, |
| 283 | valuesOnly: req.Context(), |
| 284 | } |
| 285 | } |
| 286 | |
| 287 | // DefaultTicket returns a ticket used for background context or dev_appserver. |
| 288 | func DefaultTicket() string { |
| 289 | defaultTicketOnce.Do(func() { |
| 290 | if IsDevAppServer() { |
| 291 | defaultTicket = "testapp" + defaultTicketSuffix |
| 292 | return |
| 293 | } |
| 294 | appID := partitionlessAppID() |
| 295 | escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1) |
| 296 | majVersion := VersionID(nil) |
| 297 | if i := strings.Index(majVersion, "."); i > 0 { |
| 298 | majVersion = majVersion[:i] |
| 299 | } |
| 300 | defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID()) |
| 301 | }) |
| 302 | return defaultTicket |
| 303 | } |
| 304 | |
| 305 | func BackgroundContext() netcontext.Context { |
| 306 | backgroundContextOnce.Do(func() { |
| 307 | // Compute background security ticket. |
| 308 | ticket := DefaultTicket() |
| 309 | |
| 310 | c := &context{ |
| 311 | req: &http.Request{ |
| 312 | Header: http.Header{ |
| 313 | ticketHeader: []string{ticket}, |
| 314 | }, |
| 315 | }, |
| 316 | apiURL: apiURL(), |
| 317 | } |
| 318 | backgroundContext = toContext(c) |
| 319 | |
| 320 | // TODO(dsymonds): Wire up the shutdown handler to do a final flush. |
| 321 | go c.logFlusher(make(chan int)) |
| 322 | }) |
| 323 | |
| 324 | return backgroundContext |
| 325 | } |
| 326 | |
| 327 | // RegisterTestRequest registers the HTTP request req for testing, such that |
| 328 | // any API calls are sent to the provided URL. It returns a closure to delete |
| 329 | // the registration. |
| 330 | // It should only be used by aetest package. |
| 331 | func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) { |
| 332 | c := &context{ |
| 333 | req: req, |
| 334 | apiURL: apiURL, |
| 335 | } |
| 336 | ctx := withContext(decorate(req.Context()), c) |
| 337 | req = req.WithContext(ctx) |
| 338 | c.req = req |
| 339 | return req, func() {} |
| 340 | } |
| 341 | |
// errTimeout is the error post returns when its timeout timer fires before
// the HTTP exchange with the API endpoint has finished.
var errTimeout = &CallError{
	Detail:  "Deadline exceeded",
	Code:    int32(remotepb.RpcError_CANCELLED),
	Timeout: true,
}
| 347 | |
| 348 | func (c *context) Header() http.Header { return c.outHeader } |
| 349 | |
// bodyAllowedForStatus reports whether a response with the given status code
// may carry a body. Informational (1xx), 204 and 304 responses permit neither
// a body nor entity headers such as Content-Length or Content-Type.
// Copied from $GOROOT/src/pkg/net/http/transfer.go.
func bodyAllowedForStatus(status int) bool {
	informational := status >= 100 && status <= 199
	if informational || status == 204 || status == 304 {
		return false
	}
	return true
}
| 364 | |
| 365 | func (c *context) Write(b []byte) (int, error) { |
| 366 | if c.outCode == 0 { |
| 367 | c.WriteHeader(http.StatusOK) |
| 368 | } |
| 369 | if len(b) > 0 && !bodyAllowedForStatus(c.outCode) { |
| 370 | return 0, http.ErrBodyNotAllowed |
| 371 | } |
| 372 | c.outBody = append(c.outBody, b...) |
| 373 | return len(b), nil |
| 374 | } |
| 375 | |
| 376 | func (c *context) WriteHeader(code int) { |
| 377 | if c.outCode != 0 { |
| 378 | logf(c, 3, "WriteHeader called multiple times on request.") // error level |
| 379 | return |
| 380 | } |
| 381 | c.outCode = code |
| 382 | } |
| 383 | |
| 384 | func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) { |
| 385 | hreq := &http.Request{ |
| 386 | Method: "POST", |
| 387 | URL: c.apiURL, |
| 388 | Header: http.Header{ |
| 389 | apiEndpointHeader: apiEndpointHeaderValue, |
| 390 | apiMethodHeader: apiMethodHeaderValue, |
| 391 | apiContentType: apiContentTypeValue, |
| 392 | apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)}, |
| 393 | }, |
| 394 | Body: ioutil.NopCloser(bytes.NewReader(body)), |
| 395 | ContentLength: int64(len(body)), |
| 396 | Host: c.apiURL.Host, |
| 397 | } |
| 398 | if info := c.req.Header.Get(dapperHeader); info != "" { |
| 399 | hreq.Header.Set(dapperHeader, info) |
| 400 | } |
| 401 | if info := c.req.Header.Get(traceHeader); info != "" { |
| 402 | hreq.Header.Set(traceHeader, info) |
| 403 | } |
| 404 | |
| 405 | tr := apiHTTPClient.Transport.(*http.Transport) |
| 406 | |
| 407 | var timedOut int32 // atomic; set to 1 if timed out |
| 408 | t := time.AfterFunc(timeout, func() { |
| 409 | atomic.StoreInt32(&timedOut, 1) |
| 410 | tr.CancelRequest(hreq) |
| 411 | }) |
| 412 | defer t.Stop() |
| 413 | defer func() { |
| 414 | // Check if timeout was exceeded. |
| 415 | if atomic.LoadInt32(&timedOut) != 0 { |
| 416 | err = errTimeout |
| 417 | } |
| 418 | }() |
| 419 | |
| 420 | hresp, err := apiHTTPClient.Do(hreq) |
| 421 | if err != nil { |
| 422 | return nil, &CallError{ |
| 423 | Detail: fmt.Sprintf("service bridge HTTP failed: %v", err), |
| 424 | Code: int32(remotepb.RpcError_UNKNOWN), |
| 425 | } |
| 426 | } |
| 427 | defer hresp.Body.Close() |
| 428 | hrespBody, err := ioutil.ReadAll(hresp.Body) |
| 429 | if hresp.StatusCode != 200 { |
| 430 | return nil, &CallError{ |
| 431 | Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody), |
| 432 | Code: int32(remotepb.RpcError_UNKNOWN), |
| 433 | } |
| 434 | } |
| 435 | if err != nil { |
| 436 | return nil, &CallError{ |
| 437 | Detail: fmt.Sprintf("service bridge response bad: %v", err), |
| 438 | Code: int32(remotepb.RpcError_UNKNOWN), |
| 439 | } |
| 440 | } |
| 441 | return hrespBody, nil |
| 442 | } |
| 443 | |
| 444 | func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error { |
| 445 | if ns := NamespaceFromContext(ctx); ns != "" { |
| 446 | if fn, ok := NamespaceMods[service]; ok { |
| 447 | fn(in, ns) |
| 448 | } |
| 449 | } |
| 450 | |
| 451 | if f, ctx, ok := callOverrideFromContext(ctx); ok { |
| 452 | return f(ctx, service, method, in, out) |
| 453 | } |
| 454 | |
| 455 | // Handle already-done contexts quickly. |
| 456 | select { |
| 457 | case <-ctx.Done(): |
| 458 | return ctx.Err() |
| 459 | default: |
| 460 | } |
| 461 | |
| 462 | c := fromContext(ctx) |
| 463 | if c == nil { |
| 464 | // Give a good error message rather than a panic lower down. |
| 465 | return errNotAppEngineContext |
| 466 | } |
| 467 | |
| 468 | // Apply transaction modifications if we're in a transaction. |
| 469 | if t := transactionFromContext(ctx); t != nil { |
| 470 | if t.finished { |
| 471 | return errors.New("transaction context has expired") |
| 472 | } |
| 473 | applyTransaction(in, &t.transaction) |
| 474 | } |
| 475 | |
| 476 | // Default RPC timeout is 60s. |
| 477 | timeout := 60 * time.Second |
| 478 | if deadline, ok := ctx.Deadline(); ok { |
| 479 | timeout = deadline.Sub(time.Now()) |
| 480 | } |
| 481 | |
| 482 | data, err := proto.Marshal(in) |
| 483 | if err != nil { |
| 484 | return err |
| 485 | } |
| 486 | |
| 487 | ticket := c.req.Header.Get(ticketHeader) |
| 488 | // Use a test ticket under test environment. |
| 489 | if ticket == "" { |
| 490 | if appid := ctx.Value(&appIDOverrideKey); appid != nil { |
| 491 | ticket = appid.(string) + defaultTicketSuffix |
| 492 | } |
| 493 | } |
| 494 | // Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver. |
| 495 | if ticket == "" { |
| 496 | ticket = DefaultTicket() |
| 497 | } |
| 498 | if dri := c.req.Header.Get(devRequestIdHeader); IsDevAppServer() && dri != "" { |
| 499 | ticket = dri |
| 500 | } |
| 501 | req := &remotepb.Request{ |
| 502 | ServiceName: &service, |
| 503 | Method: &method, |
| 504 | Request: data, |
| 505 | RequestId: &ticket, |
| 506 | } |
| 507 | hreqBody, err := proto.Marshal(req) |
| 508 | if err != nil { |
| 509 | return err |
| 510 | } |
| 511 | |
| 512 | hrespBody, err := c.post(hreqBody, timeout) |
| 513 | if err != nil { |
| 514 | return err |
| 515 | } |
| 516 | |
| 517 | res := &remotepb.Response{} |
| 518 | if err := proto.Unmarshal(hrespBody, res); err != nil { |
| 519 | return err |
| 520 | } |
| 521 | if res.RpcError != nil { |
| 522 | ce := &CallError{ |
| 523 | Detail: res.RpcError.GetDetail(), |
| 524 | Code: *res.RpcError.Code, |
| 525 | } |
| 526 | switch remotepb.RpcError_ErrorCode(ce.Code) { |
| 527 | case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED: |
| 528 | ce.Timeout = true |
| 529 | } |
| 530 | return ce |
| 531 | } |
| 532 | if res.ApplicationError != nil { |
| 533 | return &APIError{ |
| 534 | Service: *req.ServiceName, |
| 535 | Detail: res.ApplicationError.GetDetail(), |
| 536 | Code: *res.ApplicationError.Code, |
| 537 | } |
| 538 | } |
| 539 | if res.Exception != nil || res.JavaException != nil { |
| 540 | // This shouldn't happen, but let's be defensive. |
| 541 | return &CallError{ |
| 542 | Detail: "service bridge returned exception", |
| 543 | Code: int32(remotepb.RpcError_UNKNOWN), |
| 544 | } |
| 545 | } |
| 546 | return proto.Unmarshal(res.Response, out) |
| 547 | } |
| 548 | |
| 549 | func (c *context) Request() *http.Request { |
| 550 | return c.req |
| 551 | } |
| 552 | |
| 553 | func (c *context) addLogLine(ll *logpb.UserAppLogLine) { |
| 554 | // Truncate long log lines. |
| 555 | // TODO(dsymonds): Check if this is still necessary. |
| 556 | const lim = 8 << 10 |
| 557 | if len(*ll.Message) > lim { |
| 558 | suffix := fmt.Sprintf("...(length %d)", len(*ll.Message)) |
| 559 | ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix) |
| 560 | } |
| 561 | |
| 562 | c.pendingLogs.Lock() |
| 563 | c.pendingLogs.lines = append(c.pendingLogs.lines, ll) |
| 564 | c.pendingLogs.Unlock() |
| 565 | } |
| 566 | |
// logLevelName maps the numeric log levels accepted by logf to the names
// used as a prefix when a log line is echoed through the standard logger.
var logLevelName = map[int64]string{
	0: "DEBUG",
	1: "INFO",
	2: "WARNING",
	3: "ERROR",
	4: "CRITICAL",
}
| 574 | |
| 575 | func logf(c *context, level int64, format string, args ...interface{}) { |
| 576 | if c == nil { |
| 577 | panic("not an App Engine context") |
| 578 | } |
| 579 | s := fmt.Sprintf(format, args...) |
| 580 | s = strings.TrimRight(s, "\n") // Remove any trailing newline characters. |
| 581 | c.addLogLine(&logpb.UserAppLogLine{ |
| 582 | TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3), |
| 583 | Level: &level, |
| 584 | Message: &s, |
| 585 | }) |
| 586 | // Only duplicate log to stderr if not running on App Engine second generation |
| 587 | if !IsSecondGen() { |
| 588 | log.Print(logLevelName[level] + ": " + s) |
| 589 | } |
| 590 | } |
| 591 | |
| 592 | // flushLog attempts to flush any pending logs to the appserver. |
| 593 | // It should not be called concurrently. |
| 594 | func (c *context) flushLog(force bool) (flushed bool) { |
| 595 | c.pendingLogs.Lock() |
| 596 | // Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious. |
| 597 | n, rem := 0, 30<<20 |
| 598 | for ; n < len(c.pendingLogs.lines); n++ { |
| 599 | ll := c.pendingLogs.lines[n] |
| 600 | // Each log line will require about 3 bytes of overhead. |
| 601 | nb := proto.Size(ll) + 3 |
| 602 | if nb > rem { |
| 603 | break |
| 604 | } |
| 605 | rem -= nb |
| 606 | } |
| 607 | lines := c.pendingLogs.lines[:n] |
| 608 | c.pendingLogs.lines = c.pendingLogs.lines[n:] |
| 609 | c.pendingLogs.Unlock() |
| 610 | |
| 611 | if len(lines) == 0 && !force { |
| 612 | // Nothing to flush. |
| 613 | return false |
| 614 | } |
| 615 | |
| 616 | rescueLogs := false |
| 617 | defer func() { |
| 618 | if rescueLogs { |
| 619 | c.pendingLogs.Lock() |
| 620 | c.pendingLogs.lines = append(lines, c.pendingLogs.lines...) |
| 621 | c.pendingLogs.Unlock() |
| 622 | } |
| 623 | }() |
| 624 | |
| 625 | buf, err := proto.Marshal(&logpb.UserAppLogGroup{ |
| 626 | LogLine: lines, |
| 627 | }) |
| 628 | if err != nil { |
| 629 | log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err) |
| 630 | rescueLogs = true |
| 631 | return false |
| 632 | } |
| 633 | |
| 634 | req := &logpb.FlushRequest{ |
| 635 | Logs: buf, |
| 636 | } |
| 637 | res := &basepb.VoidProto{} |
| 638 | c.pendingLogs.Lock() |
| 639 | c.pendingLogs.flushes++ |
| 640 | c.pendingLogs.Unlock() |
| 641 | if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil { |
| 642 | log.Printf("internal.flushLog: Flush RPC: %v", err) |
| 643 | rescueLogs = true |
| 644 | return false |
| 645 | } |
| 646 | return true |
| 647 | } |
| 648 | |
const (
	// Log flushing parameters.

	// flushInterval is how often logFlusher attempts to flush pending logs.
	flushInterval = 1 * time.Second
	// forceFlushInterval is how long logFlusher lets pass since the last
	// successful flush before it forces one even with nothing pending.
	forceFlushInterval = 60 * time.Second
)
| 654 | |
| 655 | func (c *context) logFlusher(stop <-chan int) { |
| 656 | lastFlush := time.Now() |
| 657 | tick := time.NewTicker(flushInterval) |
| 658 | for { |
| 659 | select { |
| 660 | case <-stop: |
| 661 | // Request finished. |
| 662 | tick.Stop() |
| 663 | return |
| 664 | case <-tick.C: |
| 665 | force := time.Now().Sub(lastFlush) > forceFlushInterval |
| 666 | if c.flushLog(force) { |
| 667 | lastFlush = time.Now() |
| 668 | } |
| 669 | } |
| 670 | } |
| 671 | } |
| 672 | |
| 673 | func ContextForTesting(req *http.Request) netcontext.Context { |
| 674 | return toContext(&context{req: req}) |
| 675 | } |