[VOL-1024] This update consists of:
1) Inter-container Kafka library in Go
2) Initial set of proto definitions
3) Test cases for the Kafka library
Change-Id: Ibe8b0f673a90bbe4cb92847ce40f31ec2d0b6244
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index a223318..8b2c165 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -24,7 +24,7 @@
default_KVStoreTimeout = 5 //in seconds
default_KVStoreHost = "10.100.198.240"
default_KVStorePort = 8500 // Etcd = 2379
- default_LogLevel = 1
+ default_LogLevel = 0
default_Banner = false
default_CoreTopic = "rwcore"
default_RWCoreEndpoint = "rwcore"
diff --git a/rw_core/core/requestHandlerProxy.go b/rw_core/core/requestHandlerProxy.go
new file mode 100644
index 0000000..4bfafa5
--- /dev/null
+++ b/rw_core/core/requestHandlerProxy.go
@@ -0,0 +1,154 @@
+package core
+
+import (
+ "github.com/opencord/voltha-go/common/log"
+ ca "github.com/opencord/voltha-go/protos/core_adapter"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "github.com/golang/protobuf/ptypes"
+ "errors"
+)
+
+// RequestHandlerProxy is the receiver for the request handlers defined in this
+// file (GetDevice, GetChildDevice, GetPorts, GetChildDevices and
+// ChildDeviceDetected). Each handler takes its parameters as a list of
+// ca.Argument values.
+type RequestHandlerProxy struct {
+ // TestMode, when true, makes the handlers return canned responses built
+ // from the request arguments; it is meant for test cases only.
+ TestMode bool
+}
+
+// GetDevice handles a request for a single device.
+//
+// args must hold exactly one argument whose Value is an Any-wrapped
+// ca.StrType carrying the device ID. An error is returned when the argument
+// count is wrong or the argument cannot be unmarshalled.
+//
+// The actual lookup is still a TODO: outside TestMode the method returns
+// (nil, nil). In TestMode it returns a stub Device whose Id is the requested
+// device ID.
+func (rhp *RequestHandlerProxy) GetDevice(args []*ca.Argument) (error, *voltha.Device) {
+	if len(args) != 1 {
+		// Warnw, like every other structured log call in this file, since
+		// the message is accompanied by log.Fields.
+		log.Warnw("invalid-number-of-args", log.Fields{"args": args})
+		return errors.New("invalid-number-of-args"), nil
+	}
+
+	pID := &ca.StrType{}
+	if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err, nil
+	}
+	log.Debugw("GetDevice", log.Fields{"deviceId": pID.Val})
+	// TODO process the request
+
+	if rhp.TestMode { // Execute only for test cases
+		return nil, &voltha.Device{Id: pID.Val}
+	}
+	return nil, nil
+}
+
+
+// GetChildDevice handles a request for a child device.
+//
+// args must hold at least one argument; the first one's Value is an
+// Any-wrapped ca.StrType carrying a device ID. Any additional arguments are
+// accepted but not decoded yet. An error is returned when args is empty or
+// the first argument cannot be unmarshalled.
+//
+// The actual lookup is still a TODO: outside TestMode the method returns
+// (nil, nil). In TestMode it returns a stub Device whose Id is the supplied
+// device ID.
+func (rhp *RequestHandlerProxy) GetChildDevice(args []*ca.Argument) (error, *voltha.Device) {
+	if len(args) < 1 {
+		// Warnw, like every other structured log call in this file, since
+		// the message is accompanied by log.Fields.
+		log.Warnw("invalid-number-of-args", log.Fields{"args": args})
+		return errors.New("invalid-number-of-args"), nil
+	}
+
+	pID := &ca.StrType{}
+	if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err, nil
+	}
+	// TODO decompose the other parameters for matching criteria and process
+	log.Debugw("GetChildDevice", log.Fields{"deviceId": pID.Val})
+
+	if rhp.TestMode { // Execute only for test cases
+		return nil, &voltha.Device{Id: pID.Val}
+	}
+	return nil, nil
+}
+
+// GetPorts handles a request for the ports of a device.
+//
+// args must hold exactly two arguments:
+//
+//	0: device ID, an Any-wrapped ca.StrType
+//	1: port type, an enum sent as an Any-wrapped ca.IntType
+//
+// An error is returned when the argument count is wrong or either argument
+// cannot be unmarshalled.
+//
+// The actual lookup is still a TODO: outside TestMode the method returns
+// (nil, nil). In TestMode it returns a Ports value holding a single port
+// labelled "test_port".
+func (rhp *RequestHandlerProxy) GetPorts(args []*ca.Argument) (error, *voltha.Ports) {
+	if len(args) != 2 {
+		// Warnw, like every other structured log call in this file, since
+		// the message is accompanied by log.Fields.
+		log.Warnw("invalid-number-of-args", log.Fields{"args": args})
+		return errors.New("invalid-number-of-args"), nil
+	}
+
+	pID := &ca.StrType{}
+	if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err, nil
+	}
+	// Porttype is an enum sent as an integer proto
+	pt := &ca.IntType{}
+	if err := ptypes.UnmarshalAny(args[1].Value, pt); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err, nil
+	}
+
+	// TODO decompose the other parameters for matching criteria
+	log.Debugw("GetPorts", log.Fields{"deviceID": pID.Val, "portype": pt.Val})
+
+	if rhp.TestMode { // Execute only for test cases
+		aPort := &voltha.Port{Label: "test_port"}
+		allPorts := &voltha.Ports{}
+		allPorts.Items = append(allPorts.Items, aPort)
+		return nil, allPorts
+	}
+	return nil, nil
+}
+
+// GetChildDevices handles a request for the child devices of a device.
+//
+// args must hold exactly one argument whose Value is an Any-wrapped
+// ca.StrType carrying a device ID. An error is returned when the argument
+// count is wrong or the argument cannot be unmarshalled.
+//
+// The actual lookup is still a TODO: outside TestMode the method returns
+// (nil, nil). In TestMode it returns a stub Device whose Id is the supplied
+// device ID.
+//
+// NOTE(review): despite the plural name the return type is a single
+// *voltha.Device; presumably this becomes a collection once the handler is
+// implemented - confirm with the callers.
+func (rhp *RequestHandlerProxy) GetChildDevices(args []*ca.Argument) (error, *voltha.Device) {
+	if len(args) != 1 {
+		// Warnw, like every other structured log call in this file, since
+		// the message is accompanied by log.Fields.
+		log.Warnw("invalid-number-of-args", log.Fields{"args": args})
+		return errors.New("invalid-number-of-args"), nil
+	}
+
+	pID := &ca.StrType{}
+	if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err, nil
+	}
+	// TODO decompose the other parameters for matching criteria and process
+	// Log under this handler's own name so it can be told apart from
+	// GetChildDevice in the debug output.
+	log.Debugw("GetChildDevices", log.Fields{"deviceId": pID.Val})
+
+	if rhp.TestMode { // Execute only for test cases
+		return nil, &voltha.Device{Id: pID.Val}
+	}
+	return nil, nil
+}
+
+// ChildDeviceDetected is invoked when a child device is detected. At least
+// five arguments are expected, in this order, each Any-wrapped:
+//
+//	0: parent_device_id  (ca.StrType)
+//	1: parent_port_no    (ca.IntType)
+//	2: child_device_type (ca.StrType)
+//	3: proxy_address     (voltha.Device_ProxyAddress)
+//	4: admin_state       (ca.IntType)
+//
+// Any further arguments (**kw) are accepted but not decoded yet.
+//
+// An error is returned when fewer than five arguments are supplied or when
+// one of the first five cannot be unmarshalled. Otherwise the decoded values
+// are logged and nil is returned; the actual processing is still a TODO.
+func (rhp *RequestHandlerProxy) ChildDeviceDetected(args []*ca.Argument) error {
+	if len(args) < 5 {
+		// Warnw, like every other structured log call in this file, since
+		// the message is accompanied by log.Fields.
+		log.Warnw("invalid-number-of-args", log.Fields{"args": args})
+		return errors.New("invalid-number-of-args")
+	}
+
+	pID := &ca.StrType{}
+	if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err
+	}
+	portNo := &ca.IntType{}
+	if err := ptypes.UnmarshalAny(args[1].Value, portNo); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err
+	}
+	dt := &ca.StrType{}
+	if err := ptypes.UnmarshalAny(args[2].Value, dt); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err
+	}
+	pAddr := &voltha.Device_ProxyAddress{}
+	if err := ptypes.UnmarshalAny(args[3].Value, pAddr); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err
+	}
+	adminState := &ca.IntType{}
+	if err := ptypes.UnmarshalAny(args[4].Value, adminState); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err
+	}
+
+	// Need to decode the other params - in this case the key will represent the proto type
+	// TODO decompose the other parameters for matching criteria and process
+	// Log the unwrapped admin state value, as is done for portNo and dt.
+	log.Debugw("ChildDeviceDetected", log.Fields{"deviceId": pID.Val, "portNo": portNo.Val,
+		"deviceType": dt.Val, "proxyAddress": pAddr, "adminState": adminState.Val})
+
+	if rhp.TestMode { // Execute only for test cases
+		return nil
+	}
+	return nil
+}
+
+
diff --git a/rw_core/main.go b/rw_core/main.go
index faa5851..a0928c5 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -6,6 +6,8 @@
"fmt"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
+ "github.com/opencord/voltha-go/kafka"
+ ca "github.com/opencord/voltha-go/protos/core_adapter"
"github.com/opencord/voltha-go/rw_core/config"
"os"
"os/signal"
@@ -19,6 +21,9 @@
config *config.RWCoreFlags
halted bool
exitChannel chan int
+ kmp *kafka.KafkaMessagingProxy
+ //For test
+ receiverChannels []<-chan *ca.InterContainerMessage
}
func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
@@ -38,6 +43,7 @@
rwCore.config = cf
rwCore.halted = false
rwCore.exitChannel = make(chan int, 1)
+ rwCore.receiverChannels = make([]<-chan *ca.InterContainerMessage, 0)
return &rwCore
}
@@ -63,6 +69,7 @@
}
}
+
func (core *rwCore) start(ctx context.Context) {
log.Info("Starting RW Core components")
// Setup GRPC Server
@@ -70,13 +77,29 @@
// Setup KV Client
// Setup Kafka messaging services
-
+ var err error
+ if core.kmp, err = kafka.NewKafkaMessagingProxy(
+ kafka.KafkaHost("10.100.198.220"),
+ kafka.KafkaPort(9092),
+ kafka.DefaultTopic(&kafka.Topic{Name: "Adapter"})); err != nil {
+ log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
+ return
+ }
+ // Start the kafka messaging service - synchronous call so that a startup failure is caught here
+ if err = core.kmp.Start(); err != nil {
+ log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
+ }
}
func (core *rwCore) stop() {
// Stop leadership tracking
core.halted = true
+ // Stop the Kafka messaging service
+ if core.kmp != nil {
+ core.kmp.Stop()
+ }
+
// send exit signal
core.exitChannel <- 0