blob: c917cfd9d19ac5e9a82317ed400c2898726feac6 [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001package probing
2
3import (
4 "encoding/json"
5 "errors"
6 "net/http"
7 "sync"
8 "time"
9)
10
var (
	// ErrNotFound is returned by Remove, Reset and Status when no target is
	// registered under the given id.
	ErrNotFound = errors.New("probing: id not found")
	// ErrExist is returned by AddHTTP when a target is already registered
	// under the given id.
	ErrExist = errors.New("probing: id exists")
)
15
// Prober manages a set of periodically probed targets, each identified by a
// caller-chosen id.
type Prober interface {
	// AddHTTP registers a target under id that is probed over HTTP every
	// probingInterval using the given endpoints.
	AddHTTP(id string, probingInterval time.Duration, endpoints []string) error
	// Remove stops probing the target registered under id and forgets it.
	Remove(id string) error
	// RemoveAll stops probing every registered target and forgets them all.
	RemoveAll()
	// Reset resets the status kept for the target registered under id.
	Reset(id string) error
	// Status returns the status kept for the target registered under id.
	Status(id string) (Status, error)
}
23
// prober is the Prober implementation returned by NewProber.
type prober struct {
	// mu guards targets; every method takes it before touching the map.
	mu sync.Mutex
	// targets maps a target id to its status. The status also carries the
	// stop channel (stopC) of the target's probing goroutine.
	targets map[string]*status
	// tr is the transport used to send probe requests.
	tr http.RoundTripper
}
29
30func NewProber(tr http.RoundTripper) Prober {
31 p := &prober{targets: make(map[string]*status)}
32 if tr == nil {
33 p.tr = http.DefaultTransport
34 } else {
35 p.tr = tr
36 }
37 return p
38}
39
40func (p *prober) AddHTTP(id string, probingInterval time.Duration, endpoints []string) error {
41 p.mu.Lock()
42 defer p.mu.Unlock()
43 if _, ok := p.targets[id]; ok {
44 return ErrExist
45 }
46
47 s := &status{stopC: make(chan struct{})}
48 p.targets[id] = s
49
50 ticker := time.NewTicker(probingInterval)
51
52 go func() {
53 pinned := 0
54 for {
55 select {
56 case <-ticker.C:
57 start := time.Now()
58 req, err := http.NewRequest("GET", endpoints[pinned], nil)
59 if err != nil {
60 panic(err)
61 }
62 resp, err := p.tr.RoundTrip(req)
63 if err != nil {
64 s.recordFailure(err)
65 pinned = (pinned + 1) % len(endpoints)
66 continue
67 }
68
69 var hh Health
70 d := json.NewDecoder(resp.Body)
71 err = d.Decode(&hh)
72 resp.Body.Close()
73 if err != nil || !hh.OK {
74 s.recordFailure(err)
75 pinned = (pinned + 1) % len(endpoints)
76 continue
77 }
78
79 s.record(time.Since(start), hh.Now)
80 case <-s.stopC:
81 ticker.Stop()
82 return
83 }
84 }
85 }()
86
87 return nil
88}
89
90func (p *prober) Remove(id string) error {
91 p.mu.Lock()
92 defer p.mu.Unlock()
93
94 s, ok := p.targets[id]
95 if !ok {
96 return ErrNotFound
97 }
98 close(s.stopC)
99 delete(p.targets, id)
100 return nil
101}
102
103func (p *prober) RemoveAll() {
104 p.mu.Lock()
105 defer p.mu.Unlock()
106
107 for _, s := range p.targets {
108 close(s.stopC)
109 }
110 p.targets = make(map[string]*status)
111}
112
113func (p *prober) Reset(id string) error {
114 p.mu.Lock()
115 defer p.mu.Unlock()
116
117 s, ok := p.targets[id]
118 if !ok {
119 return ErrNotFound
120 }
121 s.reset()
122 return nil
123}
124
125func (p *prober) Status(id string) (Status, error) {
126 p.mu.Lock()
127 defer p.mu.Unlock()
128
129 s, ok := p.targets[id]
130 if !ok {
131 return nil, ErrNotFound
132 }
133 return s, nil
134}