Update the RW Core configuration to accept a configuration similar to that of
the Twisted Python Voltha Core.
Change-Id: Ic9b497dd2b2160d76c941f5115e8e6d0271916e9
diff --git a/common/log/log.go b/common/log/log.go
index 4bb31c0..e9f3c1b 100644
--- a/common/log/log.go
+++ b/common/log/log.go
@@ -2,11 +2,11 @@
import (
"errors"
+ "fmt"
zp "go.uber.org/zap"
zc "go.uber.org/zap/zapcore"
"runtime"
"strings"
- "fmt"
)
const (
@@ -26,6 +26,7 @@
// CONSOLE formats the log for the console, mostly used during development
const CONSOLE = "console"
+
// JSON formats the log using json format, mostly used by an automated logging system consumption
const JSON = "json"
diff --git a/compose/rw_core.yml b/compose/rw_core.yml
index 0fde367..bd8ea9f 100644
--- a/compose/rw_core.yml
+++ b/compose/rw_core.yml
@@ -4,9 +4,10 @@
image: voltha-rw-core
entrypoint:
- /app/rw_core
- - -kv-store-type=etcd
- - -kv-store-host=${DOCKER_HOST_IP}
- - -kv-store-port=2379
+ - -kv_store_type=etcd
+ - -kv_store_host=${DOCKER_HOST_IP}
+ - -kv_store_port=2379
+ - -banner=true
volumes:
- "/var/run/docker.sock:/tmp/docker.sock"
networks:
diff --git a/db/kvstore/consulclient.go b/db/kvstore/consulclient.go
index 453f282..911da58 100644
--- a/db/kvstore/consulclient.go
+++ b/db/kvstore/consulclient.go
@@ -1,12 +1,12 @@
package kvstore
import (
+ "bytes"
"context"
"errors"
- "bytes"
+ log "github.com/opencord/voltha-go/common/log"
"sync"
"time"
- log "github.com/opencord/voltha-go/common/log"
//log "ciena.com/coordinator/common"
consulapi "github.com/hashicorp/consul/api"
)
@@ -17,7 +17,6 @@
cancel context.CancelFunc
}
-
// ConsulClient represents the consul KV store client
type ConsulClient struct {
session *consulapi.Session
@@ -140,7 +139,7 @@
session := c.consul.Session()
_, err := session.Destroy(c.sessionID, nil)
if err != nil {
- log.Errorw("error-cleaning-session", log.Fields{"session":c.sessionID, "error":err})
+ log.Errorw("error-cleaning-session", log.Fields{"session": c.sessionID, "error": err})
}
}
c.sessionID = ""
@@ -157,12 +156,12 @@
for {
id, meta, err := session.Create(entry, nil)
if err != nil {
- log.Errorw("create-session-error", log.Fields{"error":err})
+ log.Errorw("create-session-error", log.Fields{"error": err})
if retries == 0 {
return nil, "", err
}
} else if meta.RequestTime == 0 {
- log.Errorw("create-session-bad-meta-data", log.Fields{"meta-data":meta})
+ log.Errorw("create-session-bad-meta-data", log.Fields{"meta-data": meta})
if retries == 0 {
return nil, "", errors.New("bad-meta-data")
}
@@ -183,7 +182,6 @@
}
}
-
// Helper function to verify mostly whether the content of two interface types are the same. Focus is []byte and
// string types
func isEqual(val1 interface{}, val2 interface{}) bool {
@@ -226,10 +224,10 @@
session, sessionID, err := c.createSession(ttl, -1)
if err != nil {
- log.Errorw("no-session-created", log.Fields{"error":err})
+ log.Errorw("no-session-created", log.Fields{"error": err})
return "", errors.New("no-session-created")
}
- log.Debugw("session-created", log.Fields{"session-id":sessionID})
+ log.Debugw("session-created", log.Fields{"session-id": sessionID})
c.sessionID = sessionID
c.session = session
@@ -238,11 +236,11 @@
kvp := consulapi.KVPair{Key: key, Value: val, Session: c.sessionID}
result, _, err := kv.Acquire(&kvp, nil)
if err != nil {
- log.Errorw("error-acquiring-keys", log.Fields{"error":err})
+ log.Errorw("error-acquiring-keys", log.Fields{"error": err})
return nil, err
}
- log.Debugw("key-acquired", log.Fields{"key":key, "status":result})
+ log.Debugw("key-acquired", log.Fields{"key": key, "status": result})
// Irrespective whether we were successful in acquiring the key, let's read it back and see if it's us.
m, err := c.Get(key, defaultKVGetTimeout)
@@ -250,7 +248,7 @@
return nil, err
}
if m != nil {
- log.Debugw("response-received", log.Fields{"key":m.Key, "m.value":string(m.Value.([]byte)), "value":value})
+ log.Debugw("response-received", log.Fields{"key": m.Key, "m.value": string(m.Value.([]byte)), "value": value})
if m.Key == key && isEqual(m.Value, value) {
// My reservation is successful - register it. For now, support is only for 1 reservation per key
// per session.
@@ -280,11 +278,11 @@
kvp = consulapi.KVPair{Key: key, Value: value.([]byte), Session: c.sessionID}
result, _, err = kv.Release(&kvp, nil)
if err != nil {
- log.Errorw("cannot-release-reservation", log.Fields{"key":key, "error":err})
+ log.Errorw("cannot-release-reservation", log.Fields{"key": key, "error": err})
return err
}
if !result {
- log.Errorw("cannot-release-reservation", log.Fields{"key":key})
+ log.Errorw("cannot-release-reservation", log.Fields{"key": key})
}
delete(c.keyReservations, key)
}
@@ -371,7 +369,7 @@
c.writeLock.Lock()
defer c.writeLock.Unlock()
if watchedChannelsContexts, ok = c.watchedChannelsContext[key]; !ok {
- log.Errorw("key-has-no-watched-context-or-channel", log.Fields{"key":key})
+ log.Errorw("key-has-no-watched-context-or-channel", log.Fields{"key": key})
return
}
// Look for the channels
@@ -390,7 +388,7 @@
if pos >= 0 {
c.watchedChannelsContext[key] = append(c.watchedChannelsContext[key][:pos], c.watchedChannelsContext[key][pos+1:]...)
}
- log.Debugw("watched-channel-exiting", log.Fields{"key":key, "channel":c.watchedChannelsContext[key]})
+ log.Debugw("watched-channel-exiting", log.Fields{"key": key, "channel": c.watchedChannelsContext[key]})
}
func (c *ConsulClient) isKVEqual(kv1 *consulapi.KVPair, kv2 *consulapi.KVPair) bool {
@@ -411,7 +409,7 @@
}
func (c *ConsulClient) listenForKeyChange(watchContext context.Context, key string, ch chan *Event) {
- log.Debugw("start-watching-channel", log.Fields{"key":key, "channel":ch})
+ log.Debugw("start-watching-channel", log.Fields{"key": key, "channel": ch})
defer c.CloseWatch(key, ch)
duration := GetDuration(defaultKVGetTimeout)
@@ -441,10 +439,10 @@
return
default:
if err != nil {
- log.Warnw("error-from-watch", log.Fields{"error":err})
+ log.Warnw("error-from-watch", log.Fields{"error": err})
ch <- NewEvent(CONNECTIONDOWN, key, []byte(""))
} else {
- log.Debugw("index-state", log.Fields{"lastindex":lastIndex, "newindex":meta.LastIndex, "key":key})
+ log.Debugw("index-state", log.Fields{"lastindex": lastIndex, "newindex": meta.LastIndex, "key": key})
}
}
if err != nil {
@@ -454,13 +452,13 @@
} else if meta.LastIndex <= lastIndex {
log.Info("no-index-change-or-negative")
} else {
- log.Debugw("update-received", log.Fields{"pair":pair})
+ log.Debugw("update-received", log.Fields{"pair": pair})
if pair == nil {
ch <- NewEvent(DELETE, key, []byte(""))
} else if !c.isKVEqual(pair, previousKVPair) {
// Push the change onto the channel if the data has changed
// For now just assume it's a PUT change
- log.Debugw("pair-details", log.Fields{"session":pair.Session, "key":pair.Key, "value":pair.Value})
+ log.Debugw("pair-details", log.Fields{"session": pair.Session, "key": pair.Key, "value": pair.Value})
ch <- NewEvent(PUT, pair.Key, pair.Value)
}
previousKVPair = pair
@@ -481,6 +479,6 @@
// Clear the sessionID
if _, err := c.consul.Session().Destroy(c.sessionID, &writeOptions); err != nil {
- log.Errorw("error-closing-client", log.Fields{"error":err})
+ log.Errorw("error-closing-client", log.Fields{"error": err})
}
}
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 2755cd1..becc86b 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -4,11 +4,11 @@
//log "../common"
"context"
"errors"
- "sync"
- log "github.com/opencord/voltha-go/common/log"
"fmt"
v3Client "github.com/coreos/etcd/clientv3"
v3rpcTypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ log "github.com/opencord/voltha-go/common/log"
+ "sync"
)
// EtcdClient represents the Etcd KV store client
@@ -98,13 +98,13 @@
if err != nil {
switch err {
case context.Canceled:
- log.Warnw("context-cancelled", log.Fields{"error":err})
+ log.Warnw("context-cancelled", log.Fields{"error": err})
case context.DeadlineExceeded:
- log.Warnw("context-deadline-exceeded", log.Fields{"error":err})
+ log.Warnw("context-deadline-exceeded", log.Fields{"error": err})
case v3rpcTypes.ErrEmptyKey:
- log.Warnw("etcd-client-error", log.Fields{"error":err})
+ log.Warnw("etcd-client-error", log.Fields{"error": err})
default:
- log.Warnw("bad-endpoints", log.Fields{"error":err})
+ log.Warnw("bad-endpoints", log.Fields{"error": err})
}
return err
}
@@ -142,7 +142,7 @@
return nil
}
- log.Debugw("delete-keys", log.Fields{"all-keys-deleted":int64(len(gresp.Kvs)) == dresp.Deleted})
+ log.Debugw("delete-keys", log.Fields{"all-keys-deleted": int64(len(gresp.Kvs)) == dresp.Deleted})
if int64(len(gresp.Kvs)) == dresp.Deleted {
log.Debug("All-keys-deleted")
} else {
@@ -236,7 +236,7 @@
for key, leaseID := range c.keyReservations {
_, err := c.ectdAPI.Revoke(context.Background(), *leaseID)
if err != nil {
- log.Errorw("cannot-release-reservation", log.Fields{"key":key, "error":err})
+ log.Errorw("cannot-release-reservation", log.Fields{"key": key, "error": err})
return err
}
delete(c.keyReservations, key)
@@ -280,7 +280,7 @@
if leaseID != nil {
_, err := c.ectdAPI.KeepAliveOnce(context.Background(), *leaseID)
if err != nil {
- log.Errorw("lease-may-have-expired", log.Fields{"error":err})
+ log.Errorw("lease-may-have-expired", log.Fields{"error": err})
return err
}
} else {
@@ -305,7 +305,7 @@
//defer c.writeLock.Unlock()
c.watchedChannels[key] = append(c.watchedChannels[key], channelMap)
- log.Debugw("watched-channels", log.Fields{"channels":c.watchedChannels[key]})
+ log.Debugw("watched-channels", log.Fields{"channels": c.watchedChannels[key]})
// Launch a go routine to listen for updates
go c.listenForKeyChange(channel, ch)
@@ -323,7 +323,7 @@
defer c.writeLock.Unlock()
if watchedChannels, ok = c.watchedChannels[key]; !ok {
- log.Warnw("key-has-no-watched-channels", log.Fields{"key":key})
+ log.Warnw("key-has-no-watched-channels", log.Fields{"key": key})
return
}
// Look for the channels
@@ -333,7 +333,7 @@
log.Debug("channel-found")
// Close the etcd watcher before the client channel. This should close the etcd channel as well
if err := t.Close(); err != nil {
- log.Errorw("watcher-cannot-be-closed", log.Fields{"key":key, "error":err})
+ log.Errorw("watcher-cannot-be-closed", log.Fields{"key": key, "error": err})
}
close(ch)
pos = i
@@ -344,11 +344,11 @@
if pos >= 0 {
c.watchedChannels[key] = append(c.watchedChannels[key][:pos], c.watchedChannels[key][pos+1:]...)
}
- log.Infow("watcher-channel-exiting", log.Fields{"key":key, "channel":c.watchedChannels[key]})
+ log.Infow("watcher-channel-exiting", log.Fields{"key": key, "channel": c.watchedChannels[key]})
}
func (c *EtcdClient) listenForKeyChange(channel v3Client.WatchChan, ch chan<- *Event) {
- log.Infow("start-listening-on-channel", log.Fields{"channel":ch})
+ log.Infow("start-listening-on-channel", log.Fields{"channel": ch})
for resp := range channel {
for _, ev := range resp.Events {
//log.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
@@ -373,6 +373,6 @@
c.writeLock.Lock()
defer c.writeLock.Unlock()
if err := c.ectdAPI.Close(); err != nil {
- log.Errorw("error-closing-client", log.Fields{"error":err})
+ log.Errorw("error-closing-client", log.Fields{"error": err})
}
}
diff --git a/db/kvstore/kvutils_test.go b/db/kvstore/kvutils_test.go
index f5e82d2..2f3818d 100644
--- a/db/kvstore/kvutils_test.go
+++ b/db/kvstore/kvutils_test.go
@@ -1,9 +1,9 @@
package kvstore
import (
- "time"
- "testing"
"github.com/stretchr/testify/assert"
+ "testing"
+ "time"
)
func TestDurationWithNegativeTimeout(t *testing.T) {
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index ec30eac..a223318 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -7,80 +7,140 @@
//dt "github.com/docker/docker/api/types"
//dc "github.com/docker/docker/client"
"os"
- "time"
)
-// Constants used to differentiate between the KV stores
+// RW Core service default constants
const (
- ConsulStoreName string = "consul"
- EtcdStoreName string = "etcd"
+ ConsulStoreName = "consul"
+ EtcdStoreName = "etcd"
+ default_InstanceID = "rwcore001"
+ default_GrpcPort = 50057
+ default_GrpcHost = "10.100.198.240"
+ default_KafkaAdapterHost = "10.100.198.240"
+ default_KafkaAdapterPort = 9092
+ default_KafkaClusterHost = "10.100.198.240"
+ default_KafkaClusterPort = 9094
+ default_KVStoreType = ConsulStoreName
+ default_KVStoreTimeout = 5 //in seconds
+ default_KVStoreHost = "10.100.198.240"
+ default_KVStorePort = 8500 // Etcd = 2379
+ default_LogLevel = 1
+ default_Banner = false
+ default_CoreTopic = "rwcore"
+ default_RWCoreEndpoint = "rwcore"
+ default_RWCoreKey = "pki/voltha.key"
+ default_RWCoreCert = "pki/voltha.crt"
+ default_RWCoreCA = "pki/voltha-CA.pem"
)
-// CoordinatorFlags represents the set of configurations used by the coordinator
+// RWCoreFlags represents the set of configurations used by the read-write core service
type RWCoreFlags struct {
// Command line parameters
- InstanceID string
- KVStoreType string
- KVStoreTimeout int // in seconds
- KVStoreHost string
- KVStorePort int
- LogLevel string
+ InstanceID string
+ RWCoreEndpoint string
+ GrpcHost string
+ GrpcPort int
+ KafkaAdapterHost string
+ KafkaAdapterPort int
+ KafkaClusterHost string
+ KafkaClusterPort int
+ KVStoreType string
+ KVStoreTimeout int // in seconds
+ KVStoreHost string
+ KVStorePort int
+ CoreTopic string
+ LogLevel int
+ Banner bool
+ RWCoreKey string
+ RWCoreCert string
+ RWCoreCA string
}
-// NewRWCoreFlags returns a new coordinator config
+// NewRWCoreFlags returns a new RWCore config
func NewRWCoreFlags() *RWCoreFlags {
var rwCoreFlag = RWCoreFlags{ // Default values
- InstanceID: "rw_coreInstance001",
- KVStoreType: ConsulStoreName,
- KVStoreTimeout: 5,
- KVStoreHost: "10.100.198.240",
- //KVStorePort: 2379,
- KVStorePort: 8500,
- LogLevel: "info",
+ InstanceID: default_InstanceID,
+ RWCoreEndpoint: default_RWCoreEndpoint,
+ GrpcHost: default_GrpcHost,
+ GrpcPort: default_GrpcPort,
+ KafkaAdapterHost: default_KafkaAdapterHost,
+ KafkaAdapterPort: default_KafkaAdapterPort,
+ KafkaClusterHost: default_KafkaClusterHost,
+ KafkaClusterPort: default_KafkaClusterPort,
+ KVStoreType: default_KVStoreType,
+ KVStoreTimeout: default_KVStoreTimeout,
+ KVStoreHost: default_KVStoreHost,
+ KVStorePort: default_KVStorePort,
+ CoreTopic: default_CoreTopic,
+ LogLevel: default_LogLevel,
+ Banner: default_Banner,
+ RWCoreKey: default_RWCoreKey,
+ RWCoreCert: default_RWCoreCert,
+ RWCoreCA: default_RWCoreCA,
}
return &rwCoreFlag
}
-// ParseCommandArguments parses the arguments when running coordinator
+// ParseCommandArguments parses the arguments when running read-write core service
func (cf *RWCoreFlags) ParseCommandArguments() {
- flag.IntVar(&(cf.KVStoreTimeout),
- "kv-store-request-timeout",
- cf.KVStoreTimeout,
- "The default timeout when making a kv store request")
- flag.StringVar(&(cf.KVStoreType),
- "kv-store-type",
- cf.KVStoreType,
- "KV store type")
+ var help string
- flag.StringVar(&(cf.KVStoreHost),
- "kv-store-host",
- cf.KVStoreHost,
- "KV store host")
+ help = fmt.Sprintf("RW core endpoint address")
+ flag.StringVar(&(cf.RWCoreEndpoint), "vcore-endpoint", default_RWCoreEndpoint, help)
- flag.IntVar(&(cf.KVStorePort),
- "kv-store-port",
- cf.KVStorePort,
- "KV store port")
+ help = fmt.Sprintf("GRPC server - host")
+ flag.StringVar(&(cf.GrpcHost), "grpc_host", default_GrpcHost, help)
- flag.StringVar(&(cf.LogLevel),
- "log-level",
- cf.LogLevel,
- "Log level")
+ help = fmt.Sprintf("GRPC server - port")
+ flag.IntVar(&(cf.GrpcPort), "grpc_port", default_GrpcPort, help)
+
+ help = fmt.Sprintf("Kafka - Adapter messaging host")
+ flag.StringVar(&(cf.KafkaAdapterHost), "kafka_adapter_host", default_KafkaAdapterHost, help)
+
+ help = fmt.Sprintf("Kafka - Adapter messaging port")
+ flag.IntVar(&(cf.KafkaAdapterPort), "kafka_adapter_port", default_KafkaAdapterPort, help)
+
+ help = fmt.Sprintf("Kafka - Cluster messaging host")
+ flag.StringVar(&(cf.KafkaClusterHost), "kafka_cluster_host", default_KafkaClusterHost, help)
+
+ help = fmt.Sprintf("Kafka - Cluster messaging port")
+ flag.IntVar(&(cf.KafkaClusterPort), "kafka_cluster_port", default_KafkaClusterPort, help)
+
+ help = fmt.Sprintf("RW Core topic")
+ flag.StringVar(&(cf.CoreTopic), "rw_core_topic", default_CoreTopic, help)
+
+ help = fmt.Sprintf("KV store type")
+ flag.StringVar(&(cf.KVStoreType), "kv_store_type", default_KVStoreType, help)
+
+ help = fmt.Sprintf("The default timeout when making a kv store request")
+ flag.IntVar(&(cf.KVStoreTimeout), "kv_store_request_timeout", default_KVStoreTimeout, help)
+
+ help = fmt.Sprintf("KV store host")
+ flag.StringVar(&(cf.KVStoreHost), "kv_store_host", default_KVStoreHost, help)
+
+ help = fmt.Sprintf("KV store port")
+ flag.IntVar(&(cf.KVStorePort), "kv_store_port", default_KVStorePort, help)
+
+ help = fmt.Sprintf("Log level")
+ flag.IntVar(&(cf.LogLevel), "log_level", default_LogLevel, help)
+
+ help = fmt.Sprintf("Show startup banner log lines")
+ flag.BoolVar(&cf.Banner, "banner", default_Banner, help)
flag.Parse()
// Update the necessary keys with the prefixes
- start := time.Now()
+ //start := time.Now()
containerName := getContainerInfo()
- fmt.Println("container name:", containerName)
+ //fmt.Println("container name:", containerName)
if len(containerName) > 0 {
cf.InstanceID = containerName
}
- fmt.Println("Inside config:", cf)
- elapsed := time.Since(start)
- fmt.Println("time:", elapsed/time.Second)
+ //fmt.Println("Inside config:", cf)
+ //elapsed := time.Since(start)
+ //fmt.Println("time:", elapsed/time.Second)
}
func getContainerInfo() string {
diff --git a/rw_core/main.go b/rw_core/main.go
index e766904..faa5851 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -1,28 +1,29 @@
package main
import (
+ "context"
+ "errors"
"fmt"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/kvstore"
+ "github.com/opencord/voltha-go/rw_core/config"
"os"
"os/signal"
- "time"
- "errors"
"strconv"
- "github.com/opencord/voltha-go/db/kvstore"
- "github.com/opencord/voltha-go/common/log"
- "github.com/opencord/voltha-go/rw_core/config"
"syscall"
+ "time"
)
type rwCore struct {
- kvClient kvstore.Client
- config *config.RWCoreFlags
- halted bool
- exitChannel chan int
+ kvClient kvstore.Client
+ config *config.RWCoreFlags
+ halted bool
+ exitChannel chan int
}
func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
- log.Infow("kv-store-type", log.Fields{"store":storeType})
+ log.Infow("kv-store-type", log.Fields{"store": storeType})
switch storeType {
case "consul":
return kvstore.NewConsulClient(address, timeout)
@@ -51,7 +52,6 @@
return nil
}
-
func toString(value interface{}) (string, error) {
switch t := value.(type) {
case []byte:
@@ -63,41 +63,16 @@
}
}
+func (core *rwCore) start(ctx context.Context) {
+ log.Info("Starting RW Core components")
+ // Setup GRPC Server
-func (core *rwCore) start() {
- log.Info("core-starting")
+ // Setup KV Client
- //First set the KV client. Some client immediately tries to connect to the KV store (etcd) while others does
- // not create the connection until a request to the store is made (consul)
- tick := time.Tick(kvstore.GetDuration(core.config.KVStoreTimeout))
- connected := false
-KVStoreConnectLoop:
- for {
- if err := core.setKVClient(); err != nil {
- log.Warn("cannot-create-kv-client-retrying")
- select {
- case <-tick:
- log.Debug("kv-client-retry")
- continue
- case <-core.exitChannel:
- log.Info("exit-request-received")
- break KVStoreConnectLoop
- }
- } else {
- log.Debug("got-kv-client.")
- connected = true
- break
- }
- }
- // Connected is true only if there is a valid KV store connection and no exit request has been received
- if connected {
- log.Info("core-started")
- } else {
- log.Info("core-ended")
- }
+ // Setup Kafka messaging services
+
}
-
func (core *rwCore) stop() {
// Stop leadership tracking
core.halted = true
@@ -109,7 +84,7 @@
if core.kvClient != nil {
// Release all reservations
if err := core.kvClient.ReleaseAllReservations(); err != nil {
- log.Infow("fail-to-release-all-reservations", log.Fields{"error":err})
+ log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
}
// Close the DB connection
core.kvClient.Close()
@@ -133,10 +108,10 @@
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT:
- log.Infow("closing-signal-received", log.Fields{"signal":s})
+ log.Infow("closing-signal-received", log.Fields{"signal": s})
exitChannel <- 0
default:
- log.Infow("unexpected-signal-received", log.Fields{"signal":s})
+ log.Infow("unexpected-signal-received", log.Fields{"signal": s})
exitChannel <- 1
}
}()
@@ -145,6 +120,16 @@
return code
}
+func printBanner() {
+ fmt.Println(" ")
+ fmt.Println(" ______ ______ ")
+ fmt.Println("| _ \\ \\ / / ___|___ _ __ ___ ")
+ fmt.Println("| |_) \\ \\ /\\ / / | / _ \\| '__/ _ \\ ")
+ fmt.Println("| _ < \\ V V /| |__| (_) | | | __/ ")
+ fmt.Println("|_| \\_\\ \\_/\\_/ \\____\\___/|_| \\___| ")
+ fmt.Println(" ")
+}
+
func main() {
start := time.Now()
@@ -152,22 +137,30 @@
cf.ParseCommandArguments()
// Setup logging
- if _, err := log.SetLogger(log.JSON, log.DebugLevel, log.Fields{"instanceId":cf.InstanceID}); err != nil {
+ if _, err := log.SetLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
}
defer log.CleanUp()
- log.Infow("rw-core-config", log.Fields{"config":*cf})
+ // Print banner if specified
+ if cf.Banner {
+ printBanner()
+ }
+
+ log.Infow("rw-core-config", log.Fields{"config": *cf})
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
core := newRWCore(cf)
- go core.start()
+ go core.start(ctx)
code := waitForExit()
- log.Infow("received-a-closing-signal", log.Fields{"code":code})
+ log.Infow("received-a-closing-signal", log.Fields{"code": code})
// Cleanup before leaving
core.stop()
elapsed := time.Since(start)
- log.Infow("rw-core-run-time", log.Fields{"core":core.config.InstanceID, "time":elapsed/time.Second})
+ log.Infow("rw-core-run-time", log.Fields{"core": core.config.InstanceID, "time": elapsed / time.Second})
}