blob: a452e7e951f617b346c81dc0094784084dec9afd [file] [log] [blame]
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v2store
import (
"container/list"
"path"
"strings"
"sync"
"sync/atomic"
"go.etcd.io/etcd/etcdserver/api/v2error"
)
// A watcherHub contains all subscribed watchers
// watchers is a map with watched path as key and watcher as value
// EventHistory keeps the old events for watcherHub. It is used to help
// watcher to get a continuous event history. Or a watcher might miss the
// event happens between the end of the first watch command and the start
// of the second command.
type watcherHub struct {
// count must be the first element to keep 64-bit alignment for atomic
// access
count int64 // current number of watchers.
mutex sync.Mutex
watchers map[string]*list.List
EventHistory *EventHistory
}
// newWatchHub creates a watcherHub. The capacity determines how many events we will
// keep in the eventHistory.
// Typically, we only need to keep a small size of history[smaller than 20K].
// Ideally, it should smaller than 20K/s[max throughput] * 2 * 50ms[RTT] = 2000
func newWatchHub(capacity int) *watcherHub {
return &watcherHub{
watchers: make(map[string]*list.List),
EventHistory: newEventHistory(capacity),
}
}
// Watch function returns a Watcher.
// If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
// If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
// If index is zero, watch will start from the current index + 1.
func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeIndex uint64) (Watcher, *v2error.Error) {
reportWatchRequest()
event, err := wh.EventHistory.scan(key, recursive, index)
if err != nil {
err.Index = storeIndex
return nil, err
}
w := &watcher{
eventChan: make(chan *Event, 100), // use a buffered channel
recursive: recursive,
stream: stream,
sinceIndex: index,
startIndex: storeIndex,
hub: wh,
}
wh.mutex.Lock()
defer wh.mutex.Unlock()
// If the event exists in the known history, append the EtcdIndex and return immediately
if event != nil {
ne := event.Clone()
ne.EtcdIndex = storeIndex
w.eventChan <- ne
return w, nil
}
l, ok := wh.watchers[key]
var elem *list.Element
if ok { // add the new watcher to the back of the list
elem = l.PushBack(w)
} else { // create a new list and add the new watcher
l = list.New()
elem = l.PushBack(w)
wh.watchers[key] = l
}
w.remove = func() {
if w.removed { // avoid removing it twice
return
}
w.removed = true
l.Remove(elem)
atomic.AddInt64(&wh.count, -1)
reportWatcherRemoved()
if l.Len() == 0 {
delete(wh.watchers, key)
}
}
atomic.AddInt64(&wh.count, 1)
reportWatcherAdded()
return w, nil
}
func (wh *watcherHub) add(e *Event) {
wh.EventHistory.addEvent(e)
}
// notify function accepts an event and notify to the watchers.
func (wh *watcherHub) notify(e *Event) {
e = wh.EventHistory.addEvent(e) // add event into the eventHistory
segments := strings.Split(e.Node.Key, "/")
currPath := "/"
// walk through all the segments of the path and notify the watchers
// if the path is "/foo/bar", it will notify watchers with path "/",
// "/foo" and "/foo/bar"
for _, segment := range segments {
currPath = path.Join(currPath, segment)
// notify the watchers who interests in the changes of current path
wh.notifyWatchers(e, currPath, false)
}
}
func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
wh.mutex.Lock()
defer wh.mutex.Unlock()
l, ok := wh.watchers[nodePath]
if ok {
curr := l.Front()
for curr != nil {
next := curr.Next() // save reference to the next one in the list
w, _ := curr.Value.(*watcher)
originalPath := e.Node.Key == nodePath
if (originalPath || !isHidden(nodePath, e.Node.Key)) && w.notify(e, originalPath, deleted) {
if !w.stream { // do not remove the stream watcher
// if we successfully notify a watcher
// we need to remove the watcher from the list
// and decrease the counter
w.removed = true
l.Remove(curr)
atomic.AddInt64(&wh.count, -1)
reportWatcherRemoved()
}
}
curr = next // update current to the next element in the list
}
if l.Len() == 0 {
// if we have notified all watcher in the list
// we can delete the list
delete(wh.watchers, nodePath)
}
}
}
// clone function clones the watcherHub and return the cloned one.
// only clone the static content. do not clone the current watchers.
func (wh *watcherHub) clone() *watcherHub {
clonedHistory := wh.EventHistory.clone()
return &watcherHub{
EventHistory: clonedHistory,
}
}
// isHidden checks to see if key path is considered hidden to watch path i.e. the
// last element is hidden or it's within a hidden directory
func isHidden(watchPath, keyPath string) bool {
// When deleting a directory, watchPath might be deeper than the actual keyPath
// For example, when deleting /foo we also need to notify watchers on /foo/bar.
if len(watchPath) > len(keyPath) {
return false
}
// if watch path is just a "/", after path will start without "/"
// add a "/" to deal with the special case when watchPath is "/"
afterPath := path.Clean("/" + keyPath[len(watchPath):])
return strings.Contains(afterPath, "/_")
}