blob: 05a77dceb1e23e95882e18ed707fb682fbdeec44 [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001package concurrent
2
3import (
4 "context"
5 "fmt"
6 "runtime"
7 "runtime/debug"
8 "sync"
9 "time"
10 "reflect"
11)
12
13// HandlePanic logs goroutine panic by default
14var HandlePanic = func(recovered interface{}, funcName string) {
15 ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered))
16 ErrorLogger.Println(string(debug.Stack()))
17}
18
// UnboundedExecutor launches goroutines without any limit on how many may be
// alive at the same time. It keeps track of the goroutines it started so that
// they can be cancelled at shutdown.
type UnboundedExecutor struct {
	// ctx is the context passed to handlers; cancel cancels it.
	ctx    context.Context
	cancel context.CancelFunc

	// activeGoroutinesMutex guards activeGoroutines.
	activeGoroutinesMutex *sync.Mutex
	// activeGoroutines holds a goroutine count per string key.
	activeGoroutines map[string]int

	// HandlePanic is an optional per-executor panic callback taking the
	// recovered value and the name of the function that panicked.
	HandlePanic func(recovered interface{}, funcName string)
}
28
29// GlobalUnboundedExecutor has the life cycle of the program itself
30// any goroutine want to be shutdown before main exit can be started from this executor
31// GlobalUnboundedExecutor expects the main function to call stop
32// it does not magically knows the main function exits
33var GlobalUnboundedExecutor = NewUnboundedExecutor()
34
35// NewUnboundedExecutor creates a new UnboundedExecutor,
36// UnboundedExecutor can not be created by &UnboundedExecutor{}
37// HandlePanic can be set with a callback to override global HandlePanic
38func NewUnboundedExecutor() *UnboundedExecutor {
39 ctx, cancel := context.WithCancel(context.TODO())
40 return &UnboundedExecutor{
41 ctx: ctx,
42 cancel: cancel,
43 activeGoroutinesMutex: &sync.Mutex{},
44 activeGoroutines: map[string]int{},
45 }
46}
47
48// Go starts a new goroutine and tracks its lifecycle.
49// Panic will be recovered and logged automatically, except for StopSignal
50func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
51 pc := reflect.ValueOf(handler).Pointer()
52 f := runtime.FuncForPC(pc)
53 funcName := f.Name()
54 file, line := f.FileLine(pc)
55 executor.activeGoroutinesMutex.Lock()
56 defer executor.activeGoroutinesMutex.Unlock()
57 startFrom := fmt.Sprintf("%s:%d", file, line)
58 executor.activeGoroutines[startFrom] += 1
59 go func() {
60 defer func() {
61 recovered := recover()
62 // if you want to quit a goroutine without trigger HandlePanic
63 // use runtime.Goexit() to quit
64 if recovered != nil {
65 if executor.HandlePanic == nil {
66 HandlePanic(recovered, funcName)
67 } else {
68 executor.HandlePanic(recovered, funcName)
69 }
70 }
71 executor.activeGoroutinesMutex.Lock()
72 executor.activeGoroutines[startFrom] -= 1
73 executor.activeGoroutinesMutex.Unlock()
74 }()
75 handler(executor.ctx)
76 }()
77}
78
79// Stop cancel all goroutines started by this executor without wait
80func (executor *UnboundedExecutor) Stop() {
81 executor.cancel()
82}
83
84// StopAndWaitForever cancel all goroutines started by this executor and
85// wait until all goroutines exited
86func (executor *UnboundedExecutor) StopAndWaitForever() {
87 executor.StopAndWait(context.Background())
88}
89
90// StopAndWait cancel all goroutines started by this executor and wait.
91// Wait can be cancelled by the context passed in.
92func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
93 executor.cancel()
94 for {
95 oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100)
96 select {
97 case <-oneHundredMilliseconds.C:
98 if executor.checkNoActiveGoroutines() {
99 return
100 }
101 case <-ctx.Done():
102 return
103 }
104 }
105}
106
107func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool {
108 executor.activeGoroutinesMutex.Lock()
109 defer executor.activeGoroutinesMutex.Unlock()
110 for startFrom, count := range executor.activeGoroutines {
111 if count > 0 {
112 InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit",
113 "startFrom", startFrom,
114 "count", count)
115 return false
116 }
117 }
118 return true
119}