| // Copyright 2014 Google Inc. All rights reserved. |
| // Use of this source code is governed by the Apache 2.0 |
| // license that can be found in the LICENSE file. |
| |
| package internal |
| |
| // This file implements hooks for applying datastore transactions. |
| |
| import ( |
| "errors" |
| "reflect" |
| |
| "github.com/golang/protobuf/proto" |
| netcontext "golang.org/x/net/context" |
| |
| basepb "google.golang.org/appengine/internal/base" |
| pb "google.golang.org/appengine/internal/datastore" |
| ) |
| |
| var transactionSetters = make(map[reflect.Type]reflect.Value) |
| |
| // RegisterTransactionSetter registers a function that sets transaction information |
| // in a protocol buffer message. f should be a function with two arguments, |
| // the first being a protocol buffer type, and the second being *datastore.Transaction. |
| func RegisterTransactionSetter(f interface{}) { |
| v := reflect.ValueOf(f) |
| transactionSetters[v.Type().In(0)] = v |
| } |
| |
| // applyTransaction applies the transaction t to message pb |
| // by using the relevant setter passed to RegisterTransactionSetter. |
| func applyTransaction(pb proto.Message, t *pb.Transaction) { |
| v := reflect.ValueOf(pb) |
| if f, ok := transactionSetters[v.Type()]; ok { |
| f.Call([]reflect.Value{v, reflect.ValueOf(t)}) |
| } |
| } |
| |
| var transactionKey = "used for *Transaction" |
| |
| func transactionFromContext(ctx netcontext.Context) *transaction { |
| t, _ := ctx.Value(&transactionKey).(*transaction) |
| return t |
| } |
| |
| func withTransaction(ctx netcontext.Context, t *transaction) netcontext.Context { |
| return netcontext.WithValue(ctx, &transactionKey, t) |
| } |
| |
| type transaction struct { |
| transaction pb.Transaction |
| finished bool |
| } |
| |
| var ErrConcurrentTransaction = errors.New("internal: concurrent transaction") |
| |
| func RunTransactionOnce(c netcontext.Context, f func(netcontext.Context) error, xg bool, readOnly bool, previousTransaction *pb.Transaction) (*pb.Transaction, error) { |
| if transactionFromContext(c) != nil { |
| return nil, errors.New("nested transactions are not supported") |
| } |
| |
| // Begin the transaction. |
| t := &transaction{} |
| req := &pb.BeginTransactionRequest{ |
| App: proto.String(FullyQualifiedAppID(c)), |
| } |
| if xg { |
| req.AllowMultipleEg = proto.Bool(true) |
| } |
| if previousTransaction != nil { |
| req.PreviousTransaction = previousTransaction |
| } |
| if readOnly { |
| req.Mode = pb.BeginTransactionRequest_READ_ONLY.Enum() |
| } else { |
| req.Mode = pb.BeginTransactionRequest_READ_WRITE.Enum() |
| } |
| if err := Call(c, "datastore_v3", "BeginTransaction", req, &t.transaction); err != nil { |
| return nil, err |
| } |
| |
| // Call f, rolling back the transaction if f returns a non-nil error, or panics. |
| // The panic is not recovered. |
| defer func() { |
| if t.finished { |
| return |
| } |
| t.finished = true |
| // Ignore the error return value, since we are already returning a non-nil |
| // error (or we're panicking). |
| Call(c, "datastore_v3", "Rollback", &t.transaction, &basepb.VoidProto{}) |
| }() |
| if err := f(withTransaction(c, t)); err != nil { |
| return &t.transaction, err |
| } |
| t.finished = true |
| |
| // Commit the transaction. |
| res := &pb.CommitResponse{} |
| err := Call(c, "datastore_v3", "Commit", &t.transaction, res) |
| if ae, ok := err.(*APIError); ok { |
| /* TODO: restore this conditional |
| if appengine.IsDevAppServer() { |
| */ |
| // The Python Dev AppServer raises an ApplicationError with error code 2 (which is |
| // Error.CONCURRENT_TRANSACTION) and message "Concurrency exception.". |
| if ae.Code == int32(pb.Error_BAD_REQUEST) && ae.Detail == "ApplicationError: 2 Concurrency exception." { |
| return &t.transaction, ErrConcurrentTransaction |
| } |
| if ae.Code == int32(pb.Error_CONCURRENT_TRANSACTION) { |
| return &t.transaction, ErrConcurrentTransaction |
| } |
| } |
| return &t.transaction, err |
| } |