/* | |
* Copyright 2018-present Open Networking Foundation | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package main | |
import ( | |
"context" | |
"errors" | |
"fmt" | |
"github.com/opencord/voltha-go/adapters" | |
com "github.com/opencord/voltha-go/adapters/common" | |
ac "github.com/opencord/voltha-go/adapters/openolt/adaptercore" | |
"github.com/opencord/voltha-go/adapters/openolt/config" | |
"github.com/opencord/voltha-go/common/log" | |
"github.com/opencord/voltha-go/db/kvstore" | |
"github.com/opencord/voltha-go/kafka" | |
ic "github.com/opencord/voltha-go/protos/inter_container" | |
"github.com/opencord/voltha-go/protos/voltha" | |
"os" | |
"os/signal" | |
"strconv" | |
"syscall" | |
"time" | |
) | |
type adapter struct { | |
instanceId string | |
config *config.AdapterFlags | |
iAdapter adapters.IAdapter | |
kafkaClient kafka.Client | |
kvClient kvstore.Client | |
kip *kafka.InterContainerProxy | |
coreProxy *com.CoreProxy | |
halted bool | |
exitChannel chan int | |
receiverChannels []<-chan *ic.InterContainerMessage | |
} | |
func init() { | |
log.AddPackage(log.JSON, log.DebugLevel, nil) | |
} | |
func newAdapter(cf *config.AdapterFlags) *adapter { | |
var a adapter | |
a.instanceId = cf.InstanceID | |
a.config = cf | |
a.halted = false | |
a.exitChannel = make(chan int, 1) | |
a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0) | |
return &a | |
} | |
func (a *adapter) start(ctx context.Context) { | |
log.Info("Starting Core Adapter components") | |
var err error | |
// Setup KV Client | |
log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType}) | |
if err := a.setKVClient(); err != nil { | |
log.Fatal("error-setting-kv-client") | |
} | |
// Setup Kafka Client | |
if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil { | |
log.Fatal("Unsupported-common-client") | |
} | |
// Start the common InterContainer Proxy - retries indefinitely | |
if a.kip, err = a.startInterContainerProxy(-1); err != nil { | |
log.Fatal("error-starting-inter-container-proxy") | |
} | |
// Create the core proxy to handle requests to the Core | |
a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic) | |
// Create the open OLT adapter | |
if a.iAdapter, err = a.startOpenOLT(ctx, a.kip, a.coreProxy, a.config.OnuNumber); err != nil { | |
log.Fatal("error-starting-inter-container-proxy") | |
} | |
// Register the core request handler | |
if err = a.setupRequestHandler(a.instanceId, a.iAdapter); err != nil { | |
log.Fatal("error-setting-core-request-handler") | |
} | |
// Register this adapter to the Core - retries indefinitely | |
if err = a.registerWithCore(-1); err != nil { | |
log.Fatal("error-registering-with-core") | |
} | |
} | |
func (rw *adapter) stop() { | |
// Stop leadership tracking | |
rw.halted = true | |
// send exit signal | |
rw.exitChannel <- 0 | |
// Cleanup - applies only if we had a kvClient | |
if rw.kvClient != nil { | |
// Release all reservations | |
if err := rw.kvClient.ReleaseAllReservations(); err != nil { | |
log.Infow("fail-to-release-all-reservations", log.Fields{"error": err}) | |
} | |
// Close the DB connection | |
rw.kvClient.Close() | |
} | |
// TODO: More cleanup | |
} | |
func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) { | |
log.Infow("kv-store-type", log.Fields{"store": storeType}) | |
switch storeType { | |
case "consul": | |
return kvstore.NewConsulClient(address, timeout) | |
case "etcd": | |
return kvstore.NewEtcdClient(address, timeout) | |
} | |
return nil, errors.New("unsupported-kv-store") | |
} | |
func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) { | |
log.Infow("common-client-type", log.Fields{"client": clientType}) | |
switch clientType { | |
case "sarama": | |
return kafka.NewSaramaClient( | |
kafka.Host(host), | |
kafka.Port(port), | |
kafka.ProducerReturnOnErrors(true), | |
kafka.ProducerReturnOnSuccess(true), | |
kafka.ProducerMaxRetries(6), | |
kafka.ProducerRetryBackoff(time.Millisecond*30)), nil | |
} | |
return nil, errors.New("unsupported-client-type") | |
} | |
func (a *adapter) setKVClient() error { | |
addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort) | |
client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout) | |
if err != nil { | |
a.kvClient = nil | |
log.Error(err) | |
return err | |
} | |
a.kvClient = client | |
return nil | |
} | |
func toString(value interface{}) (string, error) { | |
switch t := value.(type) { | |
case []byte: | |
return string(value.([]byte)), nil | |
case string: | |
return value.(string), nil | |
default: | |
return "", fmt.Errorf("unexpected-type-%T", t) | |
} | |
} | |
func (a *adapter) startInterContainerProxy(retries int) (*kafka.InterContainerProxy, error) { | |
log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost, | |
"port": a.config.KafkaAdapterPort, "topic": a.config.Topic}) | |
var err error | |
var kip *kafka.InterContainerProxy | |
if kip, err = kafka.NewInterContainerProxy( | |
kafka.InterContainerHost(a.config.KafkaAdapterHost), | |
kafka.InterContainerPort(a.config.KafkaAdapterPort), | |
kafka.MsgClient(a.kafkaClient), | |
kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic})); err != nil { | |
log.Errorw("fail-to-create-common-proxy", log.Fields{"error": err}) | |
return nil, err | |
} | |
count := 0 | |
for { | |
if err = kip.Start(); err != nil { | |
log.Warnw("error-starting-messaging-proxy", log.Fields{"error": err}) | |
if retries == count { | |
return nil, err | |
} | |
count = +1 | |
// Take a nap before retrying | |
time.Sleep(2 * time.Second) | |
} else { | |
break | |
} | |
} | |
log.Info("common-messaging-proxy-created") | |
return kip, nil | |
} | |
func (a *adapter) startOpenOLT(ctx context.Context, kip *kafka.InterContainerProxy, cp *com.CoreProxy, onuNumber int) (*ac.OpenOLT, error) { | |
log.Info("starting-open-olt") | |
var err error | |
sOLT := ac.NewOpenOLT(ctx, a.kip, cp, onuNumber) | |
if err = sOLT.Start(ctx); err != nil { | |
log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err}) | |
return nil, err | |
} | |
log.Info("open-olt-started") | |
return sOLT, nil | |
} | |
func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter) error { | |
log.Info("setting-request-handler") | |
requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter, a.coreProxy) | |
if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil { | |
log.Errorw("request-handler-setup-failed", log.Fields{"error": err}) | |
return err | |
} | |
log.Info("request-handler-setup-done") | |
return nil | |
} | |
func (a *adapter) registerWithCore(retries int) error { | |
log.Info("registering-with-core") | |
adapterDescription := &voltha.Adapter{Id: "openolt", Vendor: "simulation Enterprise Inc"} | |
types := []*voltha.DeviceType{{Id: "openolt", Adapter: "openolt"}} | |
deviceTypes := &voltha.DeviceTypes{Items: types} | |
count := 0 | |
for { | |
if err := a.coreProxy.RegisterAdapter(nil, adapterDescription, deviceTypes); err != nil { | |
log.Warnw("registering-with-core-failed", log.Fields{"error": err}) | |
if retries == count { | |
return err | |
} | |
count += 1 | |
// Take a nap before retrying | |
time.Sleep(2 * time.Second) | |
} else { | |
break | |
} | |
} | |
log.Info("registered-with-core") | |
return nil | |
} | |
func waitForExit() int { | |
signalChannel := make(chan os.Signal, 1) | |
signal.Notify(signalChannel, | |
syscall.SIGHUP, | |
syscall.SIGINT, | |
syscall.SIGTERM, | |
syscall.SIGQUIT) | |
exitChannel := make(chan int) | |
go func() { | |
s := <-signalChannel | |
switch s { | |
case syscall.SIGHUP, | |
syscall.SIGINT, | |
syscall.SIGTERM, | |
syscall.SIGQUIT: | |
log.Infow("closing-signal-received", log.Fields{"signal": s}) | |
exitChannel <- 0 | |
default: | |
log.Infow("unexpected-signal-received", log.Fields{"signal": s}) | |
exitChannel <- 1 | |
} | |
}() | |
code := <-exitChannel | |
return code | |
} | |
func printBanner() { | |
fmt.Println(" ____ ____ _ _______ ") | |
fmt.Println(" / _ \\ / __\\| | |__ __|") | |
fmt.Println(" | | | |_ __ ___ _ __ | | | | | | | ") | |
fmt.Println(" | | | | '_\\ / _\\ '_\\ | | | | | | | ") | |
fmt.Println(" | |__| | |_) | __/ | | || |__| | |____| | ") | |
fmt.Println(" \\____/| .__/\\___|_| |_|\\____/|______|_| ") | |
fmt.Println(" | | ") | |
fmt.Println(" |_| ") | |
fmt.Println(" ") | |
} | |
func main() { | |
start := time.Now() | |
cf := config.NewAdapterFlags() | |
cf.ParseCommandArguments() | |
//// Setup logging | |
//Setup default logger - applies for packages that do not have specific logger set | |
if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil { | |
log.With(log.Fields{"error": err}).Fatal("Cannot setup logging") | |
} | |
// Update all loggers (provisionned via init) with a common field | |
if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil { | |
log.With(log.Fields{"error": err}).Fatal("Cannot setup logging") | |
} | |
log.SetPackageLogLevel("github.com/opencord/voltha-go/adapters/common", log.DebugLevel) | |
defer log.CleanUp() | |
// Print banner if specified | |
if cf.Banner { | |
printBanner() | |
} | |
log.Infow("config", log.Fields{"config": *cf}) | |
ctx, cancel := context.WithCancel(context.Background()) | |
defer cancel() | |
ad := newAdapter(cf) | |
go ad.start(ctx) | |
code := waitForExit() | |
log.Infow("received-a-closing-signal", log.Fields{"code": code}) | |
// Cleanup before leaving | |
ad.stop() | |
elapsed := time.Since(start) | |
log.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second}) | |
} |