[VOL-1359] This commit consists of the creation of the simulated
OLT and ONU adapters (in Go language). This update also provides
the set of files to build and run these containers.
Change-Id: Id7b0c77fdf60cb02c39908d4374d3e93fab5de67
diff --git a/BUILD.md b/BUILD.md
index d96a573..10aa694 100644
--- a/BUILD.md
+++ b/BUILD.md
@@ -54,7 +54,7 @@
protos/scripts/build_protos.sh protos
```
-### Building and running Voltha
+### Building Voltha Core
A fatal error occurs if Voltha is built and executed at this stage:
```
> go run rw_core/main.go
@@ -72,3 +72,44 @@
```
make rw_core
```
+
+### Building and running Ponsim OLT and ONU Adapters
+Please refer to the README.md file under the ```python``` directory
+
+
+### Building Simulated OLT and ONU Adapters
+Simulated OLT, ONU and rw_core can be build together:
+```
+make build
+```
+or they via be individually built:
+```
+make rw_core
+make simulated_olt
+make simulated_onu
+```
+
+### Running rw_core, Simulated OLT and ONU Adapters
+In the example below we are using the docker-compose command to run these containers locally.
+```
+DOCKER_HOST_IP=<Host IP> docker-compose -f compose/docker-compose-zk-kafka-test.yml up -d
+DOCKER_HOST_IP=<Host IP> docker-compose -f compose/docker-compose-etcd.yml up -d
+DOCKER_HOST_IP=<Host IP> docker-compose -f compose/rw_core.yml up -d
+DOCKER_HOST_IP=<Host IP> docker-compose -f compose/compose/adapters-simulated.yml up -d
+```
+
+You should see the following containers up and running
+
+```$xslt
+CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
+338fd67c2029 voltha-adapter-simulated-onu "/app/simulated_onu …" 37 seconds ago Up 36 seconds compose_adapter_simulated_onu_1_a39b1a9d27d5
+15b159bab626 voltha-adapter-simulated-olt "/app/simulated_olt …" 37 seconds ago Up 36 seconds compose_adapter_simulated_olt_1_b5407c23b483
+401128a1755f voltha-rw-core "/app/rw_core -kv_st…" About a minute ago Up About a minute 0.0.0.0:50057->50057/tcp compose_rw_core_1_36cd5e255edf
+ba4eb9384f5b quay.io/coreos/etcd:v3.2.9 "etcd --name=etcd0 -…" About a minute ago Up About a minute 0.0.0.0:2379->2379/tcp, 0.0.0.0:32775->2380/tcp, 0.0.0.0:32774->4001/tcp compose_etcd_1_368cd0bc1421
+55f74277a530 wurstmeister/kafka:2.11-2.0.1 "start-kafka.sh" 2 minutes ago Up 2 minutes 0.0.0.0:9092->9092/tcp compose_kafka_1_a8631e438fe2
+fb60076d8b3e wurstmeister/zookeeper:latest "/bin/sh -c '/usr/sb…" 2 minutes ago Up 2 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp compose_zookeeper_1_7ff68af103cf
+```
+
+
+
+
diff --git a/Makefile b/Makefile
index b988f21..ae31832 100644
--- a/Makefile
+++ b/Makefile
@@ -54,7 +54,7 @@
rw_core
-.PHONY: $(DIRS) $(DIRS_CLEAN) $(DIRS_FLAKE8) rw_core ro_core protos kafka db tests adapters
+.PHONY: $(DIRS) $(DIRS_CLEAN) $(DIRS_FLAKE8) rw_core protos kafka db tests python simulators k8s
# This should to be the first and default target in this Makefile
help:
@@ -80,7 +80,7 @@
build: containers
-containers: rw_core
+containers: rw_core simulated_olt simulated_onu
ifneq ($(VOLTHA_BUILD),docker)
rw_core:
@@ -90,4 +90,20 @@
docker build $(DOCKER_BUILD_ARGS) -t ${REGISTRY}${REPOSITORY}voltha-rw-core:${TAG} -f docker/Dockerfile.rw_core_d .
endif
+ifneq ($(VOLTHA_BUILD),docker)
+simulated_olt:
+ docker build $(DOCKER_BUILD_ARGS) -t ${REGISTRY}${REPOSITORY}voltha-adapter-simulated-olt:${TAG} -f docker/Dockerfile.simulated_olt .
+else
+simulated_olt:
+ docker build $(DOCKER_BUILD_ARGS) -t ${REGISTRY}${REPOSITORY}voltha-adapter-simulated-olt:${TAG} -f docker/Dockerfile.simulated_olt_d .
+endif
+
+ifneq ($(VOLTHA_BUILD),docker)
+simulated_onu:
+ docker build $(DOCKER_BUILD_ARGS) -t ${REGISTRY}${REPOSITORY}voltha-adapter-simulated-onu:${TAG} -f docker/Dockerfile.simulated_onu .
+else
+simulated_onu:
+ docker build $(DOCKER_BUILD_ARGS) -t ${REGISTRY}${REPOSITORY}voltha-adapter-simulated-onu:${TAG} -f docker/Dockerfile.simulated_onu_d .
+endif
+
# end file
diff --git a/adapters/README.md b/adapters/README.md
new file mode 100644
index 0000000..13479f8
--- /dev/null
+++ b/adapters/README.md
@@ -0,0 +1,10 @@
+## How to Build and Run a Voltha Go language Adapter
+
+This directory is a repo for all voltha adapters written in Go language. At this time, the simulated_olt and
+simulated_onu adapters are the only adapters using the Go language. These adapters provide basic capabilities
+which will be used for high availability and capacity testing.
+
+### Building and running the Simulated OLT and ONU Adapters
+
+Please refer to the ```BUILD.md``` file under the voltha-go repo
+
diff --git a/adapters/common/adapter_proxy.go b/adapters/common/adapter_proxy.go
new file mode 100644
index 0000000..4e63442
--- /dev/null
+++ b/adapters/common/adapter_proxy.go
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+
+import (
+ "context"
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/golang/protobuf/ptypes/any"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/kafka"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ "time"
+)
+
+type AdapterProxy struct {
+ kafkaICProxy *kafka.InterContainerProxy
+ adapterTopic string
+ coreTopic string
+}
+
+func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *AdapterProxy {
+ var proxy AdapterProxy
+ proxy.kafkaICProxy = kafkaProxy
+ proxy.adapterTopic = adapterTopic
+ proxy.coreTopic = coreTopic
+ log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+ return &proxy
+}
+
+func (ap *AdapterProxy) SendInterAdapterMessage(ctx context.Context,
+ msg proto.Message,
+ msgType ic.InterAdapterMessageType_Types,
+ fromAdapter string,
+ toAdapter string,
+ toDeviceId string,
+ proxyDeviceId string,
+ messageId string) error {
+ log.Debugw("sending-inter-adapter-message", log.Fields{"type": msgType, "from": fromAdapter,
+ "to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
+
+ //Marshal the message
+ var marshalledMsg *any.Any
+ var err error
+ if marshalledMsg, err = ptypes.MarshalAny(msg); err != nil {
+ log.Warnw("cannot-marshal-msg", log.Fields{"error": err})
+ return err
+ }
+
+ //Build the inter adapter message
+ header := &ic.InterAdapterHeader{
+ Type: msgType,
+ FromTopic: fromAdapter,
+ ToTopic: toAdapter,
+ ToDeviceId: toDeviceId,
+ ProxyDeviceId: proxyDeviceId,
+ }
+ if messageId != "" {
+ header.Id = messageId
+ } else {
+ header.Id = uuid.New().String()
+ }
+ header.Timestamp = time.Now().Unix()
+ iaMsg := &ic.InterAdapterMessage{
+ Header: header,
+ Body: marshalledMsg,
+ }
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "msg",
+ Value: iaMsg,
+ }
+
+ // Set up the required rpc arguments
+ topic := kafka.Topic{Name: fromAdapter}
+ replyToTopic := kafka.Topic{Name: toAdapter}
+ rpc := "Process_inter_adapter_message"
+
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
+ log.Debugw("inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
+ return unPackResponse(rpc, "", success, result)
+}
diff --git a/adapters/common/core_proxy.go b/adapters/common/core_proxy.go
new file mode 100644
index 0000000..f076910
--- /dev/null
+++ b/adapters/common/core_proxy.go
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+
+import (
+ "context"
+ "github.com/golang/protobuf/ptypes"
+ a "github.com/golang/protobuf/ptypes/any"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/kafka"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+type CoreProxy struct {
+ kafkaICProxy *kafka.InterContainerProxy
+ adapterTopic string
+ coreTopic string
+}
+
+func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
+ var proxy CoreProxy
+ proxy.kafkaICProxy = kafkaProxy
+ proxy.adapterTopic = adapterTopic
+ proxy.coreTopic = coreTopic
+ log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+
+ return &proxy
+}
+
+func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
+ if success {
+ return nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
+ log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
+ // TODO: Need to get the real error code
+ return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
+ }
+}
+
+func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
+ log.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
+ rpc := "Register"
+ topic := kafka.Topic{Name: ap.coreTopic}
+ replyToTopic := kafka.Topic{Name: ap.adapterTopic}
+ args := make([]*kafka.KVArg, 2)
+ args[0] = &kafka.KVArg{
+ Key: "adapter",
+ Value: adapter,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "deviceTypes",
+ Value: deviceTypes,
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
+ log.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
+ return unPackResponse(rpc, "", success, result)
+}
+
+func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
+ log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
+ rpc := "DeviceUpdate"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := kafka.CreateSubTopic(ap.coreTopic, device.Id)
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ // Use a device specific topic as we are the only adaptercore handling requests for this device
+ replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, device.Id)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
+}
+
+func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
+ log.Debugw("PortCreated", log.Fields{"portNo": port.PortNo})
+ rpc := "PortCreated"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := kafka.CreateSubTopic(ap.coreTopic, deviceId)
+ args := make([]*kafka.KVArg, 2)
+ id := &voltha.ID{Id: deviceId}
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "port",
+ Value: port,
+ }
+
+ // Use a device specific topic as we are the only adaptercore handling requests for this device
+ replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, deviceId)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
+ return unPackResponse(rpc, deviceId, success, result)
+}
+
+func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
+ connStatus voltha.ConnectStatus_ConnectStatus, operStatus voltha.OperStatus_OperStatus) error {
+ log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
+ rpc := "DeviceStateUpdate"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := kafka.CreateSubTopic(ap.coreTopic, deviceId)
+ args := make([]*kafka.KVArg, 3)
+ id := &voltha.ID{Id: deviceId}
+ oStatus := &ic.IntType{Val: int64(operStatus)}
+ cStatus := &ic.IntType{Val: int64(connStatus)}
+
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "oper_status",
+ Value: oStatus,
+ }
+ args[2] = &kafka.KVArg{
+ Key: "connect_status",
+ Value: cStatus,
+ }
+ // Use a device specific topic as we are the only adaptercore handling requests for this device
+ replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, deviceId)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
+ return unPackResponse(rpc, deviceId, success, result)
+}
+
+func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
+ childDeviceType string, channelId int) error {
+ log.Debugw("ChildDeviceDetected", log.Fields{"pPeviceId": parentDeviceId, "channelId": channelId})
+ rpc := "ChildDeviceDetected"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := kafka.CreateSubTopic(ap.coreTopic, parentDeviceId)
+ replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, parentDeviceId)
+
+ args := make([]*kafka.KVArg, 4)
+ id := &voltha.ID{Id: parentDeviceId}
+ args[0] = &kafka.KVArg{
+ Key: "parent_device_id",
+ Value: id,
+ }
+ ppn := &ic.IntType{Val: int64(parentPortNo)}
+ args[1] = &kafka.KVArg{
+ Key: "parent_port_no",
+ Value: ppn,
+ }
+ cdt := &ic.StrType{Val: childDeviceType}
+ args[2] = &kafka.KVArg{
+ Key: "child_device_type",
+ Value: cdt,
+ }
+ channel := &ic.IntType{Val: int64(channelId)}
+ args[3] = &kafka.KVArg{
+ Key: "channel_id",
+ Value: channel,
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+ return unPackResponse(rpc, parentDeviceId, success, result)
+}
diff --git a/adapters/common/request_handler.go b/adapters/common/request_handler.go
new file mode 100644
index 0000000..ec04618
--- /dev/null
+++ b/adapters/common/request_handler.go
@@ -0,0 +1,224 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+
+import (
+ "errors"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-go/adapters"
+ "github.com/opencord/voltha-go/common/log"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+type RequestHandlerProxy struct {
+ TestMode bool
+ coreInstanceId string
+ adapter adapters.IAdapter
+}
+
+func NewRequestHandlerProxy(coreInstanceId string, iadapter adapters.IAdapter) *RequestHandlerProxy {
+ var proxy RequestHandlerProxy
+ proxy.coreInstanceId = coreInstanceId
+ proxy.adapter = iadapter
+ return &proxy
+}
+
+func (rhp *RequestHandlerProxy) Adapter_descriptor() (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Device_types() (*voltha.DeviceTypes, error) {
+ return nil, nil
+}
+
+func (rhp *RequestHandlerProxy) Health() (*voltha.HealthStatus, error) {
+ return nil, nil
+}
+
+func (rhp *RequestHandlerProxy) Adopt_device(args []*ic.Argument) (*empty.Empty, error) {
+ if len(args) != 1 {
+ log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+ device := &voltha.Device{}
+ if err := ptypes.UnmarshalAny(args[0].Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ log.Debugw("Adopt_device", log.Fields{"deviceId": device.Id})
+
+ //Invoke the adopt device on the adapter
+ if err := rhp.adapter.Adopt_device(device); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
+
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Reconcile_device(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Abandon_device(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Disable_device(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Reenable_device(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Reboot_device(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Self_test_device(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Delete_device(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Get_device_details(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Update_flows_bulk(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Update_flows_incrementally(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Update_pm_config(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Receive_packet_out(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Suppress_alarm(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Unsuppress_alarm(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Get_ofp_device_info(args []*ic.Argument) (*ic.SwitchCapability, error) {
+ if len(args) != 1 {
+ log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+ device := &voltha.Device{}
+ if err := ptypes.UnmarshalAny(args[0].Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ log.Debugw("Get_ofp_device_info", log.Fields{"deviceId": device.Id})
+
+ var cap *ic.SwitchCapability
+ var err error
+ if cap, err = rhp.adapter.Get_ofp_device_info(device); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
+ return cap, nil
+}
+
+func (rhp *RequestHandlerProxy) Get_ofp_port_info(args []*ic.Argument) (*ic.PortCapability, error) {
+ if len(args) != 2 {
+ log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+ device := &voltha.Device{}
+ pNo := &ic.IntType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case "port_no":
+ if err := ptypes.UnmarshalAny(arg.Value, pNo); err != nil {
+ log.Warnw("cannot-unmarshal-port-no", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ log.Debugw("Get_ofp_port_info", log.Fields{"deviceId": device.Id, "portNo": pNo.Val})
+ var cap *ic.PortCapability
+ var err error
+ if cap, err = rhp.adapter.Get_ofp_port_info(device, pNo.Val); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
+ return cap, nil
+}
+
+func (rhp *RequestHandlerProxy) Process_inter_adapter_message(args []*ic.Argument) (*empty.Empty, error) {
+ if len(args) != 1 {
+ log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+ iaMsg := &ic.InterAdapterMessage{}
+ if err := ptypes.UnmarshalAny(args[0].Value, iaMsg); err != nil {
+ log.Warnw("cannot-unmarshal-message", log.Fields{"error": err})
+ return nil, err
+ }
+
+ log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": iaMsg.Header.Id})
+
+ //Invoke the inter adapter API on the handler
+ if err := rhp.adapter.Process_inter_adapter_message(iaMsg); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
+
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Download_image(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Get_image_download_status(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Cancel_image_download(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Activate_image_update(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Revert_image_update(args []*ic.Argument) (*empty.Empty, error) {
+ return new(empty.Empty), nil
+}
diff --git a/adapters/common/utils.go b/adapters/common/utils.go
new file mode 100644
index 0000000..8cade8f
--- /dev/null
+++ b/adapters/common/utils.go
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+
+import (
+ "fmt"
+ "math/rand"
+ "time"
+)
+
+//GetRandomSerialNumber returns a serial number formatted as "HOST:PORT"
+func GetRandomSerialNumber() string {
+ rand.Seed(time.Now().Unix())
+ return fmt.Sprintf("%d.%d.%d.%d:%d",
+ rand.Intn(255),
+ rand.Intn(255),
+ rand.Intn(255),
+ rand.Intn(255),
+ rand.Intn(9000)+1000,
+ )
+}
+
+//GetRandomMacAddress returns a random mac address
+func GetRandomMacAddress() string {
+ rand.Seed(time.Now().Unix())
+ return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x",
+ rand.Intn(128),
+ rand.Intn(128),
+ rand.Intn(128),
+ rand.Intn(128),
+ rand.Intn(128),
+ rand.Intn(128),
+ )
+}
diff --git a/adapters/iAdapter.go b/adapters/iAdapter.go
new file mode 100644
index 0000000..f8de35e
--- /dev/null
+++ b/adapters/iAdapter.go
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adapters
+
+import (
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ "github.com/opencord/voltha-go/protos/openflow_13"
+ "github.com/opencord/voltha-go/protos/voltha"
+)
+
+//IAdapter represents the set of APIs a voltha adapter has to support.
+type IAdapter interface {
+ Adapter_descriptor() error
+ Device_types() (*voltha.DeviceTypes, error)
+ Health() (*voltha.HealthStatus, error)
+ Adopt_device(device *voltha.Device) error
+ Reconcile_device(device *voltha.Device) error
+ Abandon_device(device *voltha.Device) error
+ Disable_device(device *voltha.Device) error
+ Reenable_device(device *voltha.Device) error
+ Reboot_device(device *voltha.Device) error
+ Self_test_device(device *voltha.Device) error
+ Gelete_device(device *voltha.Device) error
+ Get_device_details(device *voltha.Device) error
+ Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error
+ Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges) error
+ Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error
+ Receive_packet_out(device *voltha.Device, egress_port_no int, msg openflow_13.PacketOut) error
+ Suppress_alarm(filter *voltha.AlarmFilter) error
+ Unsuppress_alarm(filter *voltha.AlarmFilter) error
+ Get_ofp_device_info(device *voltha.Device) (*ic.SwitchCapability, error)
+ Get_ofp_port_info(device *voltha.Device, port_no int64) (*ic.PortCapability, error)
+ Process_inter_adapter_message(msg *ic.InterAdapterMessage) error
+ Download_image(device *voltha.Device, request *voltha.ImageDownload) error
+ Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) error
+ Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) error
+ Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) error
+ Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) error
+}
diff --git a/adapters/simulated_olt/adaptercore/device_handler.go b/adapters/simulated_olt/adaptercore/device_handler.go
new file mode 100644
index 0000000..0741761
--- /dev/null
+++ b/adapters/simulated_olt/adaptercore/device_handler.go
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adaptercore
+
+import (
+ "context"
+ "github.com/gogo/protobuf/proto"
+ com "github.com/opencord/voltha-go/adapters/common"
+ "github.com/opencord/voltha-go/common/log"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ of "github.com/opencord/voltha-go/protos/openflow_13"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "strconv"
+ "strings"
+ "sync"
+)
+
+//DeviceHandler follows the same patterns as ponsim_olt. The only difference is that it does not
+// interact with an OLT device.
+type DeviceHandler struct {
+ deviceId string
+ deviceType string
+ device *voltha.Device
+ coreProxy *com.CoreProxy
+ simulatedOLT *SimulatedOLT
+ nniPort *voltha.Port
+ ponPort *voltha.Port
+ exitChannel chan int
+ lockDevice sync.RWMutex
+}
+
+//NewDeviceHandler creates a new device handler
+func NewDeviceHandler(cp *com.CoreProxy, device *voltha.Device, adapter *SimulatedOLT) *DeviceHandler {
+ var dh DeviceHandler
+ dh.coreProxy = cp
+ cloned := (proto.Clone(device)).(*voltha.Device)
+ dh.deviceId = cloned.Id
+ dh.deviceType = cloned.Type
+ dh.device = cloned
+ dh.simulatedOLT = adapter
+ dh.exitChannel = make(chan int, 1)
+ dh.lockDevice = sync.RWMutex{}
+ return &dh
+}
+
+// start save the device to the data model
+func (dh *DeviceHandler) start(ctx context.Context) {
+ dh.lockDevice.Lock()
+ defer dh.lockDevice.Unlock()
+ log.Debugw("starting-device-agent", log.Fields{"device": dh.device})
+ // Add the initial device to the local model
+ log.Debug("device-agent-started")
+}
+
+// stop stops the device dh. Not much to do for now
+func (dh *DeviceHandler) stop(ctx context.Context) {
+ dh.lockDevice.Lock()
+ defer dh.lockDevice.Unlock()
+ log.Debug("stopping-device-agent")
+ dh.exitChannel <- 1
+ log.Debug("device-agent-stopped")
+}
+
+func macAddressToUint32Array(mac string) []uint32 {
+ slist := strings.Split(mac, ":")
+ result := make([]uint32, len(slist))
+ var err error
+ var tmp int64
+ for index, val := range slist {
+ if tmp, err = strconv.ParseInt(val, 16, 32); err != nil {
+ return []uint32{1, 2, 3, 4, 5, 6}
+ }
+ result[index] = uint32(tmp)
+ }
+ return result
+}
+
+func (dh *DeviceHandler) AdoptDevice(device *voltha.Device) {
+ log.Debugw("AdoptDevice", log.Fields{"deviceId": device.Id})
+
+ // Update the device info
+ cloned := proto.Clone(device).(*voltha.Device)
+ cloned.Root = true
+ cloned.Vendor = "simulators"
+ cloned.Model = "go-simulators"
+ cloned.SerialNumber = com.GetRandomSerialNumber()
+ cloned.MacAddress = strings.ToUpper(com.GetRandomMacAddress())
+
+ // Synchronous call to update device - this method is run in its own go routine
+ if err := dh.coreProxy.DeviceUpdate(nil, cloned); err != nil {
+ log.Errorw("error-updating-device", log.Fields{"deviceId": device.Id, "error": err})
+ }
+
+ // Now create the NNI Port
+ dh.nniPort = &voltha.Port{
+ PortNo: 2,
+ Label: "NNI facing Ethernet port",
+ Type: voltha.Port_ETHERNET_NNI,
+ OperStatus: voltha.OperStatus_ACTIVE,
+ }
+
+ // Synchronous call to update device - this method is run in its own go routine
+ if err := dh.coreProxy.PortCreated(nil, cloned.Id, dh.nniPort); err != nil {
+ log.Errorw("error-creating-nni-port", log.Fields{"deviceId": device.Id, "error": err})
+ }
+
+ // Now create the PON Port
+ dh.ponPort = &voltha.Port{
+ PortNo: 1,
+ Label: "PON port",
+ Type: voltha.Port_PON_OLT,
+ OperStatus: voltha.OperStatus_ACTIVE,
+ }
+
+ // Synchronous call to update device - this method is run in its own go routine
+ if err := dh.coreProxy.PortCreated(nil, cloned.Id, dh.ponPort); err != nil {
+ log.Errorw("error-creating-nni-port", log.Fields{"deviceId": device.Id, "error": err})
+ }
+
+ cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
+ cloned.OperStatus = voltha.OperStatus_ACTIVE
+
+ // Update the device state
+ if err := dh.coreProxy.DeviceStateUpdate(nil, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+ log.Errorw("error-creating-nni-port", log.Fields{"deviceId": device.Id, "error": err})
+ }
+
+ // Register Child device
+ initialUniPortNo := 100
+ log.Debugw("registering-onus", log.Fields{"total": dh.simulatedOLT.numOnus})
+ for i := 0; i < dh.simulatedOLT.numOnus; i++ {
+ go dh.coreProxy.ChildDeviceDetected(
+ nil,
+ cloned.Id,
+ 1,
+ "simulated_onu",
+ initialUniPortNo+i)
+ }
+ dh.device = cloned
+}
+
+func (dh *DeviceHandler) GetOfpDeviceInfo(device *voltha.Device) (*ic.SwitchCapability, error) {
+ return &ic.SwitchCapability{
+ Desc: &of.OfpDesc{
+ HwDesc: "simulated_pon",
+ SwDesc: "simulated_pon",
+ SerialNum: dh.device.SerialNumber,
+ },
+ SwitchFeatures: &of.OfpSwitchFeatures{
+ NBuffers: 256,
+ NTables: 2,
+ Capabilities: uint32(of.OfpCapabilities_OFPC_FLOW_STATS |
+ of.OfpCapabilities_OFPC_TABLE_STATS |
+ of.OfpCapabilities_OFPC_PORT_STATS |
+ of.OfpCapabilities_OFPC_GROUP_STATS),
+ },
+ }, nil
+}
+
+func (dh *DeviceHandler) GetOfpPortInfo(device *voltha.Device, portNo int64) (*ic.PortCapability, error) {
+ cap := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
+ return &ic.PortCapability{
+ Port: &voltha.LogicalPort{
+ OfpPort: &of.OfpPort{
+ HwAddr: macAddressToUint32Array(dh.device.MacAddress),
+ Config: 0,
+ State: uint32(of.OfpPortState_OFPPS_LIVE),
+ Curr: cap,
+ Advertised: cap,
+ Peer: cap,
+ CurrSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+ MaxSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+ },
+ DeviceId: dh.device.Id,
+ DevicePortNo: uint32(portNo),
+ },
+ }, nil
+}
+
+func (dh *DeviceHandler) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {
+ log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
+ return nil
+}
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
new file mode 100644
index 0000000..bb6883a
--- /dev/null
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adaptercore
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ com "github.com/opencord/voltha-go/adapters/common"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/kafka"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ "github.com/opencord/voltha-go/protos/openflow_13"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "sync"
+)
+
+type SimulatedOLT struct {
+ deviceHandlers map[string]*DeviceHandler
+ coreProxy *com.CoreProxy
+ kafkaICProxy *kafka.InterContainerProxy
+ numOnus int
+ exitChannel chan int
+ lockDeviceHandlersMap sync.RWMutex
+}
+
+func NewSimulatedOLT(ctx context.Context, kafkaICProxy *kafka.InterContainerProxy, coreProxy *com.CoreProxy, onuNumber int) *SimulatedOLT {
+ var simulatedOLT SimulatedOLT
+ simulatedOLT.exitChannel = make(chan int, 1)
+ simulatedOLT.deviceHandlers = make(map[string]*DeviceHandler)
+ simulatedOLT.kafkaICProxy = kafkaICProxy
+ simulatedOLT.numOnus = onuNumber
+ simulatedOLT.coreProxy = coreProxy
+ simulatedOLT.lockDeviceHandlersMap = sync.RWMutex{}
+ return &simulatedOLT
+}
+
+func (so *SimulatedOLT) Start(ctx context.Context) error {
+ log.Info("starting-device-manager")
+ log.Info("device-manager-started")
+ return nil
+}
+
+func (so *SimulatedOLT) Stop(ctx context.Context) error {
+ log.Info("stopping-device-manager")
+ so.exitChannel <- 1
+ log.Info("device-manager-stopped")
+ return nil
+}
+
+func sendResponse(ctx context.Context, ch chan interface{}, result interface{}) {
+ if ctx.Err() == nil {
+ // Returned response only of the ctx has not been cancelled/timeout/etc
+ // Channel is automatically closed when a context is Done
+ ch <- result
+ log.Debugw("sendResponse", log.Fields{"result": result})
+ } else {
+ // Should the transaction be reverted back?
+ log.Debugw("sendResponse-context-error", log.Fields{"context-error": ctx.Err()})
+ }
+}
+
+func (so *SimulatedOLT) addDeviceHandlerToMap(agent *DeviceHandler) {
+ so.lockDeviceHandlersMap.Lock()
+ defer so.lockDeviceHandlersMap.Unlock()
+ if _, exist := so.deviceHandlers[agent.deviceId]; !exist {
+ so.deviceHandlers[agent.deviceId] = agent
+ }
+}
+
+func (so *SimulatedOLT) deleteDeviceHandlerToMap(agent *DeviceHandler) {
+ so.lockDeviceHandlersMap.Lock()
+ defer so.lockDeviceHandlersMap.Unlock()
+ delete(so.deviceHandlers, agent.deviceId)
+}
+
+func (so *SimulatedOLT) getDeviceHandler(deviceId string) *DeviceHandler {
+ so.lockDeviceHandlersMap.Lock()
+ defer so.lockDeviceHandlersMap.Unlock()
+ if agent, ok := so.deviceHandlers[deviceId]; ok {
+ return agent
+ }
+ return nil
+}
+
+func (so *SimulatedOLT) createDeviceTopic(device *voltha.Device) error {
+ log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})
+ deviceTopic := kafka.Topic{Name: so.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}
+ if err := so.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic); err != nil {
+ log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})
+ return err
+ }
+ return nil
+}
+
+func (so *SimulatedOLT) Adopt_device(device *voltha.Device) error {
+ if device == nil {
+ log.Warn("device-is-nil")
+ return errors.New("nil-device")
+ }
+ log.Infow("adopt-device", log.Fields{"deviceId": device.Id})
+ var handler *DeviceHandler
+ if handler = so.getDeviceHandler(device.Id); handler == nil {
+ handler := NewDeviceHandler(so.coreProxy, device, so)
+ so.addDeviceHandlerToMap(handler)
+ go handler.AdoptDevice(device)
+ // Launch the creation of the device topic
+ go so.createDeviceTopic(device)
+ }
+ return nil
+}
+
+func (so *SimulatedOLT) Get_ofp_device_info(device *voltha.Device) (*ic.SwitchCapability, error) {
+ log.Infow("Get_ofp_device_info", log.Fields{"deviceId": device.Id})
+ if handler := so.getDeviceHandler(device.Id); handler != nil {
+ return handler.GetOfpDeviceInfo(device)
+ }
+ log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
+ return nil, errors.New("device-handler-not-set")
+}
+
+func (so *SimulatedOLT) Get_ofp_port_info(device *voltha.Device, port_no int64) (*ic.PortCapability, error) {
+ log.Infow("Get_ofp_port_info", log.Fields{"deviceId": device.Id})
+ if handler := so.getDeviceHandler(device.Id); handler != nil {
+ return handler.GetOfpPortInfo(device, port_no)
+ }
+ log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
+ return nil, errors.New("device-handler-not-set")
+}
+
+func (so *SimulatedOLT) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {
+ log.Infow("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
+ targetDevice := msg.Header.ProxyDeviceId // Request?
+ if targetDevice == "" && msg.Header.ToDeviceId != "" {
+ // Typical response
+ targetDevice = msg.Header.ToDeviceId
+ }
+ if handler := so.getDeviceHandler(targetDevice); handler != nil {
+ return handler.Process_inter_adapter_message(msg)
+ }
+ return errors.New(fmt.Sprintf("handler-not-found-%s", targetDevice))
+}
+
+func (so *SimulatedOLT) Adapter_descriptor() error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Device_types() (*voltha.DeviceTypes, error) {
+ return nil, errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Health() (*voltha.HealthStatus, error) {
+ return nil, errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Reconcile_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Abandon_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Disable_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Reenable_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Reboot_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Self_test_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Gelete_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Get_device_details(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Receive_packet_out(device *voltha.Device, egress_port_no int, msg openflow_13.PacketOut) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Suppress_alarm(filter *voltha.AlarmFilter) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Unsuppress_alarm(filter *voltha.AlarmFilter) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Download_image(device *voltha.Device, request *voltha.ImageDownload) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedOLT) Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) error {
+ return errors.New("UnImplemented")
+}
diff --git a/adapters/simulated_olt/config/config.go b/adapters/simulated_olt/config/config.go
new file mode 100644
index 0000000..5cf0dc0
--- /dev/null
+++ b/adapters/simulated_olt/config/config.go
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package config
+
+import (
+ "flag"
+ "fmt"
+ "github.com/opencord/voltha-go/common/log"
+ "os"
+)
+
+// Simulated OLT default constants
+const (
+ EtcdStoreName = "etcd"
+ default_InstanceID = "simulatedOlt001"
+ default_KafkaAdapterHost = "127.0.0.1"
+ default_KafkaAdapterPort = 9092
+ default_KafkaClusterHost = "127.0.0.1"
+ default_KafkaClusterPort = 9094
+ default_KVStoreType = EtcdStoreName
+ default_KVStoreTimeout = 5 //in seconds
+ default_KVStoreHost = "127.0.0.1"
+ default_KVStorePort = 2379 // Consul = 8500; Etcd = 2379
+ default_LogLevel = 0
+ default_Banner = false
+ default_Topic = "simulated_olt"
+ default_CoreTopic = "rwcore"
+ default_OnuNumber = 1
+)
+
+// AdapterFlags represents the set of configurations used by the read-write adaptercore service
+type AdapterFlags struct {
+ // Command line parameters
+ InstanceID string
+ KafkaAdapterHost string
+ KafkaAdapterPort int
+ KafkaClusterHost string
+ KafkaClusterPort int
+ KVStoreType string
+ KVStoreTimeout int // in seconds
+ KVStoreHost string
+ KVStorePort int
+ Topic string
+ CoreTopic string
+ LogLevel int
+ OnuNumber int
+ Banner bool
+}
+
+func init() {
+ log.AddPackage(log.JSON, log.WarnLevel, nil)
+}
+
+// NewRWCoreFlags returns a new RWCore config
+func NewAdapterFlags() *AdapterFlags {
+ var adapterFlags = AdapterFlags{ // Default values
+ InstanceID: default_InstanceID,
+ KafkaAdapterHost: default_KafkaAdapterHost,
+ KafkaAdapterPort: default_KafkaAdapterPort,
+ KafkaClusterHost: default_KafkaClusterHost,
+ KafkaClusterPort: default_KafkaClusterPort,
+ KVStoreType: default_KVStoreType,
+ KVStoreTimeout: default_KVStoreTimeout,
+ KVStoreHost: default_KVStoreHost,
+ KVStorePort: default_KVStorePort,
+ Topic: default_Topic,
+ CoreTopic: default_CoreTopic,
+ LogLevel: default_LogLevel,
+ OnuNumber: default_OnuNumber,
+ Banner: default_Banner,
+ }
+ return &adapterFlags
+}
+
+// ParseCommandArguments parses the arguments when running read-write adaptercore service
+func (so *AdapterFlags) ParseCommandArguments() {
+
+ var help string
+
+ help = fmt.Sprintf("Kafka - Adapter messaging host")
+ flag.StringVar(&(so.KafkaAdapterHost), "kafka_adapter_host", default_KafkaAdapterHost, help)
+
+ help = fmt.Sprintf("Kafka - Adapter messaging port")
+ flag.IntVar(&(so.KafkaAdapterPort), "kafka_adapter_port", default_KafkaAdapterPort, help)
+
+ help = fmt.Sprintf("Kafka - Cluster messaging host")
+ flag.StringVar(&(so.KafkaClusterHost), "kafka_cluster_host", default_KafkaClusterHost, help)
+
+ help = fmt.Sprintf("Kafka - Cluster messaging port")
+ flag.IntVar(&(so.KafkaClusterPort), "kafka_cluster_port", default_KafkaClusterPort, help)
+
+ help = fmt.Sprintf("Simulated OLT topic")
+ flag.StringVar(&(so.Topic), "simulator_topic", default_Topic, help)
+
+ help = fmt.Sprintf("Core topic")
+ flag.StringVar(&(so.CoreTopic), "core_topic", default_CoreTopic, help)
+
+ help = fmt.Sprintf("KV store type")
+ flag.StringVar(&(so.KVStoreType), "kv_store_type", default_KVStoreType, help)
+
+ help = fmt.Sprintf("The default timeout when making a kv store request")
+ flag.IntVar(&(so.KVStoreTimeout), "kv_store_request_timeout", default_KVStoreTimeout, help)
+
+ help = fmt.Sprintf("KV store host")
+ flag.StringVar(&(so.KVStoreHost), "kv_store_host", default_KVStoreHost, help)
+
+ help = fmt.Sprintf("KV store port")
+ flag.IntVar(&(so.KVStorePort), "kv_store_port", default_KVStorePort, help)
+
+ help = fmt.Sprintf("Log level")
+ flag.IntVar(&(so.LogLevel), "log_level", default_LogLevel, help)
+
+ help = fmt.Sprintf("Number of ONUs")
+ flag.IntVar(&(so.OnuNumber), "onu_number", default_OnuNumber, help)
+
+ help = fmt.Sprintf("Show startup banner log lines")
+ flag.BoolVar(&so.Banner, "banner", default_Banner, help)
+
+ flag.Parse()
+
+ containerName := getContainerInfo()
+ if len(containerName) > 0 {
+ so.InstanceID = containerName
+ }
+
+}
+
+func getContainerInfo() string {
+ return os.Getenv("HOSTNAME")
+}
diff --git a/adapters/simulated_olt/main.go b/adapters/simulated_olt/main.go
new file mode 100644
index 0000000..01234a9
--- /dev/null
+++ b/adapters/simulated_olt/main.go
@@ -0,0 +1,337 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package main
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/opencord/voltha-go/adapters"
+ com "github.com/opencord/voltha-go/adapters/common"
+ ac "github.com/opencord/voltha-go/adapters/simulated_olt/adaptercore"
+ "github.com/opencord/voltha-go/adapters/simulated_olt/config"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/kvstore"
+ "github.com/opencord/voltha-go/kafka"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "os"
+ "os/signal"
+ "strconv"
+ "syscall"
+ "time"
+)
+
+type adapter struct {
+ instanceId string
+ config *config.AdapterFlags
+ iAdapter adapters.IAdapter
+ kafkaClient kafka.Client
+ kvClient kvstore.Client
+ kip *kafka.InterContainerProxy
+ coreProxy *com.CoreProxy
+ halted bool
+ exitChannel chan int
+ receiverChannels []<-chan *ic.InterContainerMessage
+}
+
+func init() {
+ log.AddPackage(log.JSON, log.DebugLevel, nil)
+}
+
+func newAdapter(cf *config.AdapterFlags) *adapter {
+ var a adapter
+ a.instanceId = cf.InstanceID
+ a.config = cf
+ a.halted = false
+ a.exitChannel = make(chan int, 1)
+ a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
+ return &a
+}
+
+func (a *adapter) start(ctx context.Context) {
+ log.Info("Starting Core Adapter components")
+ var err error
+
+ // Setup KV Client
+ log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
+ if err := a.setKVClient(); err != nil {
+ log.Fatal("error-setting-kv-client")
+ }
+
+ // Setup Kafka Client
+ if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
+ log.Fatal("Unsupported-common-client")
+ }
+
+ // Start the common InterContainer Proxy - retries indefinitely
+ if a.kip, err = a.startInterContainerProxy(-1); err != nil {
+ log.Fatal("error-starting-inter-container-proxy")
+ }
+
+ // Create the core proxy to handle requests to the Core
+ a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
+
+ // Create the simulated OLT adapter
+ if a.iAdapter, err = a.startSimulatedOLT(ctx, a.kip, a.coreProxy, a.config.OnuNumber); err != nil {
+ log.Fatal("error-starting-inter-container-proxy")
+ }
+
+ // Register the core request handler
+ if err = a.setupRequestHandler(a.instanceId, a.iAdapter); err != nil {
+ log.Fatal("error-setting-core-request-handler")
+ }
+
+ // Register this adapter to the Core - retries indefinitely
+ if err = a.registerWithCore(-1); err != nil {
+ log.Fatal("error-registering-with-core")
+ }
+}
+
+func (rw *adapter) stop() {
+ // Stop leadership tracking
+ rw.halted = true
+
+ // send exit signal
+ rw.exitChannel <- 0
+
+ // Cleanup - applies only if we had a kvClient
+ if rw.kvClient != nil {
+ // Release all reservations
+ if err := rw.kvClient.ReleaseAllReservations(); err != nil {
+ log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
+ }
+ // Close the DB connection
+ rw.kvClient.Close()
+ }
+
+ // TODO: More cleanup
+}
+
+func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
+
+ log.Infow("kv-store-type", log.Fields{"store": storeType})
+ switch storeType {
+ case "consul":
+ return kvstore.NewConsulClient(address, timeout)
+ case "etcd":
+ return kvstore.NewEtcdClient(address, timeout)
+ }
+ return nil, errors.New("unsupported-kv-store")
+}
+
+func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {
+
+ log.Infow("common-client-type", log.Fields{"client": clientType})
+ switch clientType {
+ case "sarama":
+ return kafka.NewSaramaClient(
+ kafka.Host(host),
+ kafka.Port(port),
+ kafka.ProducerReturnOnErrors(true),
+ kafka.ProducerReturnOnSuccess(true),
+ kafka.ProducerMaxRetries(6),
+ kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
+ }
+ return nil, errors.New("unsupported-client-type")
+}
+
+func (a *adapter) setKVClient() error {
+ addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
+ client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)
+ if err != nil {
+ a.kvClient = nil
+ log.Error(err)
+ return err
+ }
+ a.kvClient = client
+ return nil
+}
+
+func toString(value interface{}) (string, error) {
+ switch t := value.(type) {
+ case []byte:
+ return string(value.([]byte)), nil
+ case string:
+ return value.(string), nil
+ default:
+ return "", fmt.Errorf("unexpected-type-%T", t)
+ }
+}
+
+func (a *adapter) startInterContainerProxy(retries int) (*kafka.InterContainerProxy, error) {
+ log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
+ "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
+ var err error
+ var kip *kafka.InterContainerProxy
+ if kip, err = kafka.NewInterContainerProxy(
+ kafka.InterContainerHost(a.config.KafkaAdapterHost),
+ kafka.InterContainerPort(a.config.KafkaAdapterPort),
+ kafka.MsgClient(a.kafkaClient),
+ kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic})); err != nil {
+ log.Errorw("fail-to-create-common-proxy", log.Fields{"error": err})
+ return nil, err
+ }
+ count := 0
+ for {
+ if err = kip.Start(); err != nil {
+ log.Warnw("error-starting-messaging-proxy", log.Fields{"error": err})
+ if retries == count {
+ return nil, err
+ }
+ count = +1
+ // Take a nap before retrying
+ time.Sleep(2 * time.Second)
+ } else {
+ break
+ }
+ }
+
+ log.Info("common-messaging-proxy-created")
+ return kip, nil
+}
+
+func (a *adapter) startSimulatedOLT(ctx context.Context, kip *kafka.InterContainerProxy, cp *com.CoreProxy, onuNumber int) (*ac.SimulatedOLT, error) {
+ log.Info("starting-simulated-olt")
+ var err error
+ sOLT := ac.NewSimulatedOLT(ctx, a.kip, cp, onuNumber)
+
+ if err = sOLT.Start(ctx); err != nil {
+ log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
+ return nil, err
+ }
+
+ log.Info("simulated-olt-started")
+ return sOLT, nil
+}
+
+func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter) error {
+ log.Info("setting-request-handler")
+ requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter)
+ if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
+ log.Errorw("request-handler-setup-failed", log.Fields{"error": err})
+ return err
+
+ }
+ log.Info("request-handler-setup-done")
+ return nil
+}
+
+func (a *adapter) registerWithCore(retries int) error {
+ log.Info("registering-with-core")
+ adapterDescription := &voltha.Adapter{Id: "simulated_olt", Vendor: "simulation Enterprise Inc"}
+ types := []*voltha.DeviceType{{Id: "simulated_olt", Adapter: "simulated_olt"}}
+ deviceTypes := &voltha.DeviceTypes{Items: types}
+ count := 0
+ for {
+ if err := a.coreProxy.RegisterAdapter(nil, adapterDescription, deviceTypes); err != nil {
+ log.Warnw("registering-with-core-failed", log.Fields{"error": err})
+ if retries == count {
+ return err
+ }
+ count += 1
+ // Take a nap before retrying
+ time.Sleep(2 * time.Second)
+ } else {
+ break
+ }
+ }
+ log.Info("registered-with-core")
+ return nil
+}
+
+func waitForExit() int {
+ signalChannel := make(chan os.Signal, 1)
+ signal.Notify(signalChannel,
+ syscall.SIGHUP,
+ syscall.SIGINT,
+ syscall.SIGTERM,
+ syscall.SIGQUIT)
+
+ exitChannel := make(chan int)
+
+ go func() {
+ s := <-signalChannel
+ switch s {
+ case syscall.SIGHUP,
+ syscall.SIGINT,
+ syscall.SIGTERM,
+ syscall.SIGQUIT:
+ log.Infow("closing-signal-received", log.Fields{"signal": s})
+ exitChannel <- 0
+ default:
+ log.Infow("unexpected-signal-received", log.Fields{"signal": s})
+ exitChannel <- 1
+ }
+ }()
+
+ code := <-exitChannel
+ return code
+}
+
+func printBanner() {
+ fmt.Println(" ____ _ _ _ _ ___ _ _____ ")
+ fmt.Println("/ ___|(_)_ __ ___ _ _| | __ _| |_ ___ __| |/ _ \\| | |_ _| ")
+ fmt.Println("\\___ \\| | '_ ` _ \\| | | | |/ _` | __/ _ \\/ _` | | | | | | | ")
+ fmt.Println(" ___) | | | | | | | |_| | | (_| | || __/ (_| | |_| | |___| | ")
+ fmt.Println("|____/|_|_| |_| |_|\\__,_|_|\\__,_|\\__\\___|\\__,_|\\___/|_____|_| ")
+ fmt.Println(" ")
+}
+
+func main() {
+ start := time.Now()
+
+ cf := config.NewAdapterFlags()
+ cf.ParseCommandArguments()
+
+ //// Setup logging
+
+ //Setup default logger - applies for packages that do not have specific logger set
+ if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
+
+ // Update all loggers (provisionned via init) with a common field
+ if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
+
+ log.SetPackageLogLevel("github.com/opencord/voltha-go/adapters/common", log.DebugLevel)
+
+ defer log.CleanUp()
+
+ // Print banner if specified
+ if cf.Banner {
+ printBanner()
+ }
+
+ log.Infow("config", log.Fields{"config": *cf})
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ ad := newAdapter(cf)
+ go ad.start(ctx)
+
+ code := waitForExit()
+ log.Infow("received-a-closing-signal", log.Fields{"code": code})
+
+ // Cleanup before leaving
+ ad.stop()
+
+ elapsed := time.Since(start)
+ log.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
+}
diff --git a/adapters/simulated_onu/adaptercore/device_handler.go b/adapters/simulated_onu/adaptercore/device_handler.go
new file mode 100644
index 0000000..51a0667
--- /dev/null
+++ b/adapters/simulated_onu/adaptercore/device_handler.go
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adaptercore
+
+import (
+ "context"
+ "github.com/gogo/protobuf/proto"
+ com "github.com/opencord/voltha-go/adapters/common"
+ "github.com/opencord/voltha-go/common/log"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ of "github.com/opencord/voltha-go/protos/openflow_13"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "strconv"
+ "strings"
+ "sync"
+)
+
+//DeviceHandler follows the same patterns as ponsim_olt. The only difference is that it does not
+// interact with an OLT device.
+type DeviceHandler struct {
+ deviceId string
+ deviceType string
+ device *voltha.Device
+ coreProxy *com.CoreProxy
+ simulatedOLT *SimulatedONU
+ uniPort *voltha.Port
+ ponPort *voltha.Port
+ exitChannel chan int
+ lockDevice sync.RWMutex
+}
+
+//NewDeviceHandler creates a new device handler
+func NewDeviceHandler(cp *com.CoreProxy, device *voltha.Device, adapter *SimulatedONU) *DeviceHandler {
+ var dh DeviceHandler
+ dh.coreProxy = cp
+ cloned := (proto.Clone(device)).(*voltha.Device)
+ dh.deviceId = cloned.Id
+ dh.deviceType = cloned.Type
+ dh.device = cloned
+ dh.simulatedOLT = adapter
+ dh.exitChannel = make(chan int, 1)
+ dh.lockDevice = sync.RWMutex{}
+ return &dh
+}
+
+// start save the device to the data model
+func (dh *DeviceHandler) start(ctx context.Context) {
+ dh.lockDevice.Lock()
+ defer dh.lockDevice.Unlock()
+ log.Debugw("starting-device-agent", log.Fields{"device": dh.device})
+ // Add the initial device to the local model
+ log.Debug("device-agent-started")
+}
+
+// stop stops the device dh. Not much to do for now
+func (dh *DeviceHandler) stop(ctx context.Context) {
+ dh.lockDevice.Lock()
+ defer dh.lockDevice.Unlock()
+ log.Debug("stopping-device-agent")
+ dh.exitChannel <- 1
+ log.Debug("device-agent-stopped")
+}
+
+func macAddressToUint32Array(mac string) []uint32 {
+ slist := strings.Split(mac, ":")
+ result := make([]uint32, len(slist))
+ var err error
+ var tmp int64
+ for index, val := range slist {
+ if tmp, err = strconv.ParseInt(val, 16, 32); err != nil {
+ return []uint32{1, 2, 3, 4, 5, 6}
+ }
+ result[index] = uint32(tmp)
+ }
+ return result
+}
+
+func (dh *DeviceHandler) AdoptDevice(device *voltha.Device) {
+ log.Debugw("AdoptDevice", log.Fields{"deviceId": device.Id})
+
+ // Update the device info
+ cloned := proto.Clone(device).(*voltha.Device)
+ cloned.Root = false
+ cloned.Vendor = "simulators"
+ cloned.Model = "go-simulators"
+ cloned.SerialNumber = com.GetRandomSerialNumber()
+ cloned.MacAddress = strings.ToUpper(com.GetRandomMacAddress())
+
+ // Synchronous call to update device - this method is run in its own go routine
+ if err := dh.coreProxy.DeviceUpdate(nil, cloned); err != nil {
+ log.Errorw("error-updating-device", log.Fields{"deviceId": device.Id, "error": err})
+ }
+
+ // Now create the NNI Port
+ dh.uniPort = &voltha.Port{
+ PortNo: 2,
+ Label: "UNI facing Ethernet port",
+ Type: voltha.Port_ETHERNET_UNI,
+ AdminState: voltha.AdminState_ENABLED,
+ OperStatus: voltha.OperStatus_ACTIVE,
+ }
+
+ // Synchronous call to update device - this method is run in its own go routine
+ if err := dh.coreProxy.PortCreated(nil, cloned.Id, dh.uniPort); err != nil {
+ log.Errorw("error-creating-nni-port", log.Fields{"deviceId": device.Id, "error": err})
+ }
+
+ // Now create the PON Port
+ dh.ponPort = &voltha.Port{
+ PortNo: 1,
+ Label: "PON port",
+ Type: voltha.Port_PON_ONU,
+ AdminState: voltha.AdminState_ENABLED,
+ OperStatus: voltha.OperStatus_ACTIVE,
+ Peers: []*voltha.Port_PeerPort{{DeviceId: cloned.ParentId,
+ PortNo: cloned.ParentPortNo}},
+ }
+
+ // Synchronous call to update device - this method is run in its own go routine
+ if err := dh.coreProxy.PortCreated(nil, cloned.Id, dh.ponPort); err != nil {
+ log.Errorw("error-creating-nni-port", log.Fields{"deviceId": device.Id, "error": err})
+ }
+
+ cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
+ cloned.OperStatus = voltha.OperStatus_ACTIVE
+
+ // Update the device state
+ if err := dh.coreProxy.DeviceStateUpdate(nil, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+ log.Errorw("error-creating-nni-port", log.Fields{"deviceId": device.Id, "error": err})
+ }
+
+ dh.device = cloned
+}
+
+func (dh *DeviceHandler) GetOfpPortInfo(device *voltha.Device, portNo int64) (*ic.PortCapability, error) {
+ cap := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
+ return &ic.PortCapability{
+ Port: &voltha.LogicalPort{
+ OfpPort: &of.OfpPort{
+ HwAddr: macAddressToUint32Array(dh.device.MacAddress),
+ Config: 0,
+ State: uint32(of.OfpPortState_OFPPS_LIVE),
+ Curr: cap,
+ Advertised: cap,
+ Peer: cap,
+ CurrSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+ MaxSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+ },
+ DeviceId: dh.device.Id,
+ DevicePortNo: uint32(portNo),
+ },
+ }, nil
+}
+
+func (dh *DeviceHandler) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {
+ log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
+ return nil
+}
diff --git a/adapters/simulated_onu/adaptercore/simulated_onu.go b/adapters/simulated_onu/adaptercore/simulated_onu.go
new file mode 100644
index 0000000..0a8efc6
--- /dev/null
+++ b/adapters/simulated_onu/adaptercore/simulated_onu.go
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adaptercore
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ com "github.com/opencord/voltha-go/adapters/common"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/kafka"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ "github.com/opencord/voltha-go/protos/openflow_13"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "sync"
+)
+
+type SimulatedONU struct {
+ deviceHandlers map[string]*DeviceHandler
+ coreProxy *com.CoreProxy
+ kafkaICProxy *kafka.InterContainerProxy
+ exitChannel chan int
+ lockDeviceHandlersMap sync.RWMutex
+}
+
+func NewSimulatedONU(ctx context.Context, kafkaICProxy *kafka.InterContainerProxy, coreProxy *com.CoreProxy) *SimulatedONU {
+ var simulatedOLT SimulatedONU
+ simulatedOLT.exitChannel = make(chan int, 1)
+ simulatedOLT.deviceHandlers = make(map[string]*DeviceHandler)
+ simulatedOLT.kafkaICProxy = kafkaICProxy
+ simulatedOLT.coreProxy = coreProxy
+ simulatedOLT.lockDeviceHandlersMap = sync.RWMutex{}
+ return &simulatedOLT
+}
+
+func (so *SimulatedONU) Start(ctx context.Context) error {
+ log.Info("starting-device-manager")
+ log.Info("device-manager-started")
+ return nil
+}
+
+func (so *SimulatedONU) Stop(ctx context.Context) error {
+ log.Info("stopping-device-manager")
+ so.exitChannel <- 1
+ log.Info("device-manager-stopped")
+ return nil
+}
+
+func sendResponse(ctx context.Context, ch chan interface{}, result interface{}) {
+ if ctx.Err() == nil {
+ // Returned response only of the ctx has not been cancelled/timeout/etc
+ // Channel is automatically closed when a context is Done
+ ch <- result
+ log.Debugw("sendResponse", log.Fields{"result": result})
+ } else {
+ // Should the transaction be reverted back?
+ log.Debugw("sendResponse-context-error", log.Fields{"context-error": ctx.Err()})
+ }
+}
+
+func (so *SimulatedONU) addDeviceHandlerToMap(agent *DeviceHandler) {
+ so.lockDeviceHandlersMap.Lock()
+ defer so.lockDeviceHandlersMap.Unlock()
+ if _, exist := so.deviceHandlers[agent.deviceId]; !exist {
+ so.deviceHandlers[agent.deviceId] = agent
+ }
+}
+
+func (so *SimulatedONU) deleteDeviceHandlerToMap(agent *DeviceHandler) {
+ so.lockDeviceHandlersMap.Lock()
+ defer so.lockDeviceHandlersMap.Unlock()
+ delete(so.deviceHandlers, agent.deviceId)
+}
+
+func (so *SimulatedONU) getDeviceHandler(deviceId string) *DeviceHandler {
+ so.lockDeviceHandlersMap.Lock()
+ defer so.lockDeviceHandlersMap.Unlock()
+ if agent, ok := so.deviceHandlers[deviceId]; ok {
+ return agent
+ }
+ return nil
+}
+
+func (so *SimulatedONU) createDeviceTopic(device *voltha.Device) error {
+ log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})
+ deviceTopic := kafka.Topic{Name: so.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}
+ if err := so.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic); err != nil {
+ log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})
+ return err
+ }
+ return nil
+}
+
+func (so *SimulatedONU) Adopt_device(device *voltha.Device) error {
+ if device == nil {
+ log.Warn("device-is-nil")
+ return errors.New("nil-device")
+ }
+ log.Infow("adopt-device", log.Fields{"deviceId": device.Id})
+ var handler *DeviceHandler
+ if handler = so.getDeviceHandler(device.Id); handler == nil {
+ handler := NewDeviceHandler(so.coreProxy, device, so)
+ so.addDeviceHandlerToMap(handler)
+ go handler.AdoptDevice(device)
+ // Launch the creation of the device topic
+ go so.createDeviceTopic(device)
+ }
+ return nil
+}
+
+func (so *SimulatedONU) Get_ofp_device_info(device *voltha.Device) (*ic.SwitchCapability, error) {
+ log.Infow("not-implemented-for-onu", log.Fields{"deviceId": device.Id})
+ return nil, nil
+}
+
+func (so *SimulatedONU) Get_ofp_port_info(device *voltha.Device, port_no int64) (*ic.PortCapability, error) {
+ log.Infow("Get_ofp_port_info", log.Fields{"deviceId": device.Id})
+ if handler := so.getDeviceHandler(device.Id); handler != nil {
+ return handler.GetOfpPortInfo(device, port_no)
+ }
+ log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
+ return nil, errors.New("device-handler-not-set")
+}
+
+func (so *SimulatedONU) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {
+ log.Infow("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
+ targetDevice := msg.Header.ProxyDeviceId // Request?
+ if targetDevice == "" && msg.Header.ToDeviceId != "" {
+ // Typical response
+ targetDevice = msg.Header.ToDeviceId
+ }
+ if handler := so.getDeviceHandler(targetDevice); handler != nil {
+ return handler.Process_inter_adapter_message(msg)
+ }
+ return errors.New(fmt.Sprintf("handler-not-found-%s", targetDevice))
+}
+
+func (so *SimulatedONU) Adapter_descriptor() error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Device_types() (*voltha.DeviceTypes, error) {
+ return nil, errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Health() (*voltha.HealthStatus, error) {
+ return nil, errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Reconcile_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Abandon_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Disable_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Reenable_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Reboot_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Self_test_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Gelete_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Get_device_details(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Receive_packet_out(device *voltha.Device, egress_port_no int, msg openflow_13.PacketOut) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Suppress_alarm(filter *voltha.AlarmFilter) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Unsuppress_alarm(filter *voltha.AlarmFilter) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Download_image(device *voltha.Device, request *voltha.ImageDownload) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) error {
+ return errors.New("UnImplemented")
+}
+
+func (so *SimulatedONU) Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) error {
+ return errors.New("UnImplemented")
+}
diff --git a/adapters/simulated_onu/config/config.go b/adapters/simulated_onu/config/config.go
new file mode 100644
index 0000000..2971169
--- /dev/null
+++ b/adapters/simulated_onu/config/config.go
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package config
+
+import (
+ "flag"
+ "fmt"
+ "github.com/opencord/voltha-go/common/log"
+ "os"
+)
+
+// Simulated OLT default constants
+const (
+ EtcdStoreName = "etcd"
+ default_InstanceID = "simulatedOnu001"
+ default_KafkaAdapterHost = "127.0.0.1"
+ default_KafkaAdapterPort = 9092
+ default_KafkaClusterHost = "127.0.0.1"
+ default_KafkaClusterPort = 9094
+ default_KVStoreType = EtcdStoreName
+ default_KVStoreTimeout = 5 //in seconds
+ default_KVStoreHost = "127.0.0.1"
+ default_KVStorePort = 2379 // Consul = 8500; Etcd = 2379
+ default_LogLevel = 0
+ default_Banner = false
+ default_Topic = "simulated_onu"
+ default_CoreTopic = "rwcore"
+)
+
+// AdapterFlags represents the set of configurations used by the read-write adaptercore service
+type AdapterFlags struct {
+ // Command line parameters
+ InstanceID string
+ KafkaAdapterHost string
+ KafkaAdapterPort int
+ KafkaClusterHost string
+ KafkaClusterPort int
+ KVStoreType string
+ KVStoreTimeout int // in seconds
+ KVStoreHost string
+ KVStorePort int
+ Topic string
+ CoreTopic string
+ LogLevel int
+ Banner bool
+}
+
+func init() {
+ log.AddPackage(log.JSON, log.WarnLevel, nil)
+}
+
+// NewRWCoreFlags returns a new RWCore config
+func NewAdapterFlags() *AdapterFlags {
+ var adapterFlags = AdapterFlags{ // Default values
+ InstanceID: default_InstanceID,
+ KafkaAdapterHost: default_KafkaAdapterHost,
+ KafkaAdapterPort: default_KafkaAdapterPort,
+ KafkaClusterHost: default_KafkaClusterHost,
+ KafkaClusterPort: default_KafkaClusterPort,
+ KVStoreType: default_KVStoreType,
+ KVStoreTimeout: default_KVStoreTimeout,
+ KVStoreHost: default_KVStoreHost,
+ KVStorePort: default_KVStorePort,
+ Topic: default_Topic,
+ CoreTopic: default_CoreTopic,
+ LogLevel: default_LogLevel,
+ Banner: default_Banner,
+ }
+ return &adapterFlags
+}
+
+// ParseCommandArguments parses the arguments when running read-write adaptercore service
+func (so *AdapterFlags) ParseCommandArguments() {
+
+ var help string
+
+ help = fmt.Sprintf("Kafka - Adapter messaging host")
+ flag.StringVar(&(so.KafkaAdapterHost), "kafka_adapter_host", default_KafkaAdapterHost, help)
+
+ help = fmt.Sprintf("Kafka - Adapter messaging port")
+ flag.IntVar(&(so.KafkaAdapterPort), "kafka_adapter_port", default_KafkaAdapterPort, help)
+
+ help = fmt.Sprintf("Kafka - Cluster messaging host")
+ flag.StringVar(&(so.KafkaClusterHost), "kafka_cluster_host", default_KafkaClusterHost, help)
+
+ help = fmt.Sprintf("Kafka - Cluster messaging port")
+ flag.IntVar(&(so.KafkaClusterPort), "kafka_cluster_port", default_KafkaClusterPort, help)
+
+ help = fmt.Sprintf("Simulated ONU topic")
+ flag.StringVar(&(so.Topic), "simulator_topic", default_Topic, help)
+
+ help = fmt.Sprintf("Core topic")
+ flag.StringVar(&(so.CoreTopic), "core_topic", default_CoreTopic, help)
+
+ help = fmt.Sprintf("KV store type")
+ flag.StringVar(&(so.KVStoreType), "kv_store_type", default_KVStoreType, help)
+
+ help = fmt.Sprintf("The default timeout when making a kv store request")
+ flag.IntVar(&(so.KVStoreTimeout), "kv_store_request_timeout", default_KVStoreTimeout, help)
+
+ help = fmt.Sprintf("KV store host")
+ flag.StringVar(&(so.KVStoreHost), "kv_store_host", default_KVStoreHost, help)
+
+ help = fmt.Sprintf("KV store port")
+ flag.IntVar(&(so.KVStorePort), "kv_store_port", default_KVStorePort, help)
+
+ help = fmt.Sprintf("Log level")
+ flag.IntVar(&(so.LogLevel), "log_level", default_LogLevel, help)
+
+ help = fmt.Sprintf("Show startup banner log lines")
+ flag.BoolVar(&so.Banner, "banner", default_Banner, help)
+
+ flag.Parse()
+
+ containerName := getContainerInfo()
+ if len(containerName) > 0 {
+ so.InstanceID = containerName
+ }
+
+}
+
+func getContainerInfo() string {
+ return os.Getenv("HOSTNAME")
+}
diff --git a/adapters/simulated_onu/main.go b/adapters/simulated_onu/main.go
new file mode 100644
index 0000000..62cf2db
--- /dev/null
+++ b/adapters/simulated_onu/main.go
@@ -0,0 +1,337 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package main
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/opencord/voltha-go/adapters"
+ com "github.com/opencord/voltha-go/adapters/common"
+ ac "github.com/opencord/voltha-go/adapters/simulated_onu/adaptercore"
+ "github.com/opencord/voltha-go/adapters/simulated_onu/config"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/kvstore"
+ "github.com/opencord/voltha-go/kafka"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "os"
+ "os/signal"
+ "strconv"
+ "syscall"
+ "time"
+)
+
+type adapter struct {
+ instanceId string
+ config *config.AdapterFlags
+ iAdapter adapters.IAdapter
+ kafkaClient kafka.Client
+ kvClient kvstore.Client
+ kip *kafka.InterContainerProxy
+ coreProxy *com.CoreProxy
+ halted bool
+ exitChannel chan int
+ receiverChannels []<-chan *ic.InterContainerMessage
+}
+
+func init() {
+ log.AddPackage(log.JSON, log.DebugLevel, nil)
+}
+
+func newAdapter(cf *config.AdapterFlags) *adapter {
+ var a adapter
+ a.instanceId = cf.InstanceID
+ a.config = cf
+ a.halted = false
+ a.exitChannel = make(chan int, 1)
+ a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
+ return &a
+}
+
+func (a *adapter) start(ctx context.Context) {
+ log.Info("Starting Core Adapter components")
+ var err error
+
+ // Setup KV Client
+ log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
+ if err := a.setKVClient(); err != nil {
+ log.Fatal("error-setting-kv-client")
+ }
+
+ // Setup Kafka Client
+ if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
+ log.Fatal("Unsupported-common-client")
+ }
+
+ // Start the common InterContainer Proxy - retry indefinitely
+ if a.kip, err = a.startInterContainerProxy(-1); err != nil {
+ log.Fatal("error-starting-inter-container-proxy")
+ }
+
+ // Create the core proxy to handle requests to the Core
+ a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
+
+ // Create the simulated OLT adapter
+ if a.iAdapter, err = a.startSimulatedONU(ctx, a.kip, a.coreProxy); err != nil {
+ log.Fatal("error-starting-inter-container-proxy")
+ }
+
+ // Register the core request handler
+ if err = a.setupRequestHandler(a.instanceId, a.iAdapter); err != nil {
+ log.Fatal("error-setting-core-request-handler")
+ }
+
+ // Register this adapter to the Core - retry indefinitely
+ if err = a.registerWithCore(-1); err != nil {
+ log.Fatal("error-registering-with-core")
+ }
+}
+
+func (rw *adapter) stop() {
+ // Stop leadership tracking
+ rw.halted = true
+
+ // send exit signal
+ rw.exitChannel <- 0
+
+ // Cleanup - applies only if we had a kvClient
+ if rw.kvClient != nil {
+ // Release all reservations
+ if err := rw.kvClient.ReleaseAllReservations(); err != nil {
+ log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
+ }
+ // Close the DB connection
+ rw.kvClient.Close()
+ }
+
+ // TODO: More cleanup
+}
+
+func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
+
+ log.Infow("kv-store-type", log.Fields{"store": storeType})
+ switch storeType {
+ case "consul":
+ return kvstore.NewConsulClient(address, timeout)
+ case "etcd":
+ return kvstore.NewEtcdClient(address, timeout)
+ }
+ return nil, errors.New("unsupported-kv-store")
+}
+
+func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {
+
+ log.Infow("common-client-type", log.Fields{"client": clientType})
+ switch clientType {
+ case "sarama":
+ return kafka.NewSaramaClient(
+ kafka.Host(host),
+ kafka.Port(port),
+ kafka.ProducerReturnOnErrors(true),
+ kafka.ProducerReturnOnSuccess(true),
+ kafka.ProducerMaxRetries(6),
+ kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
+ }
+ return nil, errors.New("unsupported-client-type")
+}
+
+func (a *adapter) setKVClient() error {
+ addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
+ client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)
+ if err != nil {
+ a.kvClient = nil
+ log.Error(err)
+ return err
+ }
+ a.kvClient = client
+ return nil
+}
+
+func toString(value interface{}) (string, error) {
+ switch t := value.(type) {
+ case []byte:
+ return string(value.([]byte)), nil
+ case string:
+ return value.(string), nil
+ default:
+ return "", fmt.Errorf("unexpected-type-%T", t)
+ }
+}
+
+func (a *adapter) startInterContainerProxy(retries int) (*kafka.InterContainerProxy, error) {
+ log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
+ "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
+ var err error
+ var kip *kafka.InterContainerProxy
+ if kip, err = kafka.NewInterContainerProxy(
+ kafka.InterContainerHost(a.config.KafkaAdapterHost),
+ kafka.InterContainerPort(a.config.KafkaAdapterPort),
+ kafka.MsgClient(a.kafkaClient),
+ kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic})); err != nil {
+ log.Errorw("fail-to-create-common-proxy", log.Fields{"error": err})
+ return nil, err
+ }
+ count := 0
+ for {
+ if err = kip.Start(); err != nil {
+ log.Warnw("error-starting-messaging-proxy", log.Fields{"error": err})
+ if retries == count {
+ return nil, err
+ }
+ count = +1
+ // Take a nap before retrying
+ time.Sleep(2 * time.Second)
+ } else {
+ break
+ }
+ }
+
+ log.Info("common-messaging-proxy-created")
+ return kip, nil
+}
+
+func (a *adapter) startSimulatedONU(ctx context.Context, kip *kafka.InterContainerProxy, cp *com.CoreProxy) (*ac.SimulatedONU, error) {
+ log.Info("starting-simulated-onu")
+ var err error
+ sOLT := ac.NewSimulatedONU(ctx, a.kip, cp)
+
+ if err = sOLT.Start(ctx); err != nil {
+ log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
+ return nil, err
+ }
+
+ log.Info("simulated-olt-started")
+ return sOLT, nil
+}
+
+func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter) error {
+ log.Info("setting-request-handler")
+ requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter)
+ if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
+ log.Errorw("adaptercore-request-handler-setup-failed", log.Fields{"error": err})
+ return err
+
+ }
+ log.Info("request-handler-setup-done")
+ return nil
+}
+func (a *adapter) registerWithCore(retries int) error {
+ log.Info("registering-with-core")
+ adapterDescription := &voltha.Adapter{Id: "simulated_onu", Vendor: "simulation Enterprise Inc"}
+ types := []*voltha.DeviceType{{Id: "simulated_onu", Adapter: "simulated_onu"}}
+ deviceTypes := &voltha.DeviceTypes{Items: types}
+ count := 0
+ for {
+ if err := a.coreProxy.RegisterAdapter(nil, adapterDescription, deviceTypes); err != nil {
+ log.Warnw("registering-with-core-failed", log.Fields{"error": err})
+ if retries == count {
+ return err
+ }
+ count += 1
+ // Take a nap before retrying
+ time.Sleep(2 * time.Second)
+ } else {
+ break
+ }
+ }
+ log.Info("registered-with-core")
+ return nil
+}
+
+func waitForExit() int {
+ signalChannel := make(chan os.Signal, 1)
+ signal.Notify(signalChannel,
+ syscall.SIGHUP,
+ syscall.SIGINT,
+ syscall.SIGTERM,
+ syscall.SIGQUIT)
+
+ exitChannel := make(chan int)
+
+ go func() {
+ s := <-signalChannel
+ switch s {
+ case syscall.SIGHUP,
+ syscall.SIGINT,
+ syscall.SIGTERM,
+ syscall.SIGQUIT:
+ log.Infow("closing-signal-received", log.Fields{"signal": s})
+ exitChannel <- 0
+ default:
+ log.Infow("unexpected-signal-received", log.Fields{"signal": s})
+ exitChannel <- 1
+ }
+ }()
+
+ code := <-exitChannel
+ return code
+}
+
+func printBanner() {
+ fmt.Println(" _ _ _ _ ")
+ fmt.Println(" ___(_)_ __ ___ _ _| | __ _| |_ ___ __| | ___ _ __ _ _ ")
+ fmt.Println("/ __| | '_ ` _ \\| | | | |/ _` | __/ _ \\/ _` | / _ \\| '_ \\| | | | ")
+ fmt.Println("\\__ \\ | | | | | | |_| | | (_| | || __/ (_| | | (_) | | | | |_| | ")
+ fmt.Println("|___/_|_| |_| |_|\\__,_|_|\\__,_|\\__\\___|\\__,_|___\\___/|_| |_|\\__,_| ")
+ fmt.Println(" |_____| ")
+ fmt.Println(" ")
+}
+
+func main() {
+ start := time.Now()
+
+ cf := config.NewAdapterFlags()
+ cf.ParseCommandArguments()
+
+ //// Setup logging
+
+ //Setup default logger - applies for packages that do not have specific logger set
+ if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
+
+ // Update all loggers (provisionned via init) with a common field
+ if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
+
+ log.SetPackageLogLevel("github.com/opencord/voltha-go/adapters/common", log.DebugLevel)
+
+ defer log.CleanUp()
+
+ // Print banner if specified
+ if cf.Banner {
+ printBanner()
+ }
+
+ log.Infow("config", log.Fields{"config": *cf})
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ ad := newAdapter(cf)
+ go ad.start(ctx)
+
+ code := waitForExit()
+ log.Infow("received-a-closing-signal", log.Fields{"code": code})
+
+ // Cleanup before leaving
+ ad.stop()
+
+ elapsed := time.Since(start)
+ log.Infow("runtime", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
+}
diff --git a/compose/adapters-simulated.yml b/compose/adapters-simulated.yml
new file mode 100644
index 0000000..d937974
--- /dev/null
+++ b/compose/adapters-simulated.yml
@@ -0,0 +1,60 @@
+---
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2'
+services:
+ adapter_simulated_olt:
+ image: "${REGISTRY}${REPOSITORY}voltha-adapter-simulated-olt${TAG}"
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "10m"
+ max-file: "3"
+ command: [
+ "/app/simulated_olt",
+ "--kafka_adapter_host=${DOCKER_HOST_IP}",
+ "--kafka_adapter_port=9092",
+ "--kafka_cluster_host=${DOCKER_HOST_IP}",
+ "--kafka_cluster_port=9092",
+ "--core_topic=rwcore",
+ "--simulator_topic=simulated_olt",
+ "--onu_number=1"
+ ]
+ networks:
+ - default
+
+ adapter_simulated_onu:
+ image: "${REGISTRY}${REPOSITORY}voltha-adapter-simulated-onu${TAG}"
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "10m"
+ max-file: "3"
+ command: [
+ "/app/simulated_onu",
+ "--kafka_adapter_host=${DOCKER_HOST_IP}",
+ "--kafka_adapter_port=9092",
+ "--kafka_cluster_host=${DOCKER_HOST_IP}",
+ "--kafka_cluster_port=9092",
+ "--core_topic=rwcore",
+ "--simulator_topic=simulated_onu",
+ ]
+ networks:
+ - default
+
+
+networks:
+ default:
+ driver: bridge
diff --git a/docker/Dockerfile.simulated_olt b/docker/Dockerfile.simulated_olt
new file mode 100644
index 0000000..5cf71c4
--- /dev/null
+++ b/docker/Dockerfile.simulated_olt
@@ -0,0 +1,47 @@
+# -------------
+# Build stage
+
+FROM golang:1.9.2-alpine AS build-env
+
+# Install required packages
+RUN apk add --no-cache wget git make build-base protobuf protobuf-dev
+
+# Prepare directory structure
+RUN ["mkdir", "-p", "/src", "src/protos"]
+RUN ["mkdir", "-p", "$GOPATH/src", "$GOPATH/pkg", "$GOPATH/bin"]
+RUN ["mkdir", "-p", "$GOPATH/src/github.com/opencord/voltha/protos/go"]
+
+# Copy files
+ADD adapters/simulated_olt $GOPATH/src/github.com/opencord/voltha-go/adapters/simulated_olt
+ADD adapters/common $GOPATH/src/github.com/opencord/voltha-go/adapters/common
+ADD adapters/*.go $GOPATH/src/github.com/opencord/voltha-go/adapters/
+ADD common $GOPATH/src/github.com/opencord/voltha-go/common
+ADD db $GOPATH/src/github.com/opencord/voltha-go/db
+ADD kafka $GOPATH/src/github.com/opencord/voltha-go/kafka
+
+# Copy required proto files
+# ... VOLTHA proos
+ADD protos/*.proto /src/protos/
+ADD protos/scripts/* /src/protos/
+
+# Install golang protobuf
+RUN go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
+RUN go get -u github.com/golang/protobuf/protoc-gen-go
+
+# Compile protobuf files
+RUN sh /src/protos/build_protos.sh /src/protos
+
+# Build simulated_olt
+RUN cd $GOPATH/src/github.com/opencord/voltha-go/adapters/simulated_olt && go get -d ./... && rm -rf $GOPATH/src/go.etcd.io/etcd/vendor/golang.org/x/net/trace && go build -o /src/simulated_olt
+
+# -------------
+# Image creation stage
+
+FROM alpine:3.6
+
+# Set the working directory
+WORKDIR /app
+
+# Copy required files
+COPY --from=build-env /src/simulated_olt /app/
+
diff --git a/docker/Dockerfile.simulated_onu b/docker/Dockerfile.simulated_onu
new file mode 100644
index 0000000..9fb13be
--- /dev/null
+++ b/docker/Dockerfile.simulated_onu
@@ -0,0 +1,47 @@
+# -------------
+# Build stage
+
+FROM golang:1.9.2-alpine AS build-env
+
+# Install required packages
+RUN apk add --no-cache wget git make build-base protobuf protobuf-dev
+
+# Prepare directory structure
+RUN ["mkdir", "-p", "/src", "src/protos"]
+RUN ["mkdir", "-p", "$GOPATH/src", "$GOPATH/pkg", "$GOPATH/bin"]
+RUN ["mkdir", "-p", "$GOPATH/src/github.com/opencord/voltha/protos/go"]
+
+# Copy files
+ADD adapters/simulated_onu $GOPATH/src/github.com/opencord/voltha-go/adapters/simulated_onu
+ADD adapters/common $GOPATH/src/github.com/opencord/voltha-go/adapters/common
+ADD adapters/*.go $GOPATH/src/github.com/opencord/voltha-go/adapters/
+ADD common $GOPATH/src/github.com/opencord/voltha-go/common
+ADD db $GOPATH/src/github.com/opencord/voltha-go/db
+ADD kafka $GOPATH/src/github.com/opencord/voltha-go/kafka
+
+# Copy required proto files
+# ... VOLTHA proos
+ADD protos/*.proto /src/protos/
+ADD protos/scripts/* /src/protos/
+
+# Install golang protobuf
+RUN go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
+RUN go get -u github.com/golang/protobuf/protoc-gen-go
+
+# Compile protobuf files
+RUN sh /src/protos/build_protos.sh /src/protos
+
+# Build simulated_onu
+RUN cd $GOPATH/src/github.com/opencord/voltha-go/adapters/simulated_onu && go get -d ./... && rm -rf $GOPATH/src/go.etcd.io/etcd/vendor/golang.org/x/net/trace && go build -o /src/simulated_onu
+
+# -------------
+# Image creation stage
+
+FROM alpine:3.6
+
+# Set the working directory
+WORKDIR /app
+
+# Copy required files
+COPY --from=build-env /src/simulated_onu /app/
+
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 2d2714f..3381fdc 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -198,7 +198,7 @@
Id: deviceId,
DeviceType: deviceType,
ParentId: parentId,
- Publisher:publisher,
+ Publisher: publisher,
}
var marshalledData *any.Any
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index e330b85..2df19e5 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -69,6 +69,8 @@
doneCh chan int
topicToConsumerChannelMap map[string]*consumerChannels
lockTopicToConsumerChannelMap sync.RWMutex
+ topicLockMap map[string]*sync.RWMutex
+ lockOfTopicLockMap sync.RWMutex
}
type SaramaClientOption func(*SaramaClient)
@@ -187,7 +189,8 @@
}
client.lockTopicToConsumerChannelMap = sync.RWMutex{}
-
+ client.topicLockMap = make(map[string]*sync.RWMutex)
+ client.lockOfTopicLockMap = sync.RWMutex{}
return client
}
@@ -261,6 +264,9 @@
//CreateTopic creates a topic on the Kafka Broker.
func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
+ sc.lockTopic(topic)
+ defer sc.unLockTopic(topic)
+
// Set the topic details
topicDetail := &sarama.TopicDetail{}
topicDetail.NumPartitions = int32(numPartition)
@@ -286,6 +292,9 @@
//DeleteTopic removes a topic from the kafka Broker
func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
+ sc.lockTopic(topic)
+ defer sc.unLockTopic(topic)
+
// Remove the topic from the broker
if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
if err == sarama.ErrUnknownTopicOrPartition {
@@ -308,6 +317,9 @@
// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
// messages from that topic
func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ic.InterContainerMessage, error) {
+ sc.lockTopic(topic)
+ defer sc.unLockTopic(topic)
+
log.Debugw("subscribe", log.Fields{"topic": topic.Name})
// If a consumers already exist for that topic then resuse it
@@ -352,6 +364,9 @@
//UnSubscribe unsubscribe a consumer from a given topic
func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
+ sc.lockTopic(topic)
+ defer sc.unLockTopic(topic)
+
log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
return err
@@ -417,6 +432,26 @@
return nil
}
+func (sc *SaramaClient) lockTopic(topic *Topic) {
+ sc.lockOfTopicLockMap.Lock()
+ if _, exist := sc.topicLockMap[topic.Name]; exist {
+ sc.lockOfTopicLockMap.Unlock()
+ sc.topicLockMap[topic.Name].Lock()
+ } else {
+ sc.topicLockMap[topic.Name] = &sync.RWMutex{}
+ sc.lockOfTopicLockMap.Unlock()
+ sc.topicLockMap[topic.Name].Lock()
+ }
+}
+
+func (sc *SaramaClient) unLockTopic(topic *Topic) {
+ sc.lockOfTopicLockMap.Lock()
+ defer sc.lockOfTopicLockMap.Unlock()
+ if _, exist := sc.topicLockMap[topic.Name]; exist {
+ sc.topicLockMap[topic.Name].Unlock()
+ }
+}
+
func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
sc.lockTopicToConsumerChannelMap.Lock()
defer sc.lockTopicToConsumerChannelMap.Unlock()
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 34fc956..e50c035 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -38,7 +38,7 @@
kafkaICProxy *kafka.InterContainerProxy
stateTransitions *TransitionMap
clusterDataProxy *model.Proxy
- coreInstanceId string
+ coreInstanceId string
exitChannel chan int
lockDeviceAgentsMap sync.RWMutex
}
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 79ef982..9b7fac7 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -37,7 +37,7 @@
type APIHandler struct {
deviceMgr *DeviceManager
logicalDeviceMgr *LogicalDeviceManager
- packetInQueue *queue.Queue
+ packetInQueue *queue.Queue
da.DefaultAPIHandler
}
@@ -46,7 +46,7 @@
deviceMgr: deviceMgr,
logicalDeviceMgr: lDeviceMgr,
// TODO: Figure out what the 'hint' parameter to queue.New does
- packetInQueue: queue.New(10),
+ packetInQueue: queue.New(10),
}
return handler
}
diff --git a/rw_core/main.go b/rw_core/main.go
index dbb82b0..dd830c1 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -219,7 +219,7 @@
}
log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
- log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
+ //log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
defer log.CleanUp()