Update the RW Core configuration to accept configuration options similar
to those of the Twisted Python Voltha Core.
Change-Id: Ic9b497dd2b2160d76c941f5115e8e6d0271916e9
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index ec30eac..a223318 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -7,80 +7,140 @@
//dt "github.com/docker/docker/api/types"
//dc "github.com/docker/docker/client"
"os"
- "time"
)
-// Constants used to differentiate between the KV stores
+// RW Core service default constants
const (
- ConsulStoreName string = "consul"
- EtcdStoreName string = "etcd"
+ ConsulStoreName = "consul"
+ EtcdStoreName = "etcd"
+ default_InstanceID = "rwcore001"
+ default_GrpcPort = 50057
+ default_GrpcHost = "10.100.198.240"
+ default_KafkaAdapterHost = "10.100.198.240"
+ default_KafkaAdapterPort = 9092
+ default_KafkaClusterHost = "10.100.198.240"
+ default_KafkaClusterPort = 9094
+ default_KVStoreType = ConsulStoreName
+ default_KVStoreTimeout = 5 //in seconds
+ default_KVStoreHost = "10.100.198.240"
+ default_KVStorePort = 8500 // Etcd = 2379
+ default_LogLevel = 1
+ default_Banner = false
+ default_CoreTopic = "rwcore"
+ default_RWCoreEndpoint = "rwcore"
+ default_RWCoreKey = "pki/voltha.key"
+ default_RWCoreCert = "pki/voltha.crt"
+ default_RWCoreCA = "pki/voltha-CA.pem"
)
-// CoordinatorFlags represents the set of configurations used by the coordinator
+// RWCoreFlags represents the set of configurations used by the read-write core service
type RWCoreFlags struct {
// Command line parameters
- InstanceID string
- KVStoreType string
- KVStoreTimeout int // in seconds
- KVStoreHost string
- KVStorePort int
- LogLevel string
+ InstanceID string
+ RWCoreEndpoint string
+ GrpcHost string
+ GrpcPort int
+ KafkaAdapterHost string
+ KafkaAdapterPort int
+ KafkaClusterHost string
+ KafkaClusterPort int
+ KVStoreType string
+ KVStoreTimeout int // in seconds
+ KVStoreHost string
+ KVStorePort int
+ CoreTopic string
+ LogLevel int
+ Banner bool
+ RWCoreKey string
+ RWCoreCert string
+ RWCoreCA string
}
-// NewRWCoreFlags returns a new coordinator config
+// NewRWCoreFlags returns a new RWCore config
func NewRWCoreFlags() *RWCoreFlags {
var rwCoreFlag = RWCoreFlags{ // Default values
- InstanceID: "rw_coreInstance001",
- KVStoreType: ConsulStoreName,
- KVStoreTimeout: 5,
- KVStoreHost: "10.100.198.240",
- //KVStorePort: 2379,
- KVStorePort: 8500,
- LogLevel: "info",
+ InstanceID: default_InstanceID,
+ RWCoreEndpoint: default_RWCoreEndpoint,
+ GrpcHost: default_GrpcHost,
+ GrpcPort: default_GrpcPort,
+ KafkaAdapterHost: default_KafkaAdapterHost,
+ KafkaAdapterPort: default_KafkaAdapterPort,
+ KafkaClusterHost: default_KafkaClusterHost,
+ KafkaClusterPort: default_KafkaClusterPort,
+ KVStoreType: default_KVStoreType,
+ KVStoreTimeout: default_KVStoreTimeout,
+ KVStoreHost: default_KVStoreHost,
+ KVStorePort: default_KVStorePort,
+ CoreTopic: default_CoreTopic,
+ LogLevel: default_LogLevel,
+ Banner: default_Banner,
+ RWCoreKey: default_RWCoreKey,
+ RWCoreCert: default_RWCoreCert,
+ RWCoreCA: default_RWCoreCA,
}
return &rwCoreFlag
}
-// ParseCommandArguments parses the arguments when running coordinator
+// ParseCommandArguments parses the arguments when running read-write core service
func (cf *RWCoreFlags) ParseCommandArguments() {
- flag.IntVar(&(cf.KVStoreTimeout),
- "kv-store-request-timeout",
- cf.KVStoreTimeout,
- "The default timeout when making a kv store request")
- flag.StringVar(&(cf.KVStoreType),
- "kv-store-type",
- cf.KVStoreType,
- "KV store type")
+ var help string
- flag.StringVar(&(cf.KVStoreHost),
- "kv-store-host",
- cf.KVStoreHost,
- "KV store host")
+ help = fmt.Sprintf("RW core endpoint address")
+ flag.StringVar(&(cf.RWCoreEndpoint), "vcore-endpoint", default_RWCoreEndpoint, help)
- flag.IntVar(&(cf.KVStorePort),
- "kv-store-port",
- cf.KVStorePort,
- "KV store port")
+ help = fmt.Sprintf("GRPC server - host")
+ flag.StringVar(&(cf.GrpcHost), "grpc_host", default_GrpcHost, help)
- flag.StringVar(&(cf.LogLevel),
- "log-level",
- cf.LogLevel,
- "Log level")
+ help = fmt.Sprintf("GRPC server - port")
+ flag.IntVar(&(cf.GrpcPort), "grpc_port", default_GrpcPort, help)
+
+ help = fmt.Sprintf("Kafka - Adapter messaging host")
+ flag.StringVar(&(cf.KafkaAdapterHost), "kafka_adapter_host", default_KafkaAdapterHost, help)
+
+ help = fmt.Sprintf("Kafka - Adapter messaging port")
+ flag.IntVar(&(cf.KafkaAdapterPort), "kafka_adapter_port", default_KafkaAdapterPort, help)
+
+ help = fmt.Sprintf("Kafka - Cluster messaging host")
+ flag.StringVar(&(cf.KafkaClusterHost), "kafka_cluster_host", default_KafkaClusterHost, help)
+
+ help = fmt.Sprintf("Kafka - Cluster messaging port")
+ flag.IntVar(&(cf.KafkaClusterPort), "kafka_cluster_port", default_KafkaClusterPort, help)
+
+ help = fmt.Sprintf("RW Core topic")
+ flag.StringVar(&(cf.CoreTopic), "rw_core_topic", default_CoreTopic, help)
+
+ help = fmt.Sprintf("KV store type")
+ flag.StringVar(&(cf.KVStoreType), "kv_store_type", default_KVStoreType, help)
+
+ help = fmt.Sprintf("The default timeout when making a kv store request")
+ flag.IntVar(&(cf.KVStoreTimeout), "kv_store_request_timeout", default_KVStoreTimeout, help)
+
+ help = fmt.Sprintf("KV store host")
+ flag.StringVar(&(cf.KVStoreHost), "kv_store_host", default_KVStoreHost, help)
+
+ help = fmt.Sprintf("KV store port")
+ flag.IntVar(&(cf.KVStorePort), "kv_store_port", default_KVStorePort, help)
+
+ help = fmt.Sprintf("Log level")
+ flag.IntVar(&(cf.LogLevel), "log_level", default_LogLevel, help)
+
+ help = fmt.Sprintf("Show startup banner log lines")
+ flag.BoolVar(&cf.Banner, "banner", default_Banner, help)
flag.Parse()
// Update the necessary keys with the prefixes
- start := time.Now()
+ //start := time.Now()
containerName := getContainerInfo()
- fmt.Println("container name:", containerName)
+ //fmt.Println("container name:", containerName)
if len(containerName) > 0 {
cf.InstanceID = containerName
}
- fmt.Println("Inside config:", cf)
- elapsed := time.Since(start)
- fmt.Println("time:", elapsed/time.Second)
+ //fmt.Println("Inside config:", cf)
+ //elapsed := time.Since(start)
+ //fmt.Println("time:", elapsed/time.Second)
}
func getContainerInfo() string {
diff --git a/rw_core/main.go b/rw_core/main.go
index e766904..faa5851 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -1,28 +1,29 @@
package main
import (
+ "context"
+ "errors"
"fmt"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/kvstore"
+ "github.com/opencord/voltha-go/rw_core/config"
"os"
"os/signal"
- "time"
- "errors"
"strconv"
- "github.com/opencord/voltha-go/db/kvstore"
- "github.com/opencord/voltha-go/common/log"
- "github.com/opencord/voltha-go/rw_core/config"
"syscall"
+ "time"
)
type rwCore struct {
- kvClient kvstore.Client
- config *config.RWCoreFlags
- halted bool
- exitChannel chan int
+ kvClient kvstore.Client
+ config *config.RWCoreFlags
+ halted bool
+ exitChannel chan int
}
func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
- log.Infow("kv-store-type", log.Fields{"store":storeType})
+ log.Infow("kv-store-type", log.Fields{"store": storeType})
switch storeType {
case "consul":
return kvstore.NewConsulClient(address, timeout)
@@ -51,7 +52,6 @@
return nil
}
-
func toString(value interface{}) (string, error) {
switch t := value.(type) {
case []byte:
@@ -63,41 +63,16 @@
}
}
+func (core *rwCore) start(ctx context.Context) {
+ log.Info("Starting RW Core components")
+ // Setup GRPC Server
-func (core *rwCore) start() {
- log.Info("core-starting")
+ // Setup KV Client
- //First set the KV client. Some client immediately tries to connect to the KV store (etcd) while others does
- // not create the connection until a request to the store is made (consul)
- tick := time.Tick(kvstore.GetDuration(core.config.KVStoreTimeout))
- connected := false
-KVStoreConnectLoop:
- for {
- if err := core.setKVClient(); err != nil {
- log.Warn("cannot-create-kv-client-retrying")
- select {
- case <-tick:
- log.Debug("kv-client-retry")
- continue
- case <-core.exitChannel:
- log.Info("exit-request-received")
- break KVStoreConnectLoop
- }
- } else {
- log.Debug("got-kv-client.")
- connected = true
- break
- }
- }
- // Connected is true only if there is a valid KV store connection and no exit request has been received
- if connected {
- log.Info("core-started")
- } else {
- log.Info("core-ended")
- }
+ // Setup Kafka messaging services
+
}
-
func (core *rwCore) stop() {
// Stop leadership tracking
core.halted = true
@@ -109,7 +84,7 @@
if core.kvClient != nil {
// Release all reservations
if err := core.kvClient.ReleaseAllReservations(); err != nil {
- log.Infow("fail-to-release-all-reservations", log.Fields{"error":err})
+ log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
}
// Close the DB connection
core.kvClient.Close()
@@ -133,10 +108,10 @@
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT:
- log.Infow("closing-signal-received", log.Fields{"signal":s})
+ log.Infow("closing-signal-received", log.Fields{"signal": s})
exitChannel <- 0
default:
- log.Infow("unexpected-signal-received", log.Fields{"signal":s})
+ log.Infow("unexpected-signal-received", log.Fields{"signal": s})
exitChannel <- 1
}
}()
@@ -145,6 +120,16 @@
return code
}
+func printBanner() {
+ fmt.Println(" ")
+ fmt.Println(" ______ ______ ")
+ fmt.Println("| _ \\ \\ / / ___|___ _ __ ___ ")
+ fmt.Println("| |_) \\ \\ /\\ / / | / _ \\| '__/ _ \\ ")
+ fmt.Println("| _ < \\ V V /| |__| (_) | | | __/ ")
+ fmt.Println("|_| \\_\\ \\_/\\_/ \\____\\___/|_| \\___| ")
+ fmt.Println(" ")
+}
+
func main() {
start := time.Now()
@@ -152,22 +137,30 @@
cf.ParseCommandArguments()
// Setup logging
- if _, err := log.SetLogger(log.JSON, log.DebugLevel, log.Fields{"instanceId":cf.InstanceID}); err != nil {
+ if _, err := log.SetLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
}
defer log.CleanUp()
- log.Infow("rw-core-config", log.Fields{"config":*cf})
+ // Print banner if specified
+ if cf.Banner {
+ printBanner()
+ }
+
+ log.Infow("rw-core-config", log.Fields{"config": *cf})
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
core := newRWCore(cf)
- go core.start()
+ go core.start(ctx)
code := waitForExit()
- log.Infow("received-a-closing-signal", log.Fields{"code":code})
+ log.Infow("received-a-closing-signal", log.Fields{"code": code})
// Cleanup before leaving
core.stop()
elapsed := time.Since(start)
- log.Infow("rw-core-run-time", log.Fields{"core":core.config.InstanceID, "time":elapsed/time.Second})
+ log.Infow("rw-core-run-time", log.Fields{"core": core.config.InstanceID, "time": elapsed / time.Second})
}