khenaidoo | ab1f7bd | 2019-11-14 14:00:27 -0500 | [diff] [blame] | 1 | package concurrent |
| 2 | |
| 3 | import ( |
| 4 | "context" |
| 5 | "fmt" |
| 6 | "runtime" |
| 7 | "runtime/debug" |
| 8 | "sync" |
| 9 | "time" |
| 10 | "reflect" |
| 11 | ) |
| 12 | |
| 13 | // HandlePanic logs goroutine panic by default |
| 14 | var HandlePanic = func(recovered interface{}, funcName string) { |
| 15 | ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered)) |
| 16 | ErrorLogger.Println(string(debug.Stack())) |
| 17 | } |
| 18 | |
| 19 | // UnboundedExecutor is a executor without limits on counts of alive goroutines |
| 20 | // it tracks the goroutine started by it, and can cancel them when shutdown |
| 21 | type UnboundedExecutor struct { |
| 22 | ctx context.Context |
| 23 | cancel context.CancelFunc |
| 24 | activeGoroutinesMutex *sync.Mutex |
| 25 | activeGoroutines map[string]int |
| 26 | HandlePanic func(recovered interface{}, funcName string) |
| 27 | } |
| 28 | |
| 29 | // GlobalUnboundedExecutor has the life cycle of the program itself |
| 30 | // any goroutine want to be shutdown before main exit can be started from this executor |
| 31 | // GlobalUnboundedExecutor expects the main function to call stop |
| 32 | // it does not magically knows the main function exits |
| 33 | var GlobalUnboundedExecutor = NewUnboundedExecutor() |
| 34 | |
| 35 | // NewUnboundedExecutor creates a new UnboundedExecutor, |
| 36 | // UnboundedExecutor can not be created by &UnboundedExecutor{} |
| 37 | // HandlePanic can be set with a callback to override global HandlePanic |
| 38 | func NewUnboundedExecutor() *UnboundedExecutor { |
| 39 | ctx, cancel := context.WithCancel(context.TODO()) |
| 40 | return &UnboundedExecutor{ |
| 41 | ctx: ctx, |
| 42 | cancel: cancel, |
| 43 | activeGoroutinesMutex: &sync.Mutex{}, |
| 44 | activeGoroutines: map[string]int{}, |
| 45 | } |
| 46 | } |
| 47 | |
| 48 | // Go starts a new goroutine and tracks its lifecycle. |
| 49 | // Panic will be recovered and logged automatically, except for StopSignal |
| 50 | func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) { |
| 51 | pc := reflect.ValueOf(handler).Pointer() |
| 52 | f := runtime.FuncForPC(pc) |
| 53 | funcName := f.Name() |
| 54 | file, line := f.FileLine(pc) |
| 55 | executor.activeGoroutinesMutex.Lock() |
| 56 | defer executor.activeGoroutinesMutex.Unlock() |
| 57 | startFrom := fmt.Sprintf("%s:%d", file, line) |
| 58 | executor.activeGoroutines[startFrom] += 1 |
| 59 | go func() { |
| 60 | defer func() { |
| 61 | recovered := recover() |
| 62 | // if you want to quit a goroutine without trigger HandlePanic |
| 63 | // use runtime.Goexit() to quit |
| 64 | if recovered != nil { |
| 65 | if executor.HandlePanic == nil { |
| 66 | HandlePanic(recovered, funcName) |
| 67 | } else { |
| 68 | executor.HandlePanic(recovered, funcName) |
| 69 | } |
| 70 | } |
| 71 | executor.activeGoroutinesMutex.Lock() |
| 72 | executor.activeGoroutines[startFrom] -= 1 |
| 73 | executor.activeGoroutinesMutex.Unlock() |
| 74 | }() |
| 75 | handler(executor.ctx) |
| 76 | }() |
| 77 | } |
| 78 | |
| 79 | // Stop cancel all goroutines started by this executor without wait |
| 80 | func (executor *UnboundedExecutor) Stop() { |
| 81 | executor.cancel() |
| 82 | } |
| 83 | |
| 84 | // StopAndWaitForever cancel all goroutines started by this executor and |
| 85 | // wait until all goroutines exited |
| 86 | func (executor *UnboundedExecutor) StopAndWaitForever() { |
| 87 | executor.StopAndWait(context.Background()) |
| 88 | } |
| 89 | |
| 90 | // StopAndWait cancel all goroutines started by this executor and wait. |
| 91 | // Wait can be cancelled by the context passed in. |
| 92 | func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) { |
| 93 | executor.cancel() |
| 94 | for { |
| 95 | oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100) |
| 96 | select { |
| 97 | case <-oneHundredMilliseconds.C: |
| 98 | if executor.checkNoActiveGoroutines() { |
| 99 | return |
| 100 | } |
| 101 | case <-ctx.Done(): |
| 102 | return |
| 103 | } |
| 104 | } |
| 105 | } |
| 106 | |
| 107 | func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool { |
| 108 | executor.activeGoroutinesMutex.Lock() |
| 109 | defer executor.activeGoroutinesMutex.Unlock() |
| 110 | for startFrom, count := range executor.activeGoroutines { |
| 111 | if count > 0 { |
| 112 | InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit", |
| 113 | "startFrom", startFrom, |
| 114 | "count", count) |
| 115 | return false |
| 116 | } |
| 117 | } |
| 118 | return true |
| 119 | } |