VOL-4019: Initial commit with grpc nbi, sbi, etcd, kafka and hw management rpcs.
Change-Id: I78feaf7da284028fc61f42c5e0c5f56e72fe9e78
diff --git a/pkg/db/connection.go b/pkg/db/connection.go
new file mode 100644
index 0000000..52cf88b
--- /dev/null
+++ b/pkg/db/connection.go
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package db holds utils for datastore implementation
+package db
+
+import (
+ "context"
+ "errors"
+ "time"
+
+ "github.com/opencord/voltha-lib-go/v4/pkg/db"
+ "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
+
+ "github.com/opencord/voltha-lib-go/v4/pkg/probe"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ "github.com/opencord/opendevice-manager/pkg/config"
+
+ "github.com/opencord/voltha-lib-go/v4/pkg/log"
+)
+
+var kvClient *KvStoreClient
+
+// KvStoreClient holds the KVStore info
+type KvStoreClient struct {
+ client kvstore.Client
+}
+
+// logger represents the log object
+var logger log.CLogger
+
+// init function for the package
+func init() {
+ logger = config.Initlog()
+}
+
+// NewKVClient function initialises and connects to KVStore
+func NewKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (kvstore.Client, error) {
+ ctx1, cancel := context.WithTimeout(ctx, timeout)
+ logger.Infow(ctx, "kv-store-type", log.Fields{"store": config.KVStore})
+ switch storeType {
+ case "etcd":
+ etcdClient, err := kvstore.NewEtcdClient(ctx1, address, timeout, log.FatalLevel)
+ defer cancel()
+ if !etcdClient.IsConnectionUp(ctx1) {
+ logger.Errorw(ctx, "etcd-server-unreachable", log.Fields{"address": address})
+ return nil, errors.New("etcd client unreachable")
+ }
+ kvClient = new(KvStoreClient)
+ kvClient.client = etcdClient
+ return etcdClient, err
+ }
+ return nil, errors.New("unsupported-kv-store")
+}
+
+// StopKVClient function disconnects connects from KVStore
+func StopKVClient(ctx context.Context, kvClient kvstore.Client) {
+
+ // Release all reservations
+ if err := kvClient.ReleaseAllReservations(ctx); err != nil {
+ logger.Infow(ctx, "fail-to-release-all-reservations", log.Fields{"error": err})
+ }
+ // Close the DB connection
+ kvClient.Close(ctx)
+}
+
+// WaitUntilKVStoreReachableOrMaxTries will wait until it can connect to a KV store or until maxtries has been reached
+func WaitUntilKVStoreReachableOrMaxTries(ctx context.Context, kvClient kvstore.Client, maxRetries int, retryInterval time.Duration) error {
+ logger.Infow(ctx, "verifying-KV-store-connectivity", log.Fields{"retries": maxRetries, "retryInterval": retryInterval})
+ count := 0
+ for {
+ if !kvClient.IsConnectionUp(ctx) {
+ logger.Info(ctx, "KV-store-unreachable")
+ if maxRetries != -1 {
+ if count >= maxRetries {
+ return status.Error(codes.Unavailable, "kv store unreachable")
+ }
+ }
+ count++
+
+ // Take a nap before retrying
+ select {
+ case <-ctx.Done():
+ //ctx canceled
+ return ctx.Err()
+ case <-time.After(retryInterval):
+ }
+ logger.Infow(ctx, "retry-KV-store-connectivity", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
+ } else {
+ break
+ }
+ }
+ probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+ logger.Info(ctx, "KV-store-reachable")
+ return nil
+}
+
+/*
+ * Thread to monitor kvstore Liveness (connection status)
+ *
+ * This function constantly monitors Liveness State of kvstore as reported
+ * periodically by backend and updates the Status of kv-store service registered
+ * with rw_core probe.
+ *
+ * If no liveness event has been seen within a timeout, then the thread will
+ * perform a "liveness" check attempt, which will in turn trigger a liveness event on
+ * the liveness channel, true or false depending on whether the attempt succeeded.
+ *
+ * The gRPC server in turn monitors the state of the readiness probe and will
+ * start issuing UNAVAILABLE response while the probe is not ready.
+ */
+// MonitorKVStoreLiveness abc
+func MonitorKVStoreLiveness(ctx context.Context, backend *db.Backend, liveProbeInterval, notLiveProbeInterval time.Duration) {
+ logger.Info(ctx, "start-monitoring-kvstore-liveness")
+
+ // Instruct backend to create Liveness channel for transporting state updates
+ livenessChannel := backend.EnableLivenessChannel(ctx)
+
+ logger.Debug(ctx, "enabled-kvstore-liveness-channel")
+
+ // Default state for kvstore is alive for rw_core
+ timeout := liveProbeInterval
+loop:
+ for {
+ timeoutTimer := time.NewTimer(timeout)
+ select {
+
+ case liveness := <-livenessChannel:
+ logger.Debugw(ctx, "received-liveness-change-notification", log.Fields{"liveness": liveness})
+
+ if !liveness {
+ probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+ logger.Info(ctx, "kvstore-set-server-notready")
+
+ timeout = notLiveProbeInterval
+
+ } else {
+ probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+ logger.Info(ctx, "kvstore-set-server-ready")
+
+ timeout = liveProbeInterval
+ }
+
+ if !timeoutTimer.Stop() {
+ <-timeoutTimer.C
+ }
+
+ case <-ctx.Done():
+ break loop
+
+ case <-timeoutTimer.C:
+ logger.Info(ctx, "kvstore-perform-liveness-check-on-timeout")
+
+ // Trigger Liveness check if no liveness update received within the timeout period.
+ // The Liveness check will push Live state to same channel which this routine is
+ // reading and processing. This, do it asynchronously to avoid blocking for
+ // backend response and avoid any possibility of deadlock
+ go backend.PerformLivenessCheck(ctx)
+ }
+ }
+}
diff --git a/pkg/db/operations.go b/pkg/db/operations.go
new file mode 100644
index 0000000..ea63050
--- /dev/null
+++ b/pkg/db/operations.go
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package db holds utils for datastore implementation
+package db
+
+import (
+ "context"
+ "errors"
+
+ "github.com/opencord/voltha-lib-go/v4/pkg/log"
+)
+
+// Read function reads key value pair from db/kvstore
+func Read(ctx context.Context, key string) (string, error) {
+ if kvClient != nil {
+ logger.Debugw(ctx, "Reading-key-value-pair-from-kv-store", log.Fields{"key": key})
+ kvPair, err := kvClient.client.Get(ctx, key)
+ if err != nil {
+ return "", err
+ }
+ if kvPair == nil {
+ return "", errors.New("key not found")
+ }
+ return string(kvPair.Value.([]byte)), nil
+
+ }
+ logger.Errorw(ctx, "Reading-key-value-pair-in kv-store-failed-because-kvstore-not-initialised", log.Fields{"key": key})
+ return "", errors.New("kvstore not initialised")
+}
+
+// ReadAll function reads all key value pair from db/kvstore
+func ReadAll(ctx context.Context, keyPrefix string) (map[string]string, error) {
+ keyValues := make(map[string]string)
+ if kvClient != nil {
+ logger.Debugw(ctx, "Reading-all-key-value-pairs-from-kv-store", log.Fields{"key-prefix": keyPrefix})
+ kvPairs, err := kvClient.client.List(ctx, keyPrefix)
+ if err != nil {
+ return keyValues, err
+ }
+ if kvPairs == nil {
+ return keyValues, errors.New("key not found")
+ }
+
+ for key, kvPair := range kvPairs {
+ keyValues[key] = string(kvPair.Value.([]byte))
+ }
+ return keyValues, nil
+ }
+ logger.Errorw(ctx, "Reading-all-key-value-pair-in-kv-store-failed-because-kvstore-not-initialised", log.Fields{"key-prefix": keyPrefix})
+ return keyValues, errors.New("kvstore not initialised")
+}
+
+// Del function deletes key value pair from db/kvstore
+func Del(ctx context.Context, key string) error {
+ if kvClient != nil {
+ logger.Debugw(ctx, "Deleting-key-value-pair-from-kv-store", log.Fields{"key": key})
+ return kvClient.client.Delete(ctx, key)
+ }
+ logger.Errorw(ctx, "Deleting-key-value-pair-in-kv-store-failed-because-kvstore-not-initialised", log.Fields{"key": key})
+ return errors.New("kvstore not initialised")
+}
+
+// DelAll function deletes all key value pair from db/kvstore with provided key prefix
+func DelAll(ctx context.Context, keyPrefix string) error {
+ if kvClient != nil {
+ logger.Debugw(ctx, "Deleting-all-key-value-pair-from-kv-store-with-prefix", log.Fields{"key-prefix": keyPrefix})
+ return kvClient.client.DeleteWithPrefix(ctx, keyPrefix)
+ }
+ logger.Errorw(ctx, "Deleting-all-key-value-pair-in-kv-store-with-prefix-failed-because-kvstore-not-initialised", log.Fields{"key-prefix": keyPrefix})
+ return errors.New("kvstore not initialised")
+}
+
+// Put function stores key value pair in db/kvstore
+func Put(ctx context.Context, key string, val string) error {
+ if kvClient != nil {
+ logger.Debugw(ctx, "Storing-key-value-pair-in-kv-store", log.Fields{"key": key, "value": val})
+ return kvClient.client.Put(ctx, key, val)
+ }
+ logger.Errorw(ctx, "Storing-key-value-pair-in-kv-store-failed-because-kvstore-not-initialised", log.Fields{"key": key, "value": val})
+ return errors.New("kvstore not initialised")
+}