blob: 9431c101e338fa98014a8412d9fe6ed83a0dd508 [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001package probing
2
3import (
4 "encoding/json"
5 "errors"
6 "fmt"
7 "net/http"
8 "sync"
9 "time"
10)
11
12var (
13 ErrNotFound = errors.New("probing: id not found")
14 ErrExist = errors.New("probing: id exists")
15)
16
17type Prober interface {
18 AddHTTP(id string, probingInterval time.Duration, endpoints []string) error
19 Remove(id string) error
20 RemoveAll()
21 Reset(id string) error
22 Status(id string) (Status, error)
23}
24
25type prober struct {
26 mu sync.Mutex
27 targets map[string]*status
28 tr http.RoundTripper
29}
30
31func NewProber(tr http.RoundTripper) Prober {
32 p := &prober{targets: make(map[string]*status)}
33 if tr == nil {
34 p.tr = http.DefaultTransport
35 } else {
36 p.tr = tr
37 }
38 return p
39}
40
41func (p *prober) AddHTTP(id string, probingInterval time.Duration, endpoints []string) error {
42 p.mu.Lock()
43 defer p.mu.Unlock()
44 if _, ok := p.targets[id]; ok {
45 return ErrExist
46 }
47
48 s := &status{stopC: make(chan struct{})}
49 p.targets[id] = s
50
51 ticker := time.NewTicker(probingInterval)
52
53 go func() {
54 pinned := 0
55 for {
56 select {
57 case <-ticker.C:
58 start := time.Now()
59 req, err := http.NewRequest("GET", endpoints[pinned], nil)
60 if err != nil {
61 panic(err)
62 }
63 resp, err := p.tr.RoundTrip(req)
64 if err == nil && resp.StatusCode != http.StatusOK {
65 err = fmt.Errorf("got unexpected HTTP status code %s from %s", resp.Status, endpoints[pinned])
66 resp.Body.Close()
67 }
68 if err != nil {
69 s.recordFailure(err)
70 pinned = (pinned + 1) % len(endpoints)
71 continue
72 }
73
74 var hh Health
75 d := json.NewDecoder(resp.Body)
76 err = d.Decode(&hh)
77 resp.Body.Close()
78 if err != nil || !hh.OK {
79 s.recordFailure(err)
80 pinned = (pinned + 1) % len(endpoints)
81 continue
82 }
83
84 s.record(time.Since(start), hh.Now)
85 case <-s.stopC:
86 ticker.Stop()
87 return
88 }
89 }
90 }()
91
92 return nil
93}
94
95func (p *prober) Remove(id string) error {
96 p.mu.Lock()
97 defer p.mu.Unlock()
98
99 s, ok := p.targets[id]
100 if !ok {
101 return ErrNotFound
102 }
103 close(s.stopC)
104 delete(p.targets, id)
105 return nil
106}
107
108func (p *prober) RemoveAll() {
109 p.mu.Lock()
110 defer p.mu.Unlock()
111
112 for _, s := range p.targets {
113 close(s.stopC)
114 }
115 p.targets = make(map[string]*status)
116}
117
118func (p *prober) Reset(id string) error {
119 p.mu.Lock()
120 defer p.mu.Unlock()
121
122 s, ok := p.targets[id]
123 if !ok {
124 return ErrNotFound
125 }
126 s.reset()
127 return nil
128}
129
130func (p *prober) Status(id string) (Status, error) {
131 p.mu.Lock()
132 defer p.mu.Unlock()
133
134 s, ok := p.targets[id]
135 if !ok {
136 return nil, ErrNotFound
137 }
138 return s, nil
139}