[VOL-1359] This commit consists of the creation of the simulated
OLT and ONU adapters (in Go language). This update also provides
the set of files to build and run these containers.
Change-Id: Id7b0c77fdf60cb02c39908d4374d3e93fab5de67
diff --git a/adapters/simulated_onu/adaptercore/device_handler.go b/adapters/simulated_onu/adaptercore/device_handler.go
new file mode 100644
index 0000000..51a0667
--- /dev/null
+++ b/adapters/simulated_onu/adaptercore/device_handler.go
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adaptercore
+
+import (
+ "context"
+ "github.com/gogo/protobuf/proto"
+ com "github.com/opencord/voltha-go/adapters/common"
+ "github.com/opencord/voltha-go/common/log"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ of "github.com/opencord/voltha-go/protos/openflow_13"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "strconv"
+ "strings"
+ "sync"
+)
+
+//DeviceHandler holds the per-device state of one simulated ONU. It follows the same patterns as
+// ponsim_olt; NOTE(review): presumably the difference is that no real device is contacted - confirm.
+type DeviceHandler struct {
+ deviceId string // copied from the cloned device's Id in NewDeviceHandler
+ deviceType string // copied from the cloned device's Type in NewDeviceHandler
+ device *voltha.Device // handler-owned clone; replaced at the end of AdoptDevice
+ coreProxy *com.CoreProxy // used to report device, port and state updates
+ simulatedOLT *SimulatedONU // owning adapter (a SimulatedONU, despite the field name)
+ uniPort *voltha.Port // UNI port (PortNo 2), set by AdoptDevice
+ ponPort *voltha.Port // PON port (PortNo 1), set by AdoptDevice
+ exitChannel chan int // buffered (size 1); stop sends 1 on it
+ lockDevice sync.RWMutex // held by start and stop
+}
+
+//NewDeviceHandler creates a new device handler
+func NewDeviceHandler(cp *com.CoreProxy, device *voltha.Device, adapter *SimulatedONU) *DeviceHandler {
+ var dh DeviceHandler
+ dh.coreProxy = cp
+ cloned := (proto.Clone(device)).(*voltha.Device)
+ dh.deviceId = cloned.Id
+ dh.deviceType = cloned.Type
+ dh.device = cloned
+ dh.simulatedOLT = adapter
+ dh.exitChannel = make(chan int, 1)
+ dh.lockDevice = sync.RWMutex{}
+ return &dh
+}
+
+// start only takes the device lock and logs; nothing is saved to a data model yet. ctx is unused.
+func (dh *DeviceHandler) start(ctx context.Context) {
+ dh.lockDevice.Lock()
+ defer dh.lockDevice.Unlock()
+ log.Debugw("starting-device-agent", log.Fields{"device": dh.device})
+ // TODO: add the initial device to the local model (no code does this yet)
+ log.Debug("device-agent-started")
+}
+
+// stop sends 1 on exitChannel (buffer size 1) under lockDevice; ctx is unused. Not much else to do for now.
+func (dh *DeviceHandler) stop(ctx context.Context) {
+ dh.lockDevice.Lock()
+ defer dh.lockDevice.Unlock()
+ log.Debug("stopping-device-agent")
+ dh.exitChannel <- 1 // NOTE(review): blocks, with the lock held, if the buffered value was never read
+ log.Debug("device-agent-stopped")
+}
+
+func macAddressToUint32Array(mac string) []uint32 {
+ slist := strings.Split(mac, ":")
+ result := make([]uint32, len(slist))
+ var err error
+ var tmp int64
+ for index, val := range slist {
+ if tmp, err = strconv.ParseInt(val, 16, 32); err != nil {
+ return []uint32{1, 2, 3, 4, 5, 6}
+ }
+ result[index] = uint32(tmp)
+ }
+ return result
+}
+
+// AdoptDevice fills in the simulated ONU's identity, creates its UNI and PON ports and marks it
+// REACHABLE/ACTIVE through the Core proxy. Failures are logged and the remaining steps still run.
+func (dh *DeviceHandler) AdoptDevice(device *voltha.Device) {
+	log.Debugw("AdoptDevice", log.Fields{"deviceId": device.Id})
+	// Update the device info on a private copy
+	cloned := proto.Clone(device).(*voltha.Device)
+	cloned.Root = false
+	cloned.Vendor = "simulators"
+	cloned.Model = "go-simulators"
+	cloned.SerialNumber = com.GetRandomSerialNumber()
+	cloned.MacAddress = strings.ToUpper(com.GetRandomMacAddress())
+
+	// Synchronous call to update device - this method is run in its own go routine
+	if err := dh.coreProxy.DeviceUpdate(nil, cloned); err != nil {
+		log.Errorw("error-updating-device", log.Fields{"deviceId": device.Id, "error": err})
+	}
+
+	// Now create the UNI Port
+	dh.uniPort = &voltha.Port{
+		PortNo:     2,
+		Label:      "UNI facing Ethernet port",
+		Type:       voltha.Port_ETHERNET_UNI,
+		AdminState: voltha.AdminState_ENABLED,
+		OperStatus: voltha.OperStatus_ACTIVE,
+	}
+
+	// Synchronous call to create the UNI port - this method is run in its own go routine
+	if err := dh.coreProxy.PortCreated(nil, cloned.Id, dh.uniPort); err != nil {
+		log.Errorw("error-creating-uni-port", log.Fields{"deviceId": device.Id, "error": err})
+	}
+
+	// Now create the PON Port
+	dh.ponPort = &voltha.Port{
+		PortNo:     1,
+		Label:      "PON port",
+		Type:       voltha.Port_PON_ONU,
+		AdminState: voltha.AdminState_ENABLED,
+		OperStatus: voltha.OperStatus_ACTIVE,
+		Peers: []*voltha.Port_PeerPort{{DeviceId: cloned.ParentId,
+			PortNo: cloned.ParentPortNo}},
+	}
+
+	// Synchronous call to create the PON port - this method is run in its own go routine
+	if err := dh.coreProxy.PortCreated(nil, cloned.Id, dh.ponPort); err != nil {
+		log.Errorw("error-creating-pon-port", log.Fields{"deviceId": device.Id, "error": err})
+	}
+
+	cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
+	cloned.OperStatus = voltha.OperStatus_ACTIVE
+	// Update the device state
+	if err := dh.coreProxy.DeviceStateUpdate(nil, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+		log.Errorw("error-updating-device-state", log.Fields{"deviceId": device.Id, "error": err})
+	}
+
+	dh.device = cloned
+}
+
+// GetOfpPortInfo describes port portNo as a live OpenFlow port advertising 1GB full-duplex over
+// fiber. The hardware address and device id come from the handler's own device copy; the device
+// argument is not consulted. The returned error is always nil.
+func (dh *DeviceHandler) GetOfpPortInfo(device *voltha.Device, portNo int64) (*ic.PortCapability, error) {
+	portFeatures := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
+	// The same feature set is reported as current, advertised and peer.
+	ofpPort := &of.OfpPort{
+		HwAddr:     macAddressToUint32Array(dh.device.MacAddress),
+		Config:     0,
+		State:      uint32(of.OfpPortState_OFPPS_LIVE),
+		Curr:       portFeatures,
+		Advertised: portFeatures,
+		Peer:       portFeatures,
+		CurrSpeed:  uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+		MaxSpeed:   uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+	}
+	logicalPort := &voltha.LogicalPort{OfpPort: ofpPort, DeviceId: dh.device.Id, DevicePortNo: uint32(portNo)}
+	return &ic.PortCapability{Port: logicalPort}, nil
+}
+
+func (dh *DeviceHandler) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error { // logs the message id only
+ log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
+ return nil // the message is otherwise ignored and success is always reported
+}
diff --git a/adapters/simulated_onu/adaptercore/simulated_onu.go b/adapters/simulated_onu/adaptercore/simulated_onu.go
new file mode 100644
index 0000000..0a8efc6
--- /dev/null
+++ b/adapters/simulated_onu/adaptercore/simulated_onu.go
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adaptercore
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ com "github.com/opencord/voltha-go/adapters/common"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/kafka"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ "github.com/opencord/voltha-go/protos/openflow_13"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "sync"
+)
+
+type SimulatedONU struct { // adapter-level state shared by all simulated ONU device handlers
+ deviceHandlers map[string]*DeviceHandler // keyed by device id
+ coreProxy *com.CoreProxy // handed to every DeviceHandler created in Adopt_device
+ kafkaICProxy *kafka.InterContainerProxy // used to subscribe to per-device topics
+ exitChannel chan int // buffered (size 1); Stop sends 1 on it
+ lockDeviceHandlersMap sync.RWMutex // guards deviceHandlers
+}
+
+func NewSimulatedONU(ctx context.Context, kafkaICProxy *kafka.InterContainerProxy, coreProxy *com.CoreProxy) *SimulatedONU {
+ var simulatedOLT SimulatedONU
+ simulatedOLT.exitChannel = make(chan int, 1)
+ simulatedOLT.deviceHandlers = make(map[string]*DeviceHandler)
+ simulatedOLT.kafkaICProxy = kafkaICProxy
+ simulatedOLT.coreProxy = coreProxy
+ simulatedOLT.lockDeviceHandlersMap = sync.RWMutex{}
+ return &simulatedOLT
+}
+
+func (so *SimulatedONU) Start(ctx context.Context) error { // ctx is unused
+ log.Info("starting-device-manager")
+ log.Info("device-manager-started")
+ return nil // nothing is started yet: the method only logs
+}
+
+func (so *SimulatedONU) Stop(ctx context.Context) error { // ctx is unused
+ log.Info("stopping-device-manager")
+ so.exitChannel <- 1 // NOTE(review): buffer size is 1, so this blocks if an earlier value was never read
+ log.Info("device-manager-stopped")
+ return nil
+}
+
+// sendResponse delivers result on ch unless ctx already reports an error, in which case it only logs.
+func sendResponse(ctx context.Context, ch chan interface{}, result interface{}) {
+	if err := ctx.Err(); err != nil {
+		// Should the transaction be reverted back?
+		log.Debugw("sendResponse-context-error", log.Fields{"context-error": err})
+		return
+	}
+	// Only answer while the context has not been cancelled or timed out.
+	ch <- result
+	log.Debugw("sendResponse", log.Fields{"result": result})
+}
+
+func (so *SimulatedONU) addDeviceHandlerToMap(agent *DeviceHandler) { // registers agent under agent.deviceId
+ so.lockDeviceHandlersMap.Lock()
+ defer so.lockDeviceHandlersMap.Unlock()
+ if _, exist := so.deviceHandlers[agent.deviceId]; !exist { // an already registered handler is kept
+ so.deviceHandlers[agent.deviceId] = agent
+ }
+}
+
+func (so *SimulatedONU) deleteDeviceHandlerToMap(agent *DeviceHandler) { // removes agent's entry from the map
+ so.lockDeviceHandlersMap.Lock()
+ defer so.lockDeviceHandlersMap.Unlock()
+ delete(so.deviceHandlers, agent.deviceId) // no-op when the id is not registered
+}
+
+// getDeviceHandler returns the handler registered for deviceId, or nil when there is none.
+// Only a read lock is taken, so concurrent lookups do not serialize behind each other.
+func (so *SimulatedONU) getDeviceHandler(deviceId string) *DeviceHandler {
+	so.lockDeviceHandlersMap.RLock()
+	defer so.lockDeviceHandlersMap.RUnlock()
+	// A missing key yields the map's zero value, i.e. a nil *DeviceHandler.
+	return so.deviceHandlers[deviceId]
+}
+
+// createDeviceTopic subscribes, with the default request handler, to the topic "<default topic>_<device id>".
+func (so *SimulatedONU) createDeviceTopic(device *voltha.Device) error {
+	log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})
+	err := so.kafkaICProxy.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: so.kafkaICProxy.DefaultTopic.Name + "_" + device.Id})
+	if err != nil {
+		log.Errorw("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})
+	}
+	return err
+}
+
+func (so *SimulatedONU) Adopt_device(device *voltha.Device) error {
+ if device == nil {
+ log.Warn("device-is-nil")
+ return errors.New("nil-device")
+ }
+ log.Infow("adopt-device", log.Fields{"deviceId": device.Id})
+ var handler *DeviceHandler
+ if handler = so.getDeviceHandler(device.Id); handler == nil {
+ handler := NewDeviceHandler(so.coreProxy, device, so)
+ so.addDeviceHandlerToMap(handler)
+ go handler.AdoptDevice(device)
+ // Launch the creation of the device topic
+ go so.createDeviceTopic(device)
+ }
+ return nil
+}
+
+func (so *SimulatedONU) Get_ofp_device_info(device *voltha.Device) (*ic.SwitchCapability, error) { // not implemented for ONUs
+ log.Infow("not-implemented-for-onu", log.Fields{"deviceId": device.Id})
+ return nil, nil // NOTE(review): callers get neither a capability nor an error
+}
+
+func (so *SimulatedONU) Get_ofp_port_info(device *voltha.Device, port_no int64) (*ic.PortCapability, error) { // delegates to the device's handler
+ log.Infow("Get_ofp_port_info", log.Fields{"deviceId": device.Id})
+ if handler := so.getDeviceHandler(device.Id); handler != nil {
+ return handler.GetOfpPortInfo(device, port_no)
+ }
+ log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id}) // no handler registered for device.Id
+ return nil, errors.New("device-handler-not-set")
+}
+
+// Process_inter_adapter_message hands msg to the handler of its ProxyDeviceId (or ToDeviceId when that is empty).
+func (so *SimulatedONU) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {
+	log.Infow("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
+	targetDevice := msg.Header.ProxyDeviceId // Request?
+	if targetDevice == "" && msg.Header.ToDeviceId != "" {
+		targetDevice = msg.Header.ToDeviceId // Typical response
+	}
+	if handler := so.getDeviceHandler(targetDevice); handler != nil {
+		return handler.Process_inter_adapter_message(msg)
+	}
+	return fmt.Errorf("handler-not-found-%s", targetDevice)
+}
+
+func (so *SimulatedONU) Adapter_descriptor() error {
+ return errors.New("UnImplemented") // stub: always fails with this error
+}
+
+func (so *SimulatedONU) Device_types() (*voltha.DeviceTypes, error) {
+ return nil, errors.New("UnImplemented") // stub: no device types are reported, only the error
+}
+
+func (so *SimulatedONU) Health() (*voltha.HealthStatus, error) {
+ return nil, errors.New("UnImplemented") // stub: no health status is reported, only the error
+}
+
+func (so *SimulatedONU) Reconcile_device(device *voltha.Device) error {
+ return errors.New("UnImplemented") // stub: device is ignored and the call always fails
+}
+
+func (so *SimulatedONU) Abandon_device(device *voltha.Device) error {
+ return errors.New("UnImplemented") // stub: device is ignored and the call always fails
+}
+
+func (so *SimulatedONU) Disable_device(device *voltha.Device) error {
+ return errors.New("UnImplemented") // stub: device is ignored and the call always fails
+}
+
+func (so *SimulatedONU) Reenable_device(device *voltha.Device) error {
+ return errors.New("UnImplemented") // stub: device is ignored and the call always fails
+}
+
+func (so *SimulatedONU) Reboot_device(device *voltha.Device) error {
+ return errors.New("UnImplemented") // stub: device is ignored and the call always fails
+}
+
+func (so *SimulatedONU) Self_test_device(device *voltha.Device) error {
+ return errors.New("UnImplemented") // stub: device is ignored and the call always fails
+}
+
+func (so *SimulatedONU) Gelete_device(device *voltha.Device) error { // NOTE(review): name looks like a typo of Delete_device - check the adapter interface before renaming
+ return errors.New("UnImplemented") // stub: device is ignored and the call always fails
+}
+
+func (so *SimulatedONU) Get_device_details(device *voltha.Device) error {
+ return errors.New("UnImplemented") // stub: device is ignored and the call always fails
+}
+
+func (so *SimulatedONU) Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
+ return errors.New("UnImplemented") // stub: flows and groups are ignored and the call always fails
+}
+
+func (so *SimulatedONU) Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges) error {
+ return errors.New("UnImplemented") // stub: the changes are ignored and the call always fails
+}
+
+func (so *SimulatedONU) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {
+ return errors.New("UnImplemented") // stub: pm_configs is ignored and the call always fails
+}
+
+func (so *SimulatedONU) Receive_packet_out(device *voltha.Device, egress_port_no int, msg openflow_13.PacketOut) error {
+ return errors.New("UnImplemented") // stub: the packet is dropped and the call always fails
+}
+
+func (so *SimulatedONU) Suppress_alarm(filter *voltha.AlarmFilter) error {
+ return errors.New("UnImplemented") // stub: filter is ignored and the call always fails
+}
+
+func (so *SimulatedONU) Unsuppress_alarm(filter *voltha.AlarmFilter) error {
+ return errors.New("UnImplemented") // stub: filter is ignored and the call always fails
+}
+
+func (so *SimulatedONU) Download_image(device *voltha.Device, request *voltha.ImageDownload) error {
+ return errors.New("UnImplemented") // stub: request is ignored and the call always fails
+}
+
+func (so *SimulatedONU) Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) error {
+ return errors.New("UnImplemented") // stub: request is ignored and the call always fails
+}
+
+func (so *SimulatedONU) Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) error {
+ return errors.New("UnImplemented") // stub: request is ignored and the call always fails
+}
+
+func (so *SimulatedONU) Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) error {
+ return errors.New("UnImplemented") // stub: request is ignored and the call always fails
+}
+
+func (so *SimulatedONU) Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) error {
+ return errors.New("UnImplemented") // stub: request is ignored and the call always fails
+}