First voltha-go commit. This commit is focussed on setting up the voltha-go structure as well as the kvstore library
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7182935
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,40 @@
+# PyCharm
+.idea
+exportToHTML
+
+# Emacs
+*~
+.#*
+
+# Vagrant
+.vagrant
+*.box
+
+# Ansible
+ansible/*.retry
+
+# Any vi swap files
+*.swp
+
+# Protobuf output files
+**/*_pb2.py
+**/*_pb2_grpc.py
+
+# Editors
+*.bak
+*.project
+*.pydevproject
+
+# Docker
+.docker-base-built
+
+# Mac stuff
+.DS_Store
+**/.DS_Store
+
+# Generated docs
+**/*.pdf
+
+# Vagrant logfile
+*.log
+
diff --git a/BUILD.md b/BUILD.md
new file mode 100644
index 0000000..749777d
--- /dev/null
+++ b/BUILD.md
@@ -0,0 +1,3 @@
+# How to Build VOLTHA
+
+TODO
\ No newline at end of file
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..840e855
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,94 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ifeq ($(TAG),)
+TAG := latest
+endif
+
+ifeq ($(TARGET_TAG),)
+TARGET_TAG := latest
+endif
+
+# If no DOCKER_HOST_IP is specified grab a v4 IP address associated with
+# the default gateway
+ifeq ($(DOCKER_HOST_IP),)
+DOCKER_HOST_IP := $(shell ifconfig $$(netstat -rn | grep -E '^(default|0.0.0.0)' | head -1 | awk '{print $$NF}') | grep inet | awk '{print $$2}' | sed -e 's/addr://g')
+endif
+
+
+ifneq ($(http_proxy)$(https_proxy),)
+# Include proxies from the environment
+DOCKER_PROXY_ARGS = \
+ --build-arg http_proxy=$(http_proxy) \
+ --build-arg https_proxy=$(https_proxy) \
+ --build-arg ftp_proxy=$(ftp_proxy) \
+ --build-arg no_proxy=$(no_proxy) \
+ --build-arg HTTP_PROXY=$(HTTP_PROXY) \
+ --build-arg HTTPS_PROXY=$(HTTPS_PROXY) \
+ --build-arg FTP_PROXY=$(FTP_PROXY) \
+ --build-arg NO_PROXY=$(NO_PROXY)
+endif
+
+DOCKER_BUILD_ARGS = \
+ --build-arg TAG=$(TAG) \
+ --build-arg REGISTRY=$(REGISTRY) \
+ --build-arg REPOSITORY=$(REPOSITORY) \
+ $(DOCKER_PROXY_ARGS) $(DOCKER_CACHE_ARG) \
+ --rm --force-rm \
+ $(DOCKER_BUILD_EXTRA_ARGS)
+
+DOCKER_IMAGE_LIST = \
+ rw_core
+
+
+.PHONY: $(DIRS) $(DIRS_CLEAN) $(DIRS_FLAKE8) rw_core ro_core
+
+# This should to be the first and default target in this Makefile
+help:
+ @echo "Usage: make [<target>]"
+ @echo "where available targets are:"
+ @echo
+ @echo "build : Build the docker images.\n\
+ If this is the first time you are building, choose \"make build\" option."
+ @echo "rw_core : Build the rw_core docker container"
+ @echo
+
+
+# Parallel Build
+$(DIRS):
+ @echo " MK $@"
+ $(Q)$(MAKE) -C $@
+
+# Parallel Clean
+DIRS_CLEAN = $(addsuffix .clean,$(DIRS))
+$(DIRS_CLEAN):
+ @echo " CLEAN $(basename $@)"
+ $(Q)$(MAKE) -C $(basename $@) clean
+
+build: containers
+
+containers: rw_core
+
+ifneq ($(VOLTHA_BUILD),docker)
+rw_core:
+ docker build $(DOCKER_BUILD_ARGS) -t ${REGISTRY}${REPOSITORY}voltha-rw-core:${TAG} -f docker/Dockerfile.rw_core .
+else
+rw_core:
+ docker build $(DOCKER_BUILD_ARGS) -t ${REGISTRY}${REPOSITORY}voltha-rw-core:${TAG} -f docker/Dockerfile.rw_core_d .
+endif
+
+
+# end file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..f0e9383
--- /dev/null
+++ b/README.md
@@ -0,0 +1,30 @@
+# VOLTHA
+
+## What is Voltha?
+
+Voltha aims to provide a layer of abstraction on top of legacy and next generation access network equipment for the purpose of control and management. Its initial focus is on PON (GPON, EPON, NG PON 2), but it aims to go beyond to eventually cover other access technologies (xDSL, Docsis, G.FAST, dedicated Ethernet, fixed wireless).
+
+Key concepts of Voltha:
+
+* **Network as a Switch**: It makes a set of connected access network devices to look like a(n abstract) programmable flow device, a L2/L3/L4 switch. Examples:
+ * PON as a Switch
+ * PON + access backhaul as a Switch
+ * xDSL service as a Switch
+* **Evolution to virtualization**: it can work with a variety of (access) network technologies and devices, including legacy, fully virtualized (in the sense of separation of hardware and software), and in between. Voltha can run on a decice, on general purpose servers in the central office, or in data centers.
+* **Unified OAM abstraction**: it provides unified, vendor- and technology agnostic handling of device management tasks, such as service lifecycle, device lifecycle (including discovery, upgrade), system monitoring, alarms, troubleshooting, security, etc.
+* **Cloud/DevOps bridge to modernization**: it does all above while also treating the abstracted network functions as software services manageable much like other software components in the cloud, i.e., containers.
+
+## Why Voltha?
+
+Control and management in the access network space is a mess. Each access technology brings its own bag of protocols, and on top of that vendors have their own interpretation/extension of the same standards. Compounding the problem is that these vendor- and technology specific differences ooze way up into the centralized OSS systems of the service provider, creating a lot of inefficiencies.
+
+Ideally, all vendor equipment for the same access technology should provide an identical interface for control and management. Moreover, there shall be much higher synergies across technologies. While we wait for vendors to unite, Voltha provides an increment to that direction, by confining the differences to the locality of access and hiding them from the upper layers of the OSS stack.
+
+
+## How can you work with Voltha?
+
+While we are still at the early phase of development, you can check out the [BUILD.md](BUILD.md) file to see how you can build it, run it, test it, etc.
+
+## How can you help?
+
+Contributions, small and large, are welcome. Minor contributions and bug fixes are always welcome in form of pull requests. For larger work, the best is to check in with the existing developers to see where help is most needed and to make sure your solution is compatible with the general philosophy of Voltha.
\ No newline at end of file
diff --git a/common/log/log.go b/common/log/log.go
new file mode 100644
index 0000000..4bb31c0
--- /dev/null
+++ b/common/log/log.go
@@ -0,0 +1,430 @@
+package log
+
+import (
+ "errors"
+ zp "go.uber.org/zap"
+ zc "go.uber.org/zap/zapcore"
+ "runtime"
+ "strings"
+ "fmt"
+)
+
+const (
+ // DebugLevel logs a message at debug level
+ DebugLevel = iota
+ // InfoLevel logs a message at info level
+ InfoLevel
+ // WarnLevel logs a message at warning level
+ WarnLevel
+ // ErrorLevel logs a message at error level
+ ErrorLevel
+ // PanicLevel logs a message, then panics.
+ PanicLevel
+ // FatalLevel logs a message, then calls os.Exit(1).
+ FatalLevel
+)
+
+// CONSOLE formats the log for the console, mostly used during development
+const CONSOLE = "console"
+// JSON formats the log using json format, mostly used by an automated logging system consumption
+const JSON = "json"
+
+// Logger represents an abstract logging interface. Any logging implementation used
+// will need to abide by this interface
+type Logger interface {
+ Debug(...interface{})
+ Debugln(...interface{})
+ Debugf(string, ...interface{})
+ Debugw(string, Fields)
+
+ Info(...interface{})
+ Infoln(...interface{})
+ Infof(string, ...interface{})
+ Infow(string, Fields)
+
+ Warn(...interface{})
+ Warnln(...interface{})
+ Warnf(string, ...interface{})
+ Warnw(string, Fields)
+
+ Error(...interface{})
+ Errorln(...interface{})
+ Errorf(string, ...interface{})
+ Errorw(string, Fields)
+
+ Fatal(...interface{})
+ Fatalln(...interface{})
+ Fatalf(string, ...interface{})
+ Fatalw(string, Fields)
+
+ With(Fields) Logger
+}
+
+// Fields is used as key-value pairs for structural logging
+type Fields map[string]interface{}
+
+var defaultLogger *logger
+
+type logger struct {
+ log *zp.SugaredLogger
+ parent *zp.Logger
+}
+
+func parseLevel(l int) zp.AtomicLevel {
+ switch l {
+ case DebugLevel:
+ return zp.NewAtomicLevelAt(zc.DebugLevel)
+ case InfoLevel:
+ return zp.NewAtomicLevelAt(zc.InfoLevel)
+ case WarnLevel:
+ return zp.NewAtomicLevelAt(zc.WarnLevel)
+ case ErrorLevel:
+ return zp.NewAtomicLevelAt(zc.ErrorLevel)
+ case PanicLevel:
+ return zp.NewAtomicLevelAt(zc.PanicLevel)
+ case FatalLevel:
+ return zp.NewAtomicLevelAt(zc.FatalLevel)
+ }
+ return zp.NewAtomicLevelAt(zc.ErrorLevel)
+}
+
+func getDefaultConfig(outputType string, level int, defaultFields Fields) zp.Config {
+ return zp.Config{
+ Level: parseLevel(level),
+ Encoding: outputType,
+ Development: true,
+ OutputPaths: []string{"stdout"},
+ ErrorOutputPaths: []string{"stderr"},
+ InitialFields: defaultFields,
+ EncoderConfig: zc.EncoderConfig{
+ LevelKey: "level",
+ MessageKey: "msg",
+ TimeKey: "ts",
+ StacktraceKey: "stacktrace",
+ LineEnding: zc.DefaultLineEnding,
+ EncodeLevel: zc.LowercaseLevelEncoder,
+ EncodeTime: zc.ISO8601TimeEncoder,
+ EncodeDuration: zc.SecondsDurationEncoder,
+ EncodeCaller: zc.ShortCallerEncoder,
+ },
+ }
+}
+
+// SetLogger needs to be invoked before the logger API can be invoked. This function
+// initialize the default logger (zap's sugaredlogger)
+func SetLogger(outputType string, level int, defaultFields Fields) (Logger, error) {
+
+ // Build a custom config using zap
+ cfg := getDefaultConfig(outputType, level, defaultFields)
+
+ l, err := cfg.Build()
+ if err != nil {
+ return nil, err
+ }
+
+ defaultLogger = &logger{
+ log: l.Sugar(),
+ parent: l,
+ }
+
+ return defaultLogger, nil
+}
+
+// CleanUp flushed any buffered log entries. Applications should take care to call
+// CleanUp before exiting.
+func CleanUp() error {
+ if defaultLogger != nil {
+ if defaultLogger.parent != nil {
+ if err := defaultLogger.parent.Sync(); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+// GetLogger returned the default logger. If SetLogger was not previously invoked then
+// this method will return an error
+func GetLogger() (Logger, error) {
+ if defaultLogger == nil {
+ return nil, errors.New("Uninitialized-logger")
+ }
+ return defaultLogger, nil
+}
+
+func extractFileNameAndLineNumber(skipLevel int) (string, int) {
+ _, file, line, ok := runtime.Caller(skipLevel)
+ var key string
+ if !ok {
+ key = "<???>"
+ line = 1
+ } else {
+ slash := strings.LastIndex(file, "/")
+ key = file[slash+1:]
+ }
+ return key, line
+}
+
+// sourced adds a source field to the logger that contains
+// the file name and line where the logging happened.
+func (l *logger) sourced() *zp.SugaredLogger {
+ key, line := extractFileNameAndLineNumber(3)
+ if strings.HasSuffix(key, "log.go") || strings.HasSuffix(key, "proc.go") {
+ // Go to a lower level
+ key, line = extractFileNameAndLineNumber(2)
+ }
+ if !strings.HasSuffix(key, ".go") {
+ // Go to a higher level
+ key, line = extractFileNameAndLineNumber(4)
+ }
+
+ return l.log.With("caller", fmt.Sprintf("%s:%d", key, line))
+}
+
+//func serializeMap(fields Fields) []interface{} {
+// data := make([]interface{}, len(fields)*2+2)
+// i := 0
+// for k, v := range fields {
+// data[i] = k
+// data[i+1] = v
+// i = i + 2
+// }
+// key, line := extractFileNameAndLineNumber(3)
+// data[i] = "caller"
+// data[i+1] = fmt.Sprintf("%s:%d", key, line)
+//
+// return data
+//}
+
+func serializeMap(fields Fields) []interface{} {
+ data := make([]interface{}, len(fields)*2)
+ i := 0
+ for k, v := range fields {
+ data[i] = k
+ data[i+1] = v
+ i = i + 2
+ }
+ return data
+}
+
+// With returns a logger initialized with the key-value pairs
+func (l logger) With(keysAndValues Fields) Logger {
+ return logger{log: l.log.With(serializeMap(keysAndValues)...), parent: l.parent}
+}
+
+// Debug logs a message at level Debug on the standard logger.
+func (l logger) Debug(args ...interface{}) {
+ l.log.Debug(args...)
+}
+
+// Debugln logs a message at level Debug on the standard logger with a line feed. Default in any case.
+func (l logger) Debugln(args ...interface{}) {
+ l.log.Debug(args...)
+}
+
+// Debugw logs a message at level Debug on the standard logger.
+func (l logger) Debugf(format string, args ...interface{}) {
+ l.log.Debugf(format, args...)
+}
+
+// Debugw logs a message with some additional context. The variadic key-value
+// pairs are treated as they are in With.
+func (l logger) Debugw(msg string, keysAndValues Fields) {
+ l.log.Debugw(msg, serializeMap(keysAndValues)...)
+}
+
+// Info logs a message at level Info on the standard logger.
+func (l logger) Info(args ...interface{}) {
+ l.log.Info(args...)
+}
+
+// Infoln logs a message at level Info on the standard logger with a line feed. Default in any case.
+func (l logger) Infoln(args ...interface{}) {
+ l.log.Info(args...)
+ //msg := fmt.Sprintln(args...)
+ //l.sourced().Info(msg[:len(msg)-1])
+}
+
+// Infof logs a message at level Info on the standard logger.
+func (l logger) Infof(format string, args ...interface{}) {
+ l.log.Infof(format, args...)
+}
+
+// Infow logs a message with some additional context. The variadic key-value
+// pairs are treated as they are in With.
+func (l logger) Infow(msg string, keysAndValues Fields) {
+ l.log.Infow(msg, serializeMap(keysAndValues)...)
+}
+
+// Warn logs a message at level Warn on the standard logger.
+func (l logger) Warn(args ...interface{}) {
+ l.log.Warn(args...)
+}
+
+// Warnln logs a message at level Warn on the standard logger with a line feed. Default in any case.
+func (l logger) Warnln(args ...interface{}) {
+ l.log.Warn(args...)
+}
+
+// Warnf logs a message at level Warn on the standard logger.
+func (l logger) Warnf(format string, args ...interface{}) {
+ l.log.Warnf(format, args...)
+}
+
+// Warnw logs a message with some additional context. The variadic key-value
+// pairs are treated as they are in With.
+func (l logger) Warnw(msg string, keysAndValues Fields) {
+ l.log.Warnw(msg, serializeMap(keysAndValues)...)
+}
+
+// Error logs a message at level Error on the standard logger.
+func (l logger) Error(args ...interface{}) {
+ l.log.Error(args...)
+}
+
+// Errorln logs a message at level Error on the standard logger with a line feed. Default in any case.
+func (l logger) Errorln(args ...interface{}) {
+ l.log.Error(args...)
+}
+
+// Errorf logs a message at level Error on the standard logger.
+func (l logger) Errorf(format string, args ...interface{}) {
+ l.log.Errorf(format, args...)
+}
+
+// Errorw logs a message with some additional context. The variadic key-value
+// pairs are treated as they are in With.
+func (l logger) Errorw(msg string, keysAndValues Fields) {
+ l.log.Errorw(msg, serializeMap(keysAndValues)...)
+}
+
+// Fatal logs a message at level Fatal on the standard logger.
+func (l logger) Fatal(args ...interface{}) {
+ l.log.Fatal(args...)
+}
+
+// Fatalln logs a message at level Fatal on the standard logger with a line feed. Default in any case.
+func (l logger) Fatalln(args ...interface{}) {
+ l.log.Fatal(args...)
+}
+
+// Fatalf logs a message at level Fatal on the standard logger.
+func (l logger) Fatalf(format string, args ...interface{}) {
+ l.log.Fatalf(format, args...)
+}
+
+// Fatalw logs a message with some additional context. The variadic key-value
+// pairs are treated as they are in With.
+func (l logger) Fatalw(msg string, keysAndValues Fields) {
+ l.log.Fatalw(msg, serializeMap(keysAndValues)...)
+}
+
+// With returns a logger initialized with the key-value pairs
+func With(keysAndValues Fields) Logger {
+ return logger{log: defaultLogger.sourced().With(serializeMap(keysAndValues)...), parent: defaultLogger.parent}
+}
+
+// Debug logs a message at level Debug on the standard logger.
+func Debug(args ...interface{}) {
+ defaultLogger.sourced().Debug(args...)
+}
+
+// Debugln logs a message at level Debug on the standard logger.
+func Debugln(args ...interface{}) {
+ defaultLogger.sourced().Debug(args...)
+}
+
+// Debugf logs a message at level Debug on the standard logger.
+func Debugf(format string, args ...interface{}) {
+ defaultLogger.sourced().Debugf(format, args...)
+}
+
+// Debugw logs a message with some additional context. The variadic key-value
+// pairs are treated as they are in With.
+func Debugw(msg string, keysAndValues Fields) {
+ defaultLogger.sourced().Debugw(msg, serializeMap(keysAndValues)...)
+}
+
+// Info logs a message at level Info on the standard logger.
+func Info(args ...interface{}) {
+ defaultLogger.sourced().Info(args...)
+}
+
+// Infoln logs a message at level Info on the standard logger.
+func Infoln(args ...interface{}) {
+ defaultLogger.sourced().Info(args...)
+}
+
+// Infof logs a message at level Info on the standard logger.
+func Infof(format string, args ...interface{}) {
+ defaultLogger.sourced().Infof(format, args...)
+}
+
+//Infow logs a message with some additional context. The variadic key-value
+//pairs are treated as they are in With.
+func Infow(msg string, keysAndValues Fields) {
+ defaultLogger.sourced().Infow(msg, serializeMap(keysAndValues)...)
+}
+
+// Warn logs a message at level Warn on the standard logger.
+func Warn(args ...interface{}) {
+ defaultLogger.sourced().Warn(args...)
+}
+
+// Warnln logs a message at level Warn on the standard logger.
+func Warnln(args ...interface{}) {
+ defaultLogger.sourced().Warn(args...)
+}
+
+// Warnf logs a message at level Warn on the standard logger.
+func Warnf(format string, args ...interface{}) {
+ defaultLogger.sourced().Warnf(format, args...)
+}
+
+// Warnw logs a message with some additional context. The variadic key-value
+// pairs are treated as they are in With.
+func Warnw(msg string, keysAndValues Fields) {
+ defaultLogger.sourced().Warnw(msg, serializeMap(keysAndValues)...)
+}
+
+// Error logs a message at level Error on the standard logger.
+func Error(args ...interface{}) {
+ defaultLogger.sourced().Error(args...)
+}
+
+// Errorln logs a message at level Error on the standard logger.
+func Errorln(args ...interface{}) {
+ defaultLogger.sourced().Error(args...)
+}
+
+// Errorf logs a message at level Error on the standard logger.
+func Errorf(format string, args ...interface{}) {
+ defaultLogger.sourced().Errorf(format, args...)
+}
+
+// Errorw logs a message with some additional context. The variadic key-value
+// pairs are treated as they are in With.
+func Errorw(msg string, keysAndValues Fields) {
+ defaultLogger.sourced().Errorw(msg, serializeMap(keysAndValues)...)
+}
+
+// Fatal logs a message at level Fatal on the standard logger.
+func Fatal(args ...interface{}) {
+ defaultLogger.sourced().Fatal(args...)
+}
+
+// Fatalln logs a message at level Fatal on the standard logger.
+func Fatalln(args ...interface{}) {
+ defaultLogger.sourced().Fatal(args...)
+}
+
+// Fatalf logs a message at level Fatal on the standard logger.
+func Fatalf(format string, args ...interface{}) {
+ defaultLogger.sourced().Fatalf(format, args...)
+}
+
+// Fatalw logs a message with some additional context. The variadic key-value
+// pairs are treated as they are in With.
+func Fatalw(msg string, keysAndValues Fields) {
+ defaultLogger.sourced().Fatalw(msg, serializeMap(keysAndValues)...)
+}
diff --git a/compose/rw_core.yml b/compose/rw_core.yml
new file mode 100644
index 0000000..0fde367
--- /dev/null
+++ b/compose/rw_core.yml
@@ -0,0 +1,17 @@
+version: '2'
+services:
+ rw_core:
+ image: voltha-rw-core
+ entrypoint:
+ - /app/rw_core
+ - -kv-store-type=etcd
+ - -kv-store-host=${DOCKER_HOST_IP}
+ - -kv-store-port=2379
+ volumes:
+ - "/var/run/docker.sock:/tmp/docker.sock"
+ networks:
+ - default
+
+networks:
+ default:
+ driver: bridge
diff --git a/db/kvstore/client.go b/db/kvstore/client.go
new file mode 100644
index 0000000..8463747
--- /dev/null
+++ b/db/kvstore/client.go
@@ -0,0 +1,66 @@
+package kvstore
+
+const (
+ // Default timeout in seconds when making a kvstore request
+ defaultKVGetTimeout = 5
+ // Maximum channel buffer between publisher/subscriber goroutines
+ maxClientChannelBufferSize = 10
+)
+
+// These constants represent the event types returned by the KV client
+const (
+ PUT = iota
+ DELETE
+ CONNECTIONDOWN
+ UNKNOWN
+)
+
+// KVPair is a common wrapper for key-value pairs returned from the KV store
+type KVPair struct {
+ Key string
+ Value interface{}
+ Session string
+ Lease int64
+}
+
+// NewKVPair creates a new KVPair object
+func NewKVPair(key string, value interface{}, session string, lease int64) *KVPair {
+ kv := new(KVPair)
+ kv.Key = key
+ kv.Value = value
+ kv.Session = session
+ kv.Lease = lease
+ return kv
+}
+
+// Event is generated by the KV client when a key change is detected
+type Event struct {
+ EventType int
+ Key interface{}
+ Value interface{}
+}
+
+// NewEvent creates a new Event object
+func NewEvent(eventType int, key interface{}, value interface{}) *Event {
+ evnt := new(Event)
+ evnt.EventType = eventType
+ evnt.Key = key
+ evnt.Value = value
+
+ return evnt
+}
+
+// Client represents the set of APIs a KV Client must implement
+type Client interface {
+ List(key string, timeout int) (map[string]*KVPair, error)
+ Get(key string, timeout int) (*KVPair, error)
+ Put(key string, value interface{}, timeout int) error
+ Delete(key string, timeout int) error
+ Reserve(key string, value interface{}, ttl int64) (interface{}, error)
+ ReleaseReservation(key string) error
+ ReleaseAllReservations() error
+ RenewReservation(key string) error
+ Watch(key string) chan *Event
+ CloseWatch(key string, ch chan *Event)
+ Close()
+}
diff --git a/db/kvstore/consulclient.go b/db/kvstore/consulclient.go
new file mode 100644
index 0000000..453f282
--- /dev/null
+++ b/db/kvstore/consulclient.go
@@ -0,0 +1,486 @@
+package kvstore
+
+import (
+ "context"
+ "errors"
+ "bytes"
+ "sync"
+ "time"
+ log "github.com/opencord/voltha-go/common/log"
+ //log "ciena.com/coordinator/common"
+ consulapi "github.com/hashicorp/consul/api"
+)
+
+type channelContextMap struct {
+ ctx context.Context
+ channel chan *Event
+ cancel context.CancelFunc
+}
+
+
+// ConsulClient represents the consul KV store client
+type ConsulClient struct {
+ session *consulapi.Session
+ sessionID string
+ consul *consulapi.Client
+ doneCh *chan int
+ keyReservations map[string]interface{}
+ watchedChannelsContext map[string][]*channelContextMap
+ writeLock sync.Mutex
+}
+
+// NewConsulClient returns a new client for the Consul KV store
+func NewConsulClient(addr string, timeout int) (*ConsulClient, error) {
+
+ duration := GetDuration(timeout)
+
+ config := consulapi.DefaultConfig()
+ config.Address = addr
+ config.WaitTime = duration
+ consul, err := consulapi.NewClient(config)
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
+
+ doneCh := make(chan int, 1)
+ wChannelsContext := make(map[string][]*channelContextMap)
+ reservations := make(map[string]interface{})
+ return &ConsulClient{consul: consul, doneCh: &doneCh, watchedChannelsContext: wChannelsContext, keyReservations: reservations}, nil
+}
+
+// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
+// wait for a response
+func (c *ConsulClient) List(key string, timeout int) (map[string]*KVPair, error) {
+ duration := GetDuration(timeout)
+
+ kv := c.consul.KV()
+ var queryOptions consulapi.QueryOptions
+ queryOptions.WaitTime = duration
+ // For now we ignore meta data
+ kvps, _, err := kv.List(key, &queryOptions)
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
+ m := make(map[string]*KVPair)
+ for _, kvp := range kvps {
+ m[string(kvp.Key)] = NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0)
+ }
+ return m, nil
+}
+
+// Get returns a key-value pair for a given key. Timeout defines how long the function will
+// wait for a response
+func (c *ConsulClient) Get(key string, timeout int) (*KVPair, error) {
+
+ duration := GetDuration(timeout)
+
+ kv := c.consul.KV()
+ var queryOptions consulapi.QueryOptions
+ queryOptions.WaitTime = duration
+ // For now we ignore meta data
+ kvp, _, err := kv.Get(key, &queryOptions)
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
+ if kvp != nil {
+ return NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0), nil
+ }
+
+ return nil, nil
+}
+
+// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the consul API
+// accepts only a []byte as a value for a put operation. Timeout defines how long the function will
+// wait for a response
+func (c *ConsulClient) Put(key string, value interface{}, timeout int) error {
+
+ // Validate that we can create a byte array from the value as consul API expects a byte array
+ var val []byte
+ var er error
+ if val, er = ToByte(value); er != nil {
+ log.Error(er)
+ return er
+ }
+
+ // Create a key value pair
+ kvp := consulapi.KVPair{Key: key, Value: val}
+ kv := c.consul.KV()
+ var writeOptions consulapi.WriteOptions
+ c.writeLock.Lock()
+ defer c.writeLock.Unlock()
+ _, err := kv.Put(&kvp, &writeOptions)
+ if err != nil {
+ log.Error(err)
+ return err
+ }
+ return nil
+}
+
+// Delete removes a key from the KV store. Timeout defines how long the function will
+// wait for a response
+func (c *ConsulClient) Delete(key string, timeout int) error {
+ kv := c.consul.KV()
+ var writeOptions consulapi.WriteOptions
+ c.writeLock.Lock()
+ defer c.writeLock.Unlock()
+ _, err := kv.Delete(key, &writeOptions)
+ if err != nil {
+ log.Error(err)
+ return err
+ }
+ return nil
+}
+
+func (c *ConsulClient) deleteSession() {
+ if c.sessionID != "" {
+ log.Debug("cleaning-up-session")
+ session := c.consul.Session()
+ _, err := session.Destroy(c.sessionID, nil)
+ if err != nil {
+ log.Errorw("error-cleaning-session", log.Fields{"session":c.sessionID, "error":err})
+ }
+ }
+ c.sessionID = ""
+ c.session = nil
+}
+
+func (c *ConsulClient) createSession(ttl int64, retries int) (*consulapi.Session, string, error) {
+ session := c.consul.Session()
+ entry := &consulapi.SessionEntry{
+ Behavior: consulapi.SessionBehaviorDelete,
+ TTL: "10s", // strconv.FormatInt(ttl, 10) + "s", // disable ttl
+ }
+
+ for {
+ id, meta, err := session.Create(entry, nil)
+ if err != nil {
+ log.Errorw("create-session-error", log.Fields{"error":err})
+ if retries == 0 {
+ return nil, "", err
+ }
+ } else if meta.RequestTime == 0 {
+ log.Errorw("create-session-bad-meta-data", log.Fields{"meta-data":meta})
+ if retries == 0 {
+ return nil, "", errors.New("bad-meta-data")
+ }
+ } else if id == "" {
+ log.Error("create-session-nil-id")
+ if retries == 0 {
+ return nil, "", errors.New("ID-nil")
+ }
+ } else {
+ return session, id, nil
+ }
+ // If retry param is -1 we will retry indefinitely
+ if retries > 0 {
+ retries--
+ }
+ log.Debug("retrying-session-create-after-a-second-delay")
+ time.Sleep(time.Duration(1) * time.Second)
+ }
+}
+
+
+// Helper function to verify mostly whether the content of two interface types are the same. Focus is []byte and
+// string types
+func isEqual(val1 interface{}, val2 interface{}) bool {
+ b1, err := ToByte(val1)
+ b2, er := ToByte(val2)
+ if err == nil && er == nil {
+ return bytes.Equal(b1, b2)
+ }
+ return val1 == val2
+}
+
+// Reserve is invoked to acquire a key and set it to a given value. Value can only be a string or []byte since
+// the consul API accepts only a []byte. Timeout defines how long the function will wait for a response. TTL
+// defines how long that reservation is valid. When TTL expires the key is unreserved by the KV store itself.
+// If the key is acquired then the value returned will be the value passed in. If the key is already acquired
+// then the value assigned to that key will be returned.
+func (c *ConsulClient) Reserve(key string, value interface{}, ttl int64) (interface{}, error) {
+
+ // Validate that we can create a byte array from the value as consul API expects a byte array
+ var val []byte
+ var er error
+ if val, er = ToByte(value); er != nil {
+ log.Error(er)
+ return nil, er
+ }
+
+ // Cleanup any existing session and recreate new ones. A key is reserved against a session
+ if c.sessionID != "" {
+ c.deleteSession()
+ }
+
+ // Clear session if reservation is not successful
+ reservationSuccessful := false
+ defer func() {
+ if !reservationSuccessful {
+ log.Debug("deleting-session")
+ c.deleteSession()
+ }
+ }()
+
+ session, sessionID, err := c.createSession(ttl, -1)
+ if err != nil {
+ log.Errorw("no-session-created", log.Fields{"error":err})
+ return "", errors.New("no-session-created")
+ }
+ log.Debugw("session-created", log.Fields{"session-id":sessionID})
+ c.sessionID = sessionID
+ c.session = session
+
+ // Try to grap the Key using the session
+ kv := c.consul.KV()
+ kvp := consulapi.KVPair{Key: key, Value: val, Session: c.sessionID}
+ result, _, err := kv.Acquire(&kvp, nil)
+ if err != nil {
+ log.Errorw("error-acquiring-keys", log.Fields{"error":err})
+ return nil, err
+ }
+
+ log.Debugw("key-acquired", log.Fields{"key":key, "status":result})
+
+ // Irrespective whether we were successful in acquiring the key, let's read it back and see if it's us.
+ m, err := c.Get(key, defaultKVGetTimeout)
+ if err != nil {
+ return nil, err
+ }
+ if m != nil {
+ log.Debugw("response-received", log.Fields{"key":m.Key, "m.value":string(m.Value.([]byte)), "value":value})
+ if m.Key == key && isEqual(m.Value, value) {
+ // My reservation is successful - register it. For now, support is only for 1 reservation per key
+ // per session.
+ reservationSuccessful = true
+ c.writeLock.Lock()
+ c.keyReservations[key] = m.Value
+ c.writeLock.Unlock()
+ return m.Value, nil
+ }
+ // My reservation has failed. Return the owner of that key
+ return m.Value, nil
+ }
+ return nil, nil
+}
+
+// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
+func (c *ConsulClient) ReleaseAllReservations() error {
+ kv := c.consul.KV()
+ var kvp consulapi.KVPair
+ var result bool
+ var err error
+
+ c.writeLock.Lock()
+ defer c.writeLock.Unlock()
+
+ for key, value := range c.keyReservations {
+ kvp = consulapi.KVPair{Key: key, Value: value.([]byte), Session: c.sessionID}
+ result, _, err = kv.Release(&kvp, nil)
+ if err != nil {
+ log.Errorw("cannot-release-reservation", log.Fields{"key":key, "error":err})
+ return err
+ }
+ if !result {
+ log.Errorw("cannot-release-reservation", log.Fields{"key":key})
+ }
+ delete(c.keyReservations, key)
+ }
+ return nil
+}
+
+// ReleaseReservation releases reservation for a specific key.
+func (c *ConsulClient) ReleaseReservation(key string) error {
+ var ok bool
+ var reservedValue interface{}
+ c.writeLock.Lock()
+ defer c.writeLock.Unlock()
+ if reservedValue, ok = c.keyReservations[key]; !ok {
+ return errors.New("key-not-reserved:" + key)
+ }
+ // Release the reservation
+ kv := c.consul.KV()
+ kvp := consulapi.KVPair{Key: key, Value: reservedValue.([]byte), Session: c.sessionID}
+
+ result, _, er := kv.Release(&kvp, nil)
+ if er != nil {
+ return er
+ }
+ // Remove that key entry on success
+ if result {
+ delete(c.keyReservations, key)
+ return nil
+ }
+ return errors.New("key-cannot-be-unreserved")
+}
+
+// RenewReservation renews a reservation. A reservation will go stale after the specified TTL (Time To Live)
+// period specified when reserving the key
+func (c *ConsulClient) RenewReservation(key string) error {
+ // In the case of Consul, renew reservation of a reserve key only require renewing the client session.
+
+ c.writeLock.Lock()
+ defer c.writeLock.Unlock()
+
+ // Verify the key was reserved
+ if _, ok := c.keyReservations[key]; !ok {
+ return errors.New("key-not-reserved")
+ }
+
+ if c.session == nil {
+ return errors.New("no-session-exist")
+ }
+
+ var writeOptions consulapi.WriteOptions
+ if _, _, err := c.session.Renew(c.sessionID, &writeOptions); err != nil {
+ return err
+ }
+ return nil
+}
+
+// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
+// listen to receive Events.
+func (c *ConsulClient) Watch(key string) chan *Event {
+
+ // Create a new channel
+ ch := make(chan *Event, maxClientChannelBufferSize)
+
+ // Create a context to track this request
+ watchContext, cFunc := context.WithCancel(context.Background())
+
+ // Save the channel and context reference for later
+ c.writeLock.Lock()
+ defer c.writeLock.Unlock()
+ ccm := channelContextMap{channel: ch, ctx: watchContext, cancel: cFunc}
+ c.watchedChannelsContext[key] = append(c.watchedChannelsContext[key], &ccm)
+
+ // Launch a go routine to listen for updates
+ go c.listenForKeyChange(watchContext, key, ch)
+
+ return ch
+}
+
+// CloseWatch closes a specific watch. Both the key and the channel are required when closing a watch as there
+// may be multiple listeners on the same key. The previously created channel serves as a key
+func (c *ConsulClient) CloseWatch(key string, ch chan *Event) {
+ // First close the context
+ var ok bool
+ var watchedChannelsContexts []*channelContextMap
+ c.writeLock.Lock()
+ defer c.writeLock.Unlock()
+ if watchedChannelsContexts, ok = c.watchedChannelsContext[key]; !ok {
+ log.Errorw("key-has-no-watched-context-or-channel", log.Fields{"key":key})
+ return
+ }
+ // Look for the channels
+ var pos = -1
+ for i, chCtxMap := range watchedChannelsContexts {
+ if chCtxMap.channel == ch {
+ log.Debug("channel-found")
+ chCtxMap.cancel()
+ //close the channel
+ close(ch)
+ pos = i
+ break
+ }
+ }
+ // Remove that entry if present
+ if pos >= 0 {
+ c.watchedChannelsContext[key] = append(c.watchedChannelsContext[key][:pos], c.watchedChannelsContext[key][pos+1:]...)
+ }
+ log.Debugw("watched-channel-exiting", log.Fields{"key":key, "channel":c.watchedChannelsContext[key]})
+}
+
+func (c *ConsulClient) isKVEqual(kv1 *consulapi.KVPair, kv2 *consulapi.KVPair) bool {
+ if (kv1 == nil) && (kv2 == nil) {
+ return true
+ } else if (kv1 == nil) || (kv2 == nil) {
+ return false
+ }
+ // Both the KV should be non-null here
+ if kv1.Key != kv2.Key ||
+ !bytes.Equal(kv1.Value, kv2.Value) ||
+ kv1.Session != kv2.Session ||
+ kv1.LockIndex != kv2.LockIndex ||
+ kv1.ModifyIndex != kv2.ModifyIndex {
+ return false
+ }
+ return true
+}
+
+func (c *ConsulClient) listenForKeyChange(watchContext context.Context, key string, ch chan *Event) {
+ log.Debugw("start-watching-channel", log.Fields{"key":key, "channel":ch})
+
+ defer c.CloseWatch(key, ch)
+ duration := GetDuration(defaultKVGetTimeout)
+ kv := c.consul.KV()
+ var queryOptions consulapi.QueryOptions
+ queryOptions.WaitTime = duration
+
+ // Get the existing value, if any
+ previousKVPair, meta, err := kv.Get(key, &queryOptions)
+ if err != nil {
+ log.Debug(err)
+ }
+ lastIndex := meta.LastIndex
+
+ // Wait for change. Push any change onto the channel and keep waiting for new update
+ //var waitOptions consulapi.QueryOptions
+ var pair *consulapi.KVPair
+ //watchContext, _ := context.WithCancel(context.Background())
+ waitOptions := queryOptions.WithContext(watchContext)
+ for {
+ //waitOptions = consulapi.QueryOptions{WaitIndex: lastIndex}
+ waitOptions.WaitIndex = lastIndex
+ pair, meta, err = kv.Get(key, waitOptions)
+ select {
+ case <-watchContext.Done():
+ log.Debug("done-event-received-exiting")
+ return
+ default:
+ if err != nil {
+ log.Warnw("error-from-watch", log.Fields{"error":err})
+ ch <- NewEvent(CONNECTIONDOWN, key, []byte(""))
+ } else {
+ log.Debugw("index-state", log.Fields{"lastindex":lastIndex, "newindex":meta.LastIndex, "key":key})
+ }
+ }
+ if err != nil {
+ log.Debug(err)
+ // On error, block for 10 milliseconds to prevent endless loop
+ time.Sleep(10 * time.Millisecond)
+ } else if meta.LastIndex <= lastIndex {
+ log.Info("no-index-change-or-negative")
+ } else {
+ log.Debugw("update-received", log.Fields{"pair":pair})
+ if pair == nil {
+ ch <- NewEvent(DELETE, key, []byte(""))
+ } else if !c.isKVEqual(pair, previousKVPair) {
+ // Push the change onto the channel if the data has changed
+ // For now just assume it's a PUT change
+ log.Debugw("pair-details", log.Fields{"session":pair.Session, "key":pair.Key, "value":pair.Value})
+ ch <- NewEvent(PUT, pair.Key, pair.Value)
+ }
+ previousKVPair = pair
+ lastIndex = meta.LastIndex
+ }
+ }
+}
+
+// Close closes the KV store client
+func (c *ConsulClient) Close() {
+ var writeOptions consulapi.WriteOptions
+ // Inform any goroutine it's time to say goodbye.
+ c.writeLock.Lock()
+ defer c.writeLock.Unlock()
+ if c.doneCh != nil {
+ close(*c.doneCh)
+ }
+
+ // Clear the sessionID
+ if _, err := c.consul.Session().Destroy(c.sessionID, &writeOptions); err != nil {
+ log.Errorw("error-closing-client", log.Fields{"error":err})
+ }
+}
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
new file mode 100644
index 0000000..2755cd1
--- /dev/null
+++ b/db/kvstore/etcdclient.go
@@ -0,0 +1,378 @@
+package kvstore
+
+import (
+ //log "../common"
+ "context"
+ "errors"
+ "sync"
+ log "github.com/opencord/voltha-go/common/log"
+ "fmt"
+ v3Client "github.com/coreos/etcd/clientv3"
+ v3rpcTypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+)
+
+// EtcdClient represents the Etcd KV store client
+type EtcdClient struct {
+ ectdAPI *v3Client.Client
+ leaderRev v3Client.Client
+ keyReservations map[string]*v3Client.LeaseID
+ watchedChannels map[string][]map[chan *Event]v3Client.Watcher
+ writeLock sync.Mutex
+}
+
+// NewEtcdClient returns a new client for the Etcd KV store
+func NewEtcdClient(addr string, timeout int) (*EtcdClient, error) {
+
+ duration := GetDuration(timeout)
+
+ c, err := v3Client.New(v3Client.Config{
+ Endpoints: []string{addr},
+ DialTimeout: duration,
+ })
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
+ wc := make(map[string][]map[chan *Event]v3Client.Watcher)
+ reservations := make(map[string]*v3Client.LeaseID)
+ return &EtcdClient{ectdAPI: c, watchedChannels: wc, keyReservations: reservations}, nil
+}
+
+// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
+// wait for a response
+func (c *EtcdClient) List(key string, timeout int) (map[string]*KVPair, error) {
+ duration := GetDuration(timeout)
+
+ ctx, cancel := context.WithTimeout(context.Background(), duration)
+ resp, err := c.ectdAPI.Get(ctx, key, v3Client.WithPrefix())
+ cancel()
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
+ m := make(map[string]*KVPair)
+ for _, ev := range resp.Kvs {
+ m[string(ev.Key)] = NewKVPair(string(ev.Key), ev.Value, "", ev.Lease)
+ }
+ return m, nil
+}
+
+// Get returns a key-value pair for a given key. Timeout defines how long the function will
+// wait for a response
+func (c *EtcdClient) Get(key string, timeout int) (*KVPair, error) {
+ duration := GetDuration(timeout)
+
+ ctx, cancel := context.WithTimeout(context.Background(), duration)
+ resp, err := c.ectdAPI.Get(ctx, key)
+ cancel()
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
+ for _, ev := range resp.Kvs {
+ // Only one value is returned
+ return NewKVPair(string(ev.Key), ev.Value, "", ev.Lease), nil
+ }
+ return nil, nil
+}
+
+// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the etcd API
+// accepts only a string as a value for a put operation. Timeout defines how long the function will
+// wait for a response
+func (c *EtcdClient) Put(key string, value interface{}, timeout int) error {
+
+ // Validate that we can convert value to a string as etcd API expects a string
+ var val string
+ var er error
+ if val, er = ToString(value); er != nil {
+ return fmt.Errorf("unexpected-type-%T", value)
+ }
+
+ duration := GetDuration(timeout)
+
+ ctx, cancel := context.WithTimeout(context.Background(), duration)
+ c.writeLock.Lock()
+ defer c.writeLock.Unlock()
+ _, err := c.ectdAPI.Put(ctx, key, val)
+ cancel()
+ if err != nil {
+ switch err {
+ case context.Canceled:
+ log.Warnw("context-cancelled", log.Fields{"error":err})
+ case context.DeadlineExceeded:
+ log.Warnw("context-deadline-exceeded", log.Fields{"error":err})
+ case v3rpcTypes.ErrEmptyKey:
+ log.Warnw("etcd-client-error", log.Fields{"error":err})
+ default:
+ log.Warnw("bad-endpoints", log.Fields{"error":err})
+ }
+ return err
+ }
+ return nil
+}
+
+// Delete removes a key from the KV store. Timeout defines how long the function will
+// wait for a response
+func (c *EtcdClient) Delete(key string, timeout int) error {
+
+ duration := GetDuration(timeout)
+
+ ctx, cancel := context.WithTimeout(context.Background(), duration)
+ defer cancel()
+
+ c.writeLock.Lock()
+ defer c.writeLock.Unlock()
+
+ // count keys about to be deleted
+ gresp, err := c.ectdAPI.Get(ctx, key, v3Client.WithPrefix())
+ if err != nil {
+ log.Error(err)
+ return err
+ }
+
+ // delete the keys
+ dresp, err := c.ectdAPI.Delete(ctx, key, v3Client.WithPrefix())
+ if err != nil {
+ log.Error(err)
+ return err
+ }
+
+ if dresp == nil || gresp == nil {
+ log.Debug("nothing-to-delete")
+ return nil
+ }
+
+ log.Debugw("delete-keys", log.Fields{"all-keys-deleted":int64(len(gresp.Kvs)) == dresp.Deleted})
+ if int64(len(gresp.Kvs)) == dresp.Deleted {
+ log.Debug("All-keys-deleted")
+ } else {
+ log.Error("not-all-keys-deleted")
+ err := errors.New("not-all-keys-deleted")
+ return err
+ }
+ return nil
+}
+
+// Reserve is invoked to acquire a key and set it to a given value. Value can only be a string or []byte since
+// the etcd API accepts only a string. Timeout defines how long the function will wait for a response. TTL
+// defines how long that reservation is valid. When TTL expires the key is unreserved by the KV store itself.
+// If the key is acquired then the value returned will be the value passed in. If the key is already acquired
+// then the value assigned to that key will be returned.
+func (c *EtcdClient) Reserve(key string, value interface{}, ttl int64) (interface{}, error) {
+ // Validate that we can convert value to a string as etcd API expects a string
+ var val string
+ var er error
+ if val, er = ToString(value); er != nil {
+ return nil, fmt.Errorf("unexpected-type%T", value)
+ }
+
+ // Create a lease
+ resp, err := c.ectdAPI.Grant(context.Background(), ttl)
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
+ // Register the lease id
+ c.writeLock.Lock()
+ c.keyReservations[key] = &resp.ID
+ c.writeLock.Unlock()
+
+ // Revoke lease if reservation is not successful
+ reservationSuccessful := false
+ defer func() {
+ if !reservationSuccessful {
+ if err = c.ReleaseReservation(key); err != nil {
+ log.Errorf("cannot-release-lease")
+ }
+ }
+ }()
+
+ // Try to grap the Key with the above lease
+ c.ectdAPI.Txn(context.Background())
+ txn := c.ectdAPI.Txn(context.Background())
+ txn = txn.If(v3Client.Compare(v3Client.Version(key), "=", 0))
+ txn = txn.Then(v3Client.OpPut(key, val, v3Client.WithLease(resp.ID)))
+ txn = txn.Else(v3Client.OpGet(key))
+ result, er := txn.Commit()
+ if er != nil {
+ return nil, er
+ }
+
+ if !result.Succeeded {
+ // Verify whether we are already the owner of that Key
+ if len(result.Responses) > 0 &&
+ len(result.Responses[0].GetResponseRange().Kvs) > 0 {
+ kv := result.Responses[0].GetResponseRange().Kvs[0]
+ if string(kv.Value) == val {
+ reservationSuccessful = true
+ return value, nil
+ }
+ return kv.Value, nil
+ }
+ } else {
+ // Read the Key to ensure this is our Key
+ m, err := c.Get(key, defaultKVGetTimeout)
+ if err != nil {
+ return nil, err
+ }
+ if m != nil {
+ if m.Key == key && isEqual(m.Value, value) {
+ // My reservation is successful - register it. For now, support is only for 1 reservation per key
+ // per session.
+ reservationSuccessful = true
+ return value, nil
+ }
+ // My reservation has failed. Return the owner of that key
+ return m.Value, nil
+ }
+ }
+ return nil, nil
+}
+
+// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
+func (c *EtcdClient) ReleaseAllReservations() error {
+ c.writeLock.Lock()
+ defer c.writeLock.Unlock()
+ for key, leaseID := range c.keyReservations {
+ _, err := c.ectdAPI.Revoke(context.Background(), *leaseID)
+ if err != nil {
+ log.Errorw("cannot-release-reservation", log.Fields{"key":key, "error":err})
+ return err
+ }
+ delete(c.keyReservations, key)
+ }
+ return nil
+}
+
+// ReleaseReservation releases reservation for a specific key.
+func (c *EtcdClient) ReleaseReservation(key string) error {
+ // Get the leaseid using the key
+ var ok bool
+ var leaseID *v3Client.LeaseID
+ c.writeLock.Lock()
+ defer c.writeLock.Unlock()
+ if leaseID, ok = c.keyReservations[key]; !ok {
+ return errors.New("key-not-reserved")
+ }
+ if leaseID != nil {
+ _, err := c.ectdAPI.Revoke(context.Background(), *leaseID)
+ if err != nil {
+ log.Error(err)
+ return err
+ }
+ delete(c.keyReservations, key)
+ }
+ return nil
+}
+
+// RenewReservation renews a reservation. A reservation will go stale after the specified TTL (Time To Live)
+// period specified when reserving the key
+func (c *EtcdClient) RenewReservation(key string) error {
+ // Get the leaseid using the key
+ var ok bool
+ var leaseID *v3Client.LeaseID
+ c.writeLock.Lock()
+ defer c.writeLock.Unlock()
+ if leaseID, ok = c.keyReservations[key]; !ok {
+ return errors.New("key-not-reserved")
+ }
+
+ if leaseID != nil {
+ _, err := c.ectdAPI.KeepAliveOnce(context.Background(), *leaseID)
+ if err != nil {
+ log.Errorw("lease-may-have-expired", log.Fields{"error":err})
+ return err
+ }
+ } else {
+ return errors.New("lease-expired")
+ }
+ return nil
+}
+
+// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
+// listen to receive Events.
+func (c *EtcdClient) Watch(key string) chan *Event {
+ w := v3Client.NewWatcher(c.ectdAPI)
+ channel := w.Watch(context.Background(), key, v3Client.WithPrefix())
+
+ // Create a new channel
+ ch := make(chan *Event, maxClientChannelBufferSize)
+
+ // Keep track of the created channels so they can be closed when required
+ channelMap := make(map[chan *Event]v3Client.Watcher)
+ channelMap[ch] = w
+ //c.writeLock.Lock()
+ //defer c.writeLock.Unlock()
+ c.watchedChannels[key] = append(c.watchedChannels[key], channelMap)
+
+ log.Debugw("watched-channels", log.Fields{"channels":c.watchedChannels[key]})
+ // Launch a go routine to listen for updates
+ go c.listenForKeyChange(channel, ch)
+
+ return ch
+
+}
+
+// CloseWatch closes a specific watch. Both the key and the channel are required when closing a watch as there
+// may be multiple listeners on the same key. The previously created channel serves as a key
+func (c *EtcdClient) CloseWatch(key string, ch chan *Event) {
+ // Get the array of channels mapping
+ var watchedChannels []map[chan *Event]v3Client.Watcher
+ var ok bool
+ c.writeLock.Lock()
+ defer c.writeLock.Unlock()
+
+ if watchedChannels, ok = c.watchedChannels[key]; !ok {
+ log.Warnw("key-has-no-watched-channels", log.Fields{"key":key})
+ return
+ }
+ // Look for the channels
+ var pos = -1
+ for i, chMap := range watchedChannels {
+ if t, ok := chMap[ch]; ok {
+ log.Debug("channel-found")
+ // Close the etcd watcher before the client channel. This should close the etcd channel as well
+ if err := t.Close(); err != nil {
+ log.Errorw("watcher-cannot-be-closed", log.Fields{"key":key, "error":err})
+ }
+ close(ch)
+ pos = i
+ break
+ }
+ }
+ // Remove that entry if present
+ if pos >= 0 {
+ c.watchedChannels[key] = append(c.watchedChannels[key][:pos], c.watchedChannels[key][pos+1:]...)
+ }
+ log.Infow("watcher-channel-exiting", log.Fields{"key":key, "channel":c.watchedChannels[key]})
+}
+
+func (c *EtcdClient) listenForKeyChange(channel v3Client.WatchChan, ch chan<- *Event) {
+ log.Infow("start-listening-on-channel", log.Fields{"channel":ch})
+ for resp := range channel {
+ for _, ev := range resp.Events {
+ //log.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
+ ch <- NewEvent(getEventType(ev), ev.Kv.Key, ev.Kv.Value)
+ }
+ }
+ log.Info("stop-listening-on-channel")
+}
+
+func getEventType(event *v3Client.Event) int {
+ switch event.Type {
+ case v3Client.EventTypePut:
+ return PUT
+ case v3Client.EventTypeDelete:
+ return DELETE
+ }
+ return UNKNOWN
+}
+
+// Close closes the KV store client
+func (c *EtcdClient) Close() {
+ c.writeLock.Lock()
+ defer c.writeLock.Unlock()
+ if err := c.ectdAPI.Close(); err != nil {
+ log.Errorw("error-closing-client", log.Fields{"error":err})
+ }
+}
diff --git a/db/kvstore/kvutils.go b/db/kvstore/kvutils.go
new file mode 100644
index 0000000..318482f
--- /dev/null
+++ b/db/kvstore/kvutils.go
@@ -0,0 +1,41 @@
+package kvstore
+
+import (
+ "fmt"
+ "time"
+)
+
+// GetDuration converts a timeout value from int to duration. If the timeout value is
+// either not set of -ve then we default KV timeout (configurable) is used.
+func GetDuration(timeout int) time.Duration {
+ if timeout <= 0 {
+ return defaultKVGetTimeout * time.Second
+ }
+ return time.Duration(timeout) * time.Second
+}
+
+// ToString converts an interface value to a string. The interface should either be of
+// a string type or []byte. Otherwise, an error is returned.
+func ToString(value interface{}) (string, error) {
+ switch t := value.(type) {
+ case []byte:
+ return string(value.([]byte)), nil
+ case string:
+ return value.(string), nil
+ default:
+ return "", fmt.Errorf("unexpected-type-%T", t)
+ }
+}
+
+// ToByte converts an interface value to a []byte. The interface should either be of
+// a string type or []byte. Otherwise, an error is returned.
+func ToByte(value interface{}) ([]byte, error) {
+ switch t := value.(type) {
+ case []byte:
+ return value.([]byte), nil
+ case string:
+ return []byte(value.(string)), nil
+ default:
+ return nil, fmt.Errorf("unexpected-type-%T", t)
+ }
+}
diff --git a/db/kvstore/kvutils_test.go b/db/kvstore/kvutils_test.go
new file mode 100644
index 0000000..f5e82d2
--- /dev/null
+++ b/db/kvstore/kvutils_test.go
@@ -0,0 +1,67 @@
+package kvstore
+
+import (
+ "time"
+ "testing"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestDurationWithNegativeTimeout(t *testing.T) {
+ actualResult := GetDuration(-1)
+ var expectedResult = defaultKVGetTimeout * time.Second
+
+ assert.Equal(t, expectedResult, actualResult)
+}
+
+func TestDurationWithZeroTimeout(t *testing.T) {
+ actualResult := GetDuration(0)
+ var expectedResult = defaultKVGetTimeout * time.Second
+
+ assert.Equal(t, expectedResult, actualResult)
+}
+
+func TestDurationWithTimeout(t *testing.T) {
+ actualResult := GetDuration(10)
+ var expectedResult = time.Duration(10) * time.Second
+
+ assert.Equal(t, expectedResult, actualResult)
+}
+
+func TestToStringWithString(t *testing.T) {
+ actualResult, _ := ToString("myString")
+ var expectedResult = "myString"
+
+ assert.Equal(t, expectedResult, actualResult)
+}
+
+func TestToStringWithEmpty(t *testing.T) {
+ actualResult, _ := ToString("")
+ var expectedResult = ""
+
+ assert.Equal(t, expectedResult, actualResult)
+}
+
+func TestToStringWithByte(t *testing.T) {
+ mByte := []byte("Hello")
+ actualResult, _ := ToString(mByte)
+ var expectedResult = "Hello"
+
+ assert.Equal(t, expectedResult, actualResult)
+}
+
+func TestToStringWithEmptyByte(t *testing.T) {
+ mByte := []byte("")
+ actualResult, _ := ToString(mByte)
+ var expectedResult = ""
+
+ assert.Equal(t, expectedResult, actualResult)
+}
+
+func TestToStringForErrorCase(t *testing.T) {
+ mInt := 200
+ actualResult, error := ToString(mInt)
+ var expectedResult = ""
+
+ assert.Equal(t, expectedResult, actualResult)
+ assert.NotEqual(t, error, nil)
+}
diff --git a/docker/Dockerfile.rw_core b/docker/Dockerfile.rw_core
new file mode 100644
index 0000000..ecc9919
--- /dev/null
+++ b/docker/Dockerfile.rw_core
@@ -0,0 +1,31 @@
+# -------------
+# Build stage
+
+FROM golang:alpine AS build-env
+
+# Install required packages
+RUN apk add --no-cache wget git make build-base
+
+# Prepare directory structure
+RUN ["mkdir", "-p", "/src"]
+RUN ["mkdir", "-p", "$GOPATH/src", "$GOPATH/pkg", "$GOPATH/bin"]
+
+# Copy files
+ADD rw_core $GOPATH/src/github.com/opencord/voltha-go/rw_core
+ADD common $GOPATH/src/github.com/opencord/voltha-go/common
+ADD db $GOPATH/src/github.com/opencord/voltha-go/db
+
+# Build rw_core
+RUN cd $GOPATH/src/github.com/opencord/voltha-go/rw_core && go get -d ./... && go build -o /src/rw_core
+
+# -------------
+# Image creation stage
+
+FROM alpine:3.6
+
+# Set the working directory
+WORKDIR /app
+
+# Copy required files
+COPY --from=build-env /src/rw_core /app/
+
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
new file mode 100644
index 0000000..ec30eac
--- /dev/null
+++ b/rw_core/config/config.go
@@ -0,0 +1,88 @@
+package config
+
+import (
+ //"context"
+ "flag"
+ "fmt"
+ //dt "github.com/docker/docker/api/types"
+ //dc "github.com/docker/docker/client"
+ "os"
+ "time"
+)
+
+// Constants used to differentiate between the KV stores
+const (
+ ConsulStoreName string = "consul"
+ EtcdStoreName string = "etcd"
+)
+
+// CoordinatorFlags represents the set of configurations used by the coordinator
+type RWCoreFlags struct {
+ // Command line parameters
+ InstanceID string
+ KVStoreType string
+ KVStoreTimeout int // in seconds
+ KVStoreHost string
+ KVStorePort int
+ LogLevel string
+}
+
+// NewRWCoreFlags returns a new coordinator config
+func NewRWCoreFlags() *RWCoreFlags {
+ var rwCoreFlag = RWCoreFlags{ // Default values
+ InstanceID: "rw_coreInstance001",
+ KVStoreType: ConsulStoreName,
+ KVStoreTimeout: 5,
+ KVStoreHost: "10.100.198.240",
+ //KVStorePort: 2379,
+ KVStorePort: 8500,
+ LogLevel: "info",
+ }
+ return &rwCoreFlag
+}
+
+// ParseCommandArguments parses the arguments when running coordinator
+func (cf *RWCoreFlags) ParseCommandArguments() {
+ flag.IntVar(&(cf.KVStoreTimeout),
+ "kv-store-request-timeout",
+ cf.KVStoreTimeout,
+ "The default timeout when making a kv store request")
+
+ flag.StringVar(&(cf.KVStoreType),
+ "kv-store-type",
+ cf.KVStoreType,
+ "KV store type")
+
+ flag.StringVar(&(cf.KVStoreHost),
+ "kv-store-host",
+ cf.KVStoreHost,
+ "KV store host")
+
+ flag.IntVar(&(cf.KVStorePort),
+ "kv-store-port",
+ cf.KVStorePort,
+ "KV store port")
+
+ flag.StringVar(&(cf.LogLevel),
+ "log-level",
+ cf.LogLevel,
+ "Log level")
+
+ flag.Parse()
+
+ // Update the necessary keys with the prefixes
+ start := time.Now()
+ containerName := getContainerInfo()
+ fmt.Println("container name:", containerName)
+ if len(containerName) > 0 {
+ cf.InstanceID = containerName
+ }
+
+ fmt.Println("Inside config:", cf)
+ elapsed := time.Since(start)
+ fmt.Println("time:", elapsed/time.Second)
+}
+
+func getContainerInfo() string {
+ return os.Getenv("HOSTNAME")
+}
diff --git a/rw_core/main.go b/rw_core/main.go
new file mode 100644
index 0000000..e766904
--- /dev/null
+++ b/rw_core/main.go
@@ -0,0 +1,173 @@
+package main
+
+import (
+ "fmt"
+ "os"
+ "os/signal"
+ "time"
+ "errors"
+ "strconv"
+ "github.com/opencord/voltha-go/db/kvstore"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/rw_core/config"
+ "syscall"
+)
+
+type rwCore struct {
+ kvClient kvstore.Client
+ config *config.RWCoreFlags
+ halted bool
+ exitChannel chan int
+}
+
+func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
+
+ log.Infow("kv-store-type", log.Fields{"store":storeType})
+ switch storeType {
+ case "consul":
+ return kvstore.NewConsulClient(address, timeout)
+ case "etcd":
+ return kvstore.NewEtcdClient(address, timeout)
+ }
+ return nil, errors.New("unsupported-kv-store")
+}
+
+func newRWCore(cf *config.RWCoreFlags) *rwCore {
+ var rwCore rwCore
+ rwCore.config = cf
+ rwCore.halted = false
+ rwCore.exitChannel = make(chan int, 1)
+ return &rwCore
+}
+
+func (core *rwCore) setKVClient() error {
+ addr := core.config.KVStoreHost + ":" + strconv.Itoa(core.config.KVStorePort)
+ client, err := newKVClient(core.config.KVStoreType, addr, core.config.KVStoreTimeout)
+ if err != nil {
+ log.Error(err)
+ return err
+ }
+ core.kvClient = client
+ return nil
+}
+
+
+func toString(value interface{}) (string, error) {
+ switch t := value.(type) {
+ case []byte:
+ return string(value.([]byte)), nil
+ case string:
+ return value.(string), nil
+ default:
+ return "", fmt.Errorf("unexpected-type-%T", t)
+ }
+}
+
+
+func (core *rwCore) start() {
+ log.Info("core-starting")
+
+ //First set the KV client. Some client immediately tries to connect to the KV store (etcd) while others does
+ // not create the connection until a request to the store is made (consul)
+ tick := time.Tick(kvstore.GetDuration(core.config.KVStoreTimeout))
+ connected := false
+KVStoreConnectLoop:
+ for {
+ if err := core.setKVClient(); err != nil {
+ log.Warn("cannot-create-kv-client-retrying")
+ select {
+ case <-tick:
+ log.Debug("kv-client-retry")
+ continue
+ case <-core.exitChannel:
+ log.Info("exit-request-received")
+ break KVStoreConnectLoop
+ }
+ } else {
+ log.Debug("got-kv-client.")
+ connected = true
+ break
+ }
+ }
+ // Connected is true only if there is a valid KV store connection and no exit request has been received
+ if connected {
+ log.Info("core-started")
+ } else {
+ log.Info("core-ended")
+ }
+}
+
+
+func (core *rwCore) stop() {
+ // Stop leadership tracking
+ core.halted = true
+
+ // send exit signal
+ core.exitChannel <- 0
+
+ // Cleanup - applies only if we had a kvClient
+ if core.kvClient != nil {
+ // Release all reservations
+ if err := core.kvClient.ReleaseAllReservations(); err != nil {
+ log.Infow("fail-to-release-all-reservations", log.Fields{"error":err})
+ }
+ // Close the DB connection
+ core.kvClient.Close()
+ }
+}
+
+func waitForExit() int {
+ signalChannel := make(chan os.Signal, 1)
+ signal.Notify(signalChannel,
+ syscall.SIGHUP,
+ syscall.SIGINT,
+ syscall.SIGTERM,
+ syscall.SIGQUIT)
+
+ exitChannel := make(chan int)
+
+ go func() {
+ s := <-signalChannel
+ switch s {
+ case syscall.SIGHUP,
+ syscall.SIGINT,
+ syscall.SIGTERM,
+ syscall.SIGQUIT:
+ log.Infow("closing-signal-received", log.Fields{"signal":s})
+ exitChannel <- 0
+ default:
+ log.Infow("unexpected-signal-received", log.Fields{"signal":s})
+ exitChannel <- 1
+ }
+ }()
+
+ code := <-exitChannel
+ return code
+}
+
+func main() {
+ start := time.Now()
+
+ cf := config.NewRWCoreFlags()
+ cf.ParseCommandArguments()
+
+ // Setup logging
+ if _, err := log.SetLogger(log.JSON, log.DebugLevel, log.Fields{"instanceId":cf.InstanceID}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
+ defer log.CleanUp()
+
+ log.Infow("rw-core-config", log.Fields{"config":*cf})
+
+ core := newRWCore(cf)
+ go core.start()
+
+ code := waitForExit()
+ log.Infow("received-a-closing-signal", log.Fields{"code":code})
+
+ // Cleanup before leaving
+ core.stop()
+
+ elapsed := time.Since(start)
+ log.Infow("rw-core-run-time", log.Fields{"core":core.config.InstanceID, "time":elapsed/time.Second})
+}