VOL-1374: OLT Activation with Edgecore asfvolt16
Change-Id: I61ce4b0a6a3666070d08a162251d42d90817f409
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..08fddff
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,87 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ifeq ($(TAG),)
+TAG := latest
+endif
+
+ifeq ($(TARGET_TAG),)
+TARGET_TAG := latest
+endif
+
+# If no DOCKER_HOST_IP is specified grab a v4 IP address associated with
+# the default gateway
+ifeq ($(DOCKER_HOST_IP),)
+DOCKER_HOST_IP := $(shell ifconfig $$(netstat -rn | grep -E '^(default|0.0.0.0)' | head -1 | awk '{print $$NF}') | grep inet | awk '{print $$2}' | sed -e 's/addr://g')
+endif
+
+
+ifneq ($(http_proxy)$(https_proxy),)
+# Include proxies from the environment
+DOCKER_PROXY_ARGS = \
+ --build-arg http_proxy=$(http_proxy) \
+ --build-arg https_proxy=$(https_proxy) \
+ --build-arg ftp_proxy=$(ftp_proxy) \
+ --build-arg no_proxy=$(no_proxy) \
+ --build-arg HTTP_PROXY=$(HTTP_PROXY) \
+ --build-arg HTTPS_PROXY=$(HTTPS_PROXY) \
+ --build-arg FTP_PROXY=$(FTP_PROXY) \
+ --build-arg NO_PROXY=$(NO_PROXY)
+endif
+
+DOCKER_BUILD_ARGS = \
+ --build-arg TAG=$(TAG) \
+ --build-arg REGISTRY=$(REGISTRY) \
+ --build-arg REPOSITORY=$(REPOSITORY) \
+ $(DOCKER_PROXY_ARGS) $(DOCKER_CACHE_ARG) \
+ --rm --force-rm \
+ $(DOCKER_BUILD_EXTRA_ARGS)
+
+DOCKER_IMAGE_LIST = \
+ openolt-adapter-go
+
+
+.PHONY: $(DIRS) $(DIRS_CLEAN) $(DIRS_FLAKE8) adapters
+
+# This should to be the first and default target in this Makefile
+help:
+ @echo "Usage: make [<target>]"
+ @echo "where available targets are:"
+ @echo
+ @echo "build : Build the docker images.\n\
+ If this is the first time you are building, choose \"make build\" option."
+ @echo
+
+
+# Parallel Build
+$(DIRS):
+ @echo " MK $@"
+ $(Q)$(MAKE) -C $@
+
+# Parallel Clean
+DIRS_CLEAN = $(addsuffix .clean,$(DIRS))
+$(DIRS_CLEAN):
+ @echo " CLEAN $(basename $@)"
+ $(Q)$(MAKE) -C $(basename $@) clean
+
+build: containers
+
+containers: adapter_openolt_go
+
+adapter_openolt_go:
+ docker build $(DOCKER_BUILD_ARGS) -t ${REGISTRY}${REPOSITORY}voltha-openolt-adapter-go:${TAG} -f docker/Dockerfile.openolt .
+
+# end file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..0c3609b
--- /dev/null
+++ b/README.md
@@ -0,0 +1,32 @@
+
+# How to Build and Run a GOlang based OpenOLT Adapter
+
+Assuming the VOLTHA2.0 environment is made using the quickstart.md in voltha-go.
+
+```
+cd ~/source/voltha-openolt-adapter
+```
+
+Get the latest code changes
+```
+git pull
+```
+To build the docker image
+```
+make build
+```
+This will create the voltha-openolt-adapter-go docker image
+```
+$ docker images | grep openolt
+voltha-openolt-adapter-go latest 38688e697472 2 hours ago 37.3MB
+```
+In case the python voltha openolt adapter is started, stop the python voltha openolt docker container
+
+
+To start the GOlang based OpenOLT adapter
+
+DOCKER_HOST_IP=<HOST-IP> docker-compose -f compose/adapters-openolt-go.yml up -d
+
+The functionality of OLT activation can be verified through BBSIM
+Follow the below steps to start BBSIM and provision it through VOLTHA-CLI
+https://github.com/opencord/voltha-bbsim/blob/master/README.md
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
new file mode 100644
index 0000000..07ae169
--- /dev/null
+++ b/adaptercore/device_handler.go
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adaptercore
+
+import (
+ "context"
+ "errors"
+ "io"
+ "strconv"
+ "strings"
+ "sync"
+
+ "github.com/gogo/protobuf/proto"
+ com "github.com/opencord/voltha-go/adapters/common"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/protos/common"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ of "github.com/opencord/voltha-go/protos/openflow_13"
+ oop "github.com/opencord/voltha-go/protos/openolt"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "google.golang.org/grpc"
+)
+
+//DeviceHandler will interact with the OLT device.
+type DeviceHandler struct {
+ deviceId string
+ deviceType string
+ device *voltha.Device
+ coreProxy *com.CoreProxy
+ openOLT *OpenOLT
+ nniPort *voltha.Port
+ ponPort *voltha.Port
+ exitChannel chan int
+ lockDevice sync.RWMutex
+ client oop.OpenoltClient
+ transitionMap *TransitionMap
+ clientCon *grpc.ClientConn
+}
+
+//NewDeviceHandler creates a new device handler
+func NewDeviceHandler(cp *com.CoreProxy, device *voltha.Device, adapter *OpenOLT) *DeviceHandler {
+ var dh DeviceHandler
+ dh.coreProxy = cp
+ cloned := (proto.Clone(device)).(*voltha.Device)
+ dh.deviceId = cloned.Id
+ dh.deviceType = cloned.Type
+ dh.device = cloned
+ dh.openOLT = adapter
+ dh.exitChannel = make(chan int, 1)
+ dh.lockDevice = sync.RWMutex{}
+
+ //TODO initialize the support classes.
+ return &dh
+}
+
+// start save the device to the data model
+func (dh *DeviceHandler) start(ctx context.Context) {
+ dh.lockDevice.Lock()
+ defer dh.lockDevice.Unlock()
+ log.Debugw("starting-device-agent", log.Fields{"device": dh.device})
+ // Add the initial device to the local model
+ log.Debug("device-agent-started")
+}
+
+// stop stops the device dh. Not much to do for now
+func (dh *DeviceHandler) stop(ctx context.Context) {
+ dh.lockDevice.Lock()
+ defer dh.lockDevice.Unlock()
+ log.Debug("stopping-device-agent")
+ dh.exitChannel <- 1
+ log.Debug("device-agent-stopped")
+}
+
+func macAddressToUint32Array(mac string) []uint32 {
+ slist := strings.Split(mac, ":")
+ result := make([]uint32, len(slist))
+ var err error
+ var tmp int64
+ for index, val := range slist {
+ if tmp, err = strconv.ParseInt(val, 16, 32); err != nil {
+ return []uint32{1, 2, 3, 4, 5, 6}
+ }
+ result[index] = uint32(tmp)
+ }
+ return result
+}
+
+func portName(portNum uint32, portType voltha.Port_PortType, intfId uint32) string {
+
+ if portType == voltha.Port_PON_OLT {
+ return "pon-" + string(portNum)
+ } else if portType == voltha.Port_ETHERNET_NNI {
+ return "nni-" + string(intfId)
+ } else if portType == voltha.Port_ETHERNET_UNI {
+ log.Errorw("local UNI management not supported", log.Fields{})
+ }
+ return ""
+}
+
+func (dh *DeviceHandler) addPort(intfId uint32, portType voltha.Port_PortType, state string) {
+ var operStatus common.OperStatus_OperStatus
+ if state == "up" {
+ operStatus = voltha.OperStatus_ACTIVE
+ } else {
+ operStatus = voltha.OperStatus_DISCOVERED
+ }
+
+ // TODO
+ //portNum := platform.intfIdToPortNo(intfId, portType)
+ portNum := intfId
+
+ label := portName(portNum, portType, intfId)
+ // Now create the PON Port
+ ponPort := &voltha.Port{
+ PortNo: portNum,
+ Label: label,
+ Type: portType,
+ OperStatus: operStatus,
+ }
+
+ // Synchronous call to update device - this method is run in its own go routine
+ if err := dh.coreProxy.PortCreated(nil, dh.device.Id, ponPort); err != nil {
+ log.Errorw("error-creating-nni-port", log.Fields{"deviceId": dh.device.Id, "error": err})
+ }
+}
+
+// readIndications to read the indications from the OLT device
+func (dh *DeviceHandler) readIndications() {
+ indications, err := dh.client.EnableIndication(context.Background(), new(oop.Empty))
+ if err != nil {
+ log.Errorw("Failed to read indications", log.Fields{"err": err})
+ return
+ }
+ if indications == nil {
+ log.Errorw("Indications is nil", log.Fields{})
+ return
+ }
+ for {
+ indication, err := indications.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ log.Infow("Failed to read from indications", log.Fields{"err": err})
+ continue
+ }
+ switch indication.Data.(type) {
+ case *oop.Indication_OltInd:
+ oltInd := indication.GetOltInd()
+ if oltInd.OperState == "up" {
+ dh.transitionMap.Handle(DeviceUpInd)
+ } else if oltInd.OperState == "down" {
+ dh.transitionMap.Handle(DeviceDownInd)
+ }
+ case *oop.Indication_IntfInd:
+
+ intfInd := indication.GetIntfInd()
+ go dh.addPort(intfInd.GetIntfId(), voltha.Port_PON_OLT, intfInd.GetOperState())
+ log.Infow("Received interface indication ", log.Fields{"InterfaceInd": intfInd})
+ case *oop.Indication_IntfOperInd:
+ intfOperInd := indication.GetIntfOperInd()
+ if intfOperInd.GetType() == "nni" {
+ go dh.addPort(intfOperInd.GetIntfId(), voltha.Port_ETHERNET_NNI, intfOperInd.GetOperState())
+ } else if intfOperInd.GetType() == "pon" {
+ // TODO: Check what needs to be handled here for When PON PORT down, ONU will be down
+ // Handle pon port update
+ }
+ log.Infow("Received interface oper indication ", log.Fields{"InterfaceOperInd": intfOperInd})
+ case *oop.Indication_OnuDiscInd:
+ onuDiscInd := indication.GetOnuDiscInd()
+ log.Infow("Received Onu discovery indication ", log.Fields{"OnuDiscInd": onuDiscInd})
+ // TODO Get onu ID from the resource manager
+ onuId := 0
+ go dh.coreProxy.ChildDeviceDetected(nil, dh.device.Id, int(onuDiscInd.GetIntfId()),
+ "brcm_openomci_onu", int(onuDiscInd.GetIntfId()), string(onuDiscInd.SerialNumber.GetVendorId()),
+ string(onuDiscInd.SerialNumber.GetVendorSpecific()), int64(onuId))
+ case *oop.Indication_OnuInd:
+ onuInd := indication.GetOnuInd()
+ log.Infow("Received Onu indication ", log.Fields{"OnuInd": onuInd})
+ case *oop.Indication_OmciInd:
+ omciInd := indication.GetOmciInd()
+ log.Infow("Received Omci indication ", log.Fields{"OmciInd": omciInd})
+ case *oop.Indication_PktInd:
+ pktInd := indication.GetPktInd()
+ log.Infow("Received pakcet indication ", log.Fields{"PktInd": pktInd})
+ case *oop.Indication_PortStats:
+ portStats := indication.GetPortStats()
+ log.Infow("Received port stats indication", log.Fields{"PortStats": portStats})
+ case *oop.Indication_FlowStats:
+ flowStats := indication.GetFlowStats()
+ log.Infow("Received flow stats", log.Fields{"FlowStats": flowStats})
+ case *oop.Indication_AlarmInd:
+ alarmInd := indication.GetAlarmInd()
+ log.Infow("Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
+ }
+ }
+}
+
+// doStateUp handle the olt up indication and update to voltha core
+func (dh *DeviceHandler) doStateUp() error {
+ // Synchronous call to update device state - this method is run in its own go routine
+ if err := dh.coreProxy.DeviceStateUpdate(context.Background(), dh.device.Id, voltha.ConnectStatus_REACHABLE,
+ voltha.OperStatus_ACTIVE); err != nil {
+ log.Errorw("Failed to update device with OLT UP indication", log.Fields{"deviceId": dh.device.Id, "error": err})
+ return err
+ }
+ return nil
+}
+
+// doStateDown handle the olt down indication
+func (dh *DeviceHandler) doStateDown() error {
+ //TODO Handle oper state down
+ return nil
+}
+
+// doStateInit dial the grpc before going to init state
+func (dh *DeviceHandler) doStateInit() error {
+ var err error
+ dh.clientCon, err = grpc.Dial(dh.device.GetHostAndPort(), grpc.WithInsecure())
+ if err != nil {
+ log.Errorw("Failed to dial device", log.Fields{"DeviceId": dh.deviceId, "HostAndPort": dh.device.GetHostAndPort(), "err": err})
+ return err
+ }
+ return nil
+}
+
+// postInit create olt client instance to invoke RPC on the olt device
+func (dh *DeviceHandler) postInit() error {
+ dh.client = oop.NewOpenoltClient(dh.clientCon)
+ dh.transitionMap.Handle(GrpcConnected)
+ return nil
+}
+
+// doStateConnected get the device info and update to voltha core
+func (dh *DeviceHandler) doStateConnected() error {
+ deviceInfo, err := dh.client.GetDeviceInfo(context.Background(), new(oop.Empty))
+ if err != nil {
+ log.Errorw("Failed to fetch device info", log.Fields{"err": err})
+ return err
+ }
+ if deviceInfo == nil {
+ log.Errorw("Device info is nil", log.Fields{})
+ return errors.New("Failed to get device info from OLT")
+ }
+
+ dh.device.Root = true
+ dh.device.Vendor = deviceInfo.Vendor
+ dh.device.Model = deviceInfo.Model
+ dh.device.ConnectStatus = voltha.ConnectStatus_REACHABLE
+ dh.device.SerialNumber = deviceInfo.DeviceSerialNumber
+ dh.device.HardwareVersion = deviceInfo.HardwareVersion
+ dh.device.FirmwareVersion = deviceInfo.FirmwareVersion
+ // TODO : Check whether this MAC address is learnt from SDPON or need to send from device
+ dh.device.MacAddress = "0a:0b:0c:0d:0e:0f"
+
+ // Synchronous call to update device - this method is run in its own go routine
+ if err := dh.coreProxy.DeviceUpdate(nil, dh.device); err != nil {
+ log.Errorw("error-updating-device", log.Fields{"deviceId": dh.device.Id, "error": err})
+ }
+
+ // Start reading indications
+ go dh.readIndications()
+ return nil
+}
+
+// AdoptDevice adopts the OLT device
+func (dh *DeviceHandler) AdoptDevice(device *voltha.Device) {
+ dh.transitionMap = NewTransitionMap(dh)
+ log.Infow("AdoptDevice", log.Fields{"deviceId": device.Id, "Address": device.GetHostAndPort()})
+ dh.transitionMap.Handle(DeviceInit)
+}
+
+// GetOfpDeviceInfo Get the Ofp device information
+func (dh *DeviceHandler) GetOfpDeviceInfo(device *voltha.Device) (*ic.SwitchCapability, error) {
+ return &ic.SwitchCapability{
+ Desc: &of.OfpDesc{
+ HwDesc: "open_pon",
+ SwDesc: "open_pon",
+ SerialNum: dh.device.SerialNumber,
+ },
+ SwitchFeatures: &of.OfpSwitchFeatures{
+ NBuffers: 256,
+ NTables: 2,
+ Capabilities: uint32(of.OfpCapabilities_OFPC_FLOW_STATS |
+ of.OfpCapabilities_OFPC_TABLE_STATS |
+ of.OfpCapabilities_OFPC_PORT_STATS |
+ of.OfpCapabilities_OFPC_GROUP_STATS),
+ },
+ }, nil
+}
+
+// GetOfpPortInfo Get Ofp port information
+func (dh *DeviceHandler) GetOfpPortInfo(device *voltha.Device, portNo int64) (*ic.PortCapability, error) {
+ cap := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
+ return &ic.PortCapability{
+ Port: &voltha.LogicalPort{
+ OfpPort: &of.OfpPort{
+ HwAddr: macAddressToUint32Array(dh.device.MacAddress),
+ Config: 0,
+ State: uint32(of.OfpPortState_OFPPS_LIVE),
+ Curr: cap,
+ Advertised: cap,
+ Peer: cap,
+ CurrSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+ MaxSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+ },
+ DeviceId: dh.device.Id,
+ DevicePortNo: uint32(portNo),
+ },
+ }, nil
+}
+
+// Process_inter_adapter_message process inter adater message
+func (dh *DeviceHandler) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {
+ // TODO
+ log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
+ return nil
+}
+
diff --git a/adaptercore/olt_state_transitions.go b/adaptercore/olt_state_transitions.go
new file mode 100644
index 0000000..45e5f5e
--- /dev/null
+++ b/adaptercore/olt_state_transitions.go
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adaptercore
+
+import (
+ "reflect"
+ "runtime"
+
+ "github.com/opencord/voltha-go/common/log"
+)
+
+// DeviceState OLT Device state
+type DeviceState int
+
+const (
+ // deviceStateNull OLT is not instantiated
+ deviceStateNull DeviceState = iota
+ // deviceStateInit OLT is instantiated
+ deviceStateInit
+ // deviceStateConnected Grpc session established with OLT
+ deviceStateConnected
+ // deviceStateUp Admin state of OLT is UP
+ deviceStateUp
+ // deviceStateDown Admin state of OLT is down
+ deviceStateDown
+)
+
+// Trigger for changing the state
+type Trigger int
+
+const (
+ // DeviceInit Go to Device init state
+ DeviceInit Trigger = iota
+ // GrpcConnected Go to connected state
+ GrpcConnected
+ // DeviceUpInd Go to Device up state
+ DeviceUpInd
+ // DeviceDownInd Go to Device down state
+ DeviceDownInd
+ // GrpcDisconnected Go to Device init state
+ GrpcDisconnected
+)
+
+// TransitionHandler function type for handling transition
+type TransitionHandler func() error
+
+// Transition to store state machine
+type Transition struct {
+ previousState []DeviceState
+ currentState DeviceState
+ before []TransitionHandler
+ after []TransitionHandler
+}
+
+// TransitionMap to store all the states and current device state
+type TransitionMap struct {
+ transitions map[Trigger]Transition
+ currentDeviceState DeviceState
+}
+
+// OpenoltDevice state machine:
+//
+// null ----> init ------> connected -----> up -----> down
+// ^ ^ | ^ | |
+// | | | | | |
+// | +-------------+ +---------+ |
+// | |
+// +-----------------------------------------+
+
+// NewTransitionMap create a new state machine with all the transitions
+func NewTransitionMap(dh *DeviceHandler) *TransitionMap {
+ var transitionMap TransitionMap
+ transitionMap.currentDeviceState = deviceStateNull
+ transitionMap.transitions = make(map[Trigger]Transition)
+ // In doInit establish the grpc session
+ transitionMap.transitions[DeviceInit] =
+ Transition{
+ previousState: []DeviceState{deviceStateNull},
+ currentState: deviceStateInit,
+ before: []TransitionHandler{dh.doStateInit},
+ after: []TransitionHandler{dh.postInit}}
+ // If gRpc session fails, re-establish the grpc session
+ transitionMap.transitions[GrpcDisconnected] =
+ Transition{
+ previousState: []DeviceState{deviceStateConnected, deviceStateDown},
+ currentState: deviceStateInit,
+ before: []TransitionHandler{dh.doStateInit},
+ after: []TransitionHandler{dh.postInit}}
+ // in doConnected, create logical device and read the indications
+ transitionMap.transitions[GrpcConnected] =
+ Transition{
+ previousState: []DeviceState{deviceStateInit},
+ currentState: deviceStateConnected,
+ before: []TransitionHandler{dh.doStateConnected}}
+
+ // Once the olt UP is indication received, then do state up
+ transitionMap.transitions[DeviceUpInd] =
+ Transition{
+ previousState: []DeviceState{deviceStateConnected, deviceStateDown},
+ currentState: deviceStateUp,
+ before: []TransitionHandler{dh.doStateUp}}
+ // If olt DOWN indication comes then do sate down
+ transitionMap.transitions[DeviceDownInd] =
+ Transition{
+ previousState: []DeviceState{deviceStateUp},
+ currentState: deviceStateDown,
+ before: []TransitionHandler{dh.doStateDown}}
+
+ return &transitionMap
+}
+
+// funcName gets the handler function name
+func funcName(f interface{}) string {
+ p := reflect.ValueOf(f).Pointer()
+ rf := runtime.FuncForPC(p)
+ return rf.Name()
+}
+
+// isValidTransition checks for the new state transition is valid from current state
+func (tMap *TransitionMap) isValidTransition(trigger Trigger) bool {
+ // Validate the state transition
+ for _, state := range tMap.transitions[trigger].previousState {
+ if tMap.currentDeviceState == state {
+ return true
+ }
+ }
+ return false
+}
+
+// Handle moves the state machine to next state based on the trigger and invokes the before and
+// after handlers
+func (tMap *TransitionMap) Handle(trigger Trigger) {
+
+ // Check whether the transtion is valid from current state
+ if !tMap.isValidTransition(trigger) {
+ log.Errorw("Invalid transition triggered ", log.Fields{"CurrentState": tMap.currentDeviceState, "Trigger": trigger})
+ return
+ }
+
+ // Invoke the before handlers
+ beforeHandlers := tMap.transitions[trigger].before
+ if beforeHandlers != nil {
+ for _, handler := range beforeHandlers {
+ log.Debugw("running-before-handler", log.Fields{"handler": funcName(handler)})
+ if err := handler(); err != nil {
+ // TODO handle error
+ return
+ }
+ }
+ } else {
+ log.Debugw("No handlers for before", log.Fields{"trigger": trigger})
+ }
+
+ // Update the state
+ tMap.currentDeviceState = tMap.transitions[trigger].currentState
+ log.Debugw("Updated device state ", log.Fields{"CurrentDeviceState": tMap.currentDeviceState})
+
+ // Invoke the after handlers
+ afterHandlers := tMap.transitions[trigger].after
+ if afterHandlers != nil {
+ for _, handler := range afterHandlers {
+ log.Debugw("running-after-handler", log.Fields{"handler": funcName(handler)})
+ if err := handler(); err != nil {
+ // TODO handle error
+ return
+ }
+ }
+ } else {
+ log.Debugw("No handlers for after", log.Fields{"trigger": trigger})
+ }
+}
diff --git a/adaptercore/openolt.go b/adaptercore/openolt.go
new file mode 100644
index 0000000..18ef08f
--- /dev/null
+++ b/adaptercore/openolt.go
@@ -0,0 +1,245 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adaptercore
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+
+ com "github.com/opencord/voltha-go/adapters/common"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/kafka"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ "github.com/opencord/voltha-go/protos/openflow_13"
+ "github.com/opencord/voltha-go/protos/voltha"
+)
+
+type OpenOLT struct {
+ deviceHandlers map[string]*DeviceHandler
+ coreProxy *com.CoreProxy
+ kafkaICProxy *kafka.InterContainerProxy
+ numOnus int
+ exitChannel chan int
+ lockDeviceHandlersMap sync.RWMutex
+}
+
+func NewOpenOLT(ctx context.Context, kafkaICProxy *kafka.InterContainerProxy, coreProxy *com.CoreProxy, onuNumber int) *OpenOLT {
+ var openOLT OpenOLT
+ openOLT.exitChannel = make(chan int, 1)
+ openOLT.deviceHandlers = make(map[string]*DeviceHandler)
+ openOLT.kafkaICProxy = kafkaICProxy
+ openOLT.numOnus = onuNumber
+ openOLT.coreProxy = coreProxy
+ openOLT.lockDeviceHandlersMap = sync.RWMutex{}
+ return &openOLT
+}
+
+func (oo *OpenOLT) Start(ctx context.Context) error {
+ log.Info("starting-device-manager")
+ log.Info("device-manager-started")
+ return nil
+}
+
+func (oo *OpenOLT) Stop(ctx context.Context) error {
+ log.Info("stopping-device-manager")
+ oo.exitChannel <- 1
+ log.Info("device-manager-stopped")
+ return nil
+}
+
+func sendResponse(ctx context.Context, ch chan interface{}, result interface{}) {
+ if ctx.Err() == nil {
+ // Returned response only of the ctx has not been cancelled/timeout/etc
+ // Channel is automatically closed when a context is Done
+ ch <- result
+ log.Debugw("sendResponse", log.Fields{"result": result})
+ } else {
+ // Should the transaction be reverted back?
+ log.Debugw("sendResponse-context-error", log.Fields{"context-error": ctx.Err()})
+ }
+}
+
+func (oo *OpenOLT) addDeviceHandlerToMap(agent *DeviceHandler) {
+ oo.lockDeviceHandlersMap.Lock()
+ defer oo.lockDeviceHandlersMap.Unlock()
+ if _, exist := oo.deviceHandlers[agent.deviceId]; !exist {
+ oo.deviceHandlers[agent.deviceId] = agent
+ }
+}
+
+func (oo *OpenOLT) deleteDeviceHandlerToMap(agent *DeviceHandler) {
+ oo.lockDeviceHandlersMap.Lock()
+ defer oo.lockDeviceHandlersMap.Unlock()
+ delete(oo.deviceHandlers, agent.deviceId)
+}
+
+func (oo *OpenOLT) getDeviceHandler(deviceId string) *DeviceHandler {
+ oo.lockDeviceHandlersMap.Lock()
+ defer oo.lockDeviceHandlersMap.Unlock()
+ if agent, ok := oo.deviceHandlers[deviceId]; ok {
+ return agent
+ }
+ return nil
+}
+
+func (oo *OpenOLT) createDeviceTopic(device *voltha.Device) error {
+ log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})
+ deviceTopic := kafka.Topic{Name: oo.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}
+ // TODO for the offset
+ if err := oo.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic, 0); err != nil {
+ log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})
+ return err
+ }
+ return nil
+}
+
+func (oo *OpenOLT) Adopt_device(device *voltha.Device) error {
+ if device == nil {
+ log.Warn("device-is-nil")
+ return errors.New("nil-device")
+ }
+ log.Infow("adopt-device", log.Fields{"deviceId": device.Id})
+ var handler *DeviceHandler
+ if handler = oo.getDeviceHandler(device.Id); handler == nil {
+ handler := NewDeviceHandler(oo.coreProxy, device, oo)
+ oo.addDeviceHandlerToMap(handler)
+ go handler.AdoptDevice(device)
+ // Launch the creation of the device topic
+ go oo.createDeviceTopic(device)
+ }
+ return nil
+}
+
+func (oo *OpenOLT) Get_ofp_device_info(device *voltha.Device) (*ic.SwitchCapability, error) {
+ log.Infow("Get_ofp_device_info", log.Fields{"deviceId": device.Id})
+ if handler := oo.getDeviceHandler(device.Id); handler != nil {
+ return handler.GetOfpDeviceInfo(device)
+ }
+ log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
+ return nil, errors.New("device-handler-not-set")
+}
+
+func (oo *OpenOLT) Get_ofp_port_info(device *voltha.Device, port_no int64) (*ic.PortCapability, error) {
+ log.Infow("Get_ofp_port_info", log.Fields{"deviceId": device.Id})
+ if handler := oo.getDeviceHandler(device.Id); handler != nil {
+ return handler.GetOfpPortInfo(device, port_no)
+ }
+ log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
+ return nil, errors.New("device-handler-not-set")
+}
+
+func (oo *OpenOLT) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {
+ log.Infow("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
+ targetDevice := msg.Header.ProxyDeviceId // Request?
+ if targetDevice == "" && msg.Header.ToDeviceId != "" {
+ // Typical response
+ targetDevice = msg.Header.ToDeviceId
+ }
+ if handler := oo.getDeviceHandler(targetDevice); handler != nil {
+ return handler.Process_inter_adapter_message(msg)
+ }
+ return errors.New(fmt.Sprintf("handler-not-found-%s", targetDevice))
+}
+
+func (oo *OpenOLT) Adapter_descriptor() error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Device_types() (*voltha.DeviceTypes, error) {
+ return nil, errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Health() (*voltha.HealthStatus, error) {
+ return nil, errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Reconcile_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Abandon_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Disable_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Reenable_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Reboot_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Self_test_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Gelete_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Get_device_details(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Receive_packet_out(device *voltha.Device, egress_port_no int, msg openflow_13.PacketOut) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Suppress_alarm(filter *voltha.AlarmFilter) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Unsuppress_alarm(filter *voltha.AlarmFilter) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Download_image(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
+}
diff --git a/compose/adapters-openolt-go.yml b/compose/adapters-openolt-go.yml
new file mode 100644
index 0000000..a3475d9
--- /dev/null
+++ b/compose/adapters-openolt-go.yml
@@ -0,0 +1,42 @@
+---
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2'
+services:
+ adapter_openolt:
+ image: "${REGISTRY}${REPOSITORY}voltha-openolt-adapter-go${TAG}"
+ logging:
+ driver: "json-file"
+ options:
+ max-size: "10m"
+ max-file: "3"
+ command: [
+ "/app/openolt",
+ "--kafka_adapter_host=${DOCKER_HOST_IP}",
+ "--kafka_adapter_port=9092",
+ "--kafka_cluster_host=${DOCKER_HOST_IP}",
+ "--kafka_cluster_port=9092",
+ "--core_topic=rwcore",
+ "--kv_store_host=${DOCKER_HOST_IP}",
+ "--kv_store_port=2379"
+ ]
+ ports:
+ - "50062:50062"
+ networks:
+ - default
+
+networks:
+ default:
+ driver: bridge
diff --git a/config/config.go b/config/config.go
new file mode 100644
index 0000000..4f30e65
--- /dev/null
+++ b/config/config.go
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package config
+
+import (
+ "flag"
+ "fmt"
+ "github.com/opencord/voltha-go/common/log"
+ "os"
+)
+
+// Open OLT default constants
+const (
+ EtcdStoreName = "etcd"
+ default_InstanceID = "openOlt001"
+ default_KafkaAdapterHost = "127.0.0.1"
+ default_KafkaAdapterPort = 9092
+ default_KafkaClusterHost = "127.0.0.1"
+ default_KafkaClusterPort = 9094
+ default_KVStoreType = EtcdStoreName
+ default_KVStoreTimeout = 5 //in seconds
+ default_KVStoreHost = "127.0.0.1"
+ default_KVStorePort = 2379 // Consul = 8500; Etcd = 2379
+ default_LogLevel = 0
+ default_Banner = false
+ default_Topic = "openolt"
+ default_CoreTopic = "rwcore"
+ default_OnuNumber = 1
+)
+
+// AdapterFlags represents the set of configurations used by the read-write adaptercore service
+type AdapterFlags struct {
+ // Command line parameters
+ InstanceID string
+ KafkaAdapterHost string
+ KafkaAdapterPort int
+ KafkaClusterHost string
+ KafkaClusterPort int
+ KVStoreType string
+ KVStoreTimeout int // in seconds
+ KVStoreHost string
+ KVStorePort int
+ Topic string
+ CoreTopic string
+ LogLevel int
+ OnuNumber int
+ Banner bool
+}
+
+func init() {
+ log.AddPackage(log.JSON, log.WarnLevel, nil)
+}
+
+// NewRWCoreFlags returns a new RWCore config
+func NewAdapterFlags() *AdapterFlags {
+ var adapterFlags = AdapterFlags{ // Default values
+ InstanceID: default_InstanceID,
+ KafkaAdapterHost: default_KafkaAdapterHost,
+ KafkaAdapterPort: default_KafkaAdapterPort,
+ KafkaClusterHost: default_KafkaClusterHost,
+ KafkaClusterPort: default_KafkaClusterPort,
+ KVStoreType: default_KVStoreType,
+ KVStoreTimeout: default_KVStoreTimeout,
+ KVStoreHost: default_KVStoreHost,
+ KVStorePort: default_KVStorePort,
+ Topic: default_Topic,
+ CoreTopic: default_CoreTopic,
+ LogLevel: default_LogLevel,
+ OnuNumber: default_OnuNumber,
+ Banner: default_Banner,
+ }
+ return &adapterFlags
+}
+
+// ParseCommandArguments parses the arguments when running read-write adaptercore service
+func (so *AdapterFlags) ParseCommandArguments() {
+
+ var help string
+
+ help = fmt.Sprintf("Kafka - Adapter messaging host")
+ flag.StringVar(&(so.KafkaAdapterHost), "kafka_adapter_host", default_KafkaAdapterHost, help)
+
+ help = fmt.Sprintf("Kafka - Adapter messaging port")
+ flag.IntVar(&(so.KafkaAdapterPort), "kafka_adapter_port", default_KafkaAdapterPort, help)
+
+ help = fmt.Sprintf("Kafka - Cluster messaging host")
+ flag.StringVar(&(so.KafkaClusterHost), "kafka_cluster_host", default_KafkaClusterHost, help)
+
+ help = fmt.Sprintf("Kafka - Cluster messaging port")
+ flag.IntVar(&(so.KafkaClusterPort), "kafka_cluster_port", default_KafkaClusterPort, help)
+
+ help = fmt.Sprintf("Open OLT topic")
+ flag.StringVar(&(so.Topic), "adapter_topic", default_Topic, help)
+
+ help = fmt.Sprintf("Core topic")
+ flag.StringVar(&(so.CoreTopic), "core_topic", default_CoreTopic, help)
+
+ help = fmt.Sprintf("KV store type")
+ flag.StringVar(&(so.KVStoreType), "kv_store_type", default_KVStoreType, help)
+
+ help = fmt.Sprintf("The default timeout when making a kv store request")
+ flag.IntVar(&(so.KVStoreTimeout), "kv_store_request_timeout", default_KVStoreTimeout, help)
+
+ help = fmt.Sprintf("KV store host")
+ flag.StringVar(&(so.KVStoreHost), "kv_store_host", default_KVStoreHost, help)
+
+ help = fmt.Sprintf("KV store port")
+ flag.IntVar(&(so.KVStorePort), "kv_store_port", default_KVStorePort, help)
+
+ help = fmt.Sprintf("Log level")
+ flag.IntVar(&(so.LogLevel), "log_level", default_LogLevel, help)
+
+ help = fmt.Sprintf("Number of ONUs")
+ flag.IntVar(&(so.OnuNumber), "onu_number", default_OnuNumber, help)
+
+ help = fmt.Sprintf("Show startup banner log lines")
+ flag.BoolVar(&so.Banner, "banner", default_Banner, help)
+
+ flag.Parse()
+
+ containerName := getContainerInfo()
+ if len(containerName) > 0 {
+ so.InstanceID = containerName
+ }
+
+}
+
+func getContainerInfo() string {
+ return os.Getenv("HOSTNAME")
+}
diff --git a/docker/Dockerfile.openolt b/docker/Dockerfile.openolt
new file mode 100644
index 0000000..c975591
--- /dev/null
+++ b/docker/Dockerfile.openolt
@@ -0,0 +1,58 @@
+# -------------
+# Build stage
+
+FROM golang:1.10.7-alpine AS build-env
+
+# Install required packages
+RUN apk add --no-cache wget git make build-base protobuf protobuf-dev
+
+# Install protobuf requirements
+RUN git clone https://github.com/googleapis/googleapis.git /usr/local/include/googleapis
+RUN go get google.golang.org/genproto/googleapis/api/annotations
+
+# Prepare directory structure
+RUN ["mkdir", "-p", "/src", "src/protos"]
+RUN ["mkdir", "-p", "$GOPATH/src", "$GOPATH/pkg", "$GOPATH/bin"]
+RUN ["mkdir", "-p", "$GOPATH/src/github.com/opencord"]
+RUN ["mkdir", "-p", "$GOPATH/src/github.com/opencord/voltha-go"]
+
+WORKDIR $GOPATH/src/github.com/opencord
+
+RUN git clone https://gerrit.opencord.org/voltha-go.git
+
+WORKDIR $GOPATH/src/github.com/opencord/voltha-go
+
+
+# Copy files
+ADD adaptercore ./adapters/openolt/adaptercore
+ADD config ./adapters/openolt/config
+ADD *.go ./adapters/openolt
+
+RUN ls ./adapters/openolt
+
+# Copy required proto files
+ADD openolt.proto ./protos
+
+# Install the protoc-gen-go
+RUN go install ./vendor/github.com/golang/protobuf/protoc-gen-go
+
+# Compile protobuf files
+RUN sh protos/scripts/build_protos.sh protos
+
+RUN protoc --go_out=Mgoogle/protobuf/descriptor.proto=github.com/golang/protobuf/protoc-gen-go/descriptor,plugins=grpc:$GOPATH/src -I protos -I /usr/local/include/googleapis protos/openolt.proto
+
+# Build openolt
+
+RUN cd adapters/openolt && go build -o /src/openolt
+
+# -------------
+# Image creation stage
+
+FROM alpine:3.8
+
+# Set the working directory
+WORKDIR /app
+
+# Copy required files
+COPY --from=build-env /src/openolt /app/
+
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..12c5ff6
--- /dev/null
+++ b/main.go
@@ -0,0 +1,340 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package main
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/opencord/voltha-go/adapters"
+ com "github.com/opencord/voltha-go/adapters/common"
+ ac "github.com/opencord/voltha-go/adapters/openolt/adaptercore"
+ "github.com/opencord/voltha-go/adapters/openolt/config"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/kvstore"
+ "github.com/opencord/voltha-go/kafka"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "os"
+ "os/signal"
+ "strconv"
+ "syscall"
+ "time"
+)
+
+type adapter struct {
+ instanceId string
+ config *config.AdapterFlags
+ iAdapter adapters.IAdapter
+ kafkaClient kafka.Client
+ kvClient kvstore.Client
+ kip *kafka.InterContainerProxy
+ coreProxy *com.CoreProxy
+ halted bool
+ exitChannel chan int
+ receiverChannels []<-chan *ic.InterContainerMessage
+}
+
+func init() {
+ log.AddPackage(log.JSON, log.DebugLevel, nil)
+}
+
+func newAdapter(cf *config.AdapterFlags) *adapter {
+ var a adapter
+ a.instanceId = cf.InstanceID
+ a.config = cf
+ a.halted = false
+ a.exitChannel = make(chan int, 1)
+ a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
+ return &a
+}
+
+func (a *adapter) start(ctx context.Context) {
+ log.Info("Starting Core Adapter components")
+ var err error
+
+ // Setup KV Client
+ log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
+ if err := a.setKVClient(); err != nil {
+ log.Fatal("error-setting-kv-client")
+ }
+
+ // Setup Kafka Client
+ if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
+ log.Fatal("Unsupported-common-client")
+ }
+
+ // Start the common InterContainer Proxy - retries indefinitely
+ if a.kip, err = a.startInterContainerProxy(-1); err != nil {
+ log.Fatal("error-starting-inter-container-proxy")
+ }
+
+ // Create the core proxy to handle requests to the Core
+ a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
+
+ // Create the open OLT adapter
+ if a.iAdapter, err = a.startOpenOLT(ctx, a.kip, a.coreProxy, a.config.OnuNumber); err != nil {
+ log.Fatal("error-starting-inter-container-proxy")
+ }
+
+ // Register the core request handler
+ if err = a.setupRequestHandler(a.instanceId, a.iAdapter); err != nil {
+ log.Fatal("error-setting-core-request-handler")
+ }
+
+ // Register this adapter to the Core - retries indefinitely
+ if err = a.registerWithCore(-1); err != nil {
+ log.Fatal("error-registering-with-core")
+ }
+}
+
+func (rw *adapter) stop() {
+ // Stop leadership tracking
+ rw.halted = true
+
+ // send exit signal
+ rw.exitChannel <- 0
+
+ // Cleanup - applies only if we had a kvClient
+ if rw.kvClient != nil {
+ // Release all reservations
+ if err := rw.kvClient.ReleaseAllReservations(); err != nil {
+ log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
+ }
+ // Close the DB connection
+ rw.kvClient.Close()
+ }
+
+ // TODO: More cleanup
+}
+
+func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
+
+ log.Infow("kv-store-type", log.Fields{"store": storeType})
+ switch storeType {
+ case "consul":
+ return kvstore.NewConsulClient(address, timeout)
+ case "etcd":
+ return kvstore.NewEtcdClient(address, timeout)
+ }
+ return nil, errors.New("unsupported-kv-store")
+}
+
+func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {
+
+ log.Infow("common-client-type", log.Fields{"client": clientType})
+ switch clientType {
+ case "sarama":
+ return kafka.NewSaramaClient(
+ kafka.Host(host),
+ kafka.Port(port),
+ kafka.ProducerReturnOnErrors(true),
+ kafka.ProducerReturnOnSuccess(true),
+ kafka.ProducerMaxRetries(6),
+ kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
+ }
+ return nil, errors.New("unsupported-client-type")
+}
+
+func (a *adapter) setKVClient() error {
+ addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
+ client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)
+ if err != nil {
+ a.kvClient = nil
+ log.Error(err)
+ return err
+ }
+ a.kvClient = client
+ return nil
+}
+
+func toString(value interface{}) (string, error) {
+ switch t := value.(type) {
+ case []byte:
+ return string(value.([]byte)), nil
+ case string:
+ return value.(string), nil
+ default:
+ return "", fmt.Errorf("unexpected-type-%T", t)
+ }
+}
+
+func (a *adapter) startInterContainerProxy(retries int) (*kafka.InterContainerProxy, error) {
+ log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
+ "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
+ var err error
+ var kip *kafka.InterContainerProxy
+ if kip, err = kafka.NewInterContainerProxy(
+ kafka.InterContainerHost(a.config.KafkaAdapterHost),
+ kafka.InterContainerPort(a.config.KafkaAdapterPort),
+ kafka.MsgClient(a.kafkaClient),
+ kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic})); err != nil {
+ log.Errorw("fail-to-create-common-proxy", log.Fields{"error": err})
+ return nil, err
+ }
+ count := 0
+ for {
+ if err = kip.Start(); err != nil {
+ log.Warnw("error-starting-messaging-proxy", log.Fields{"error": err})
+ if retries == count {
+ return nil, err
+ }
+ count = +1
+ // Take a nap before retrying
+ time.Sleep(2 * time.Second)
+ } else {
+ break
+ }
+ }
+
+ log.Info("common-messaging-proxy-created")
+ return kip, nil
+}
+
+func (a *adapter) startOpenOLT(ctx context.Context, kip *kafka.InterContainerProxy, cp *com.CoreProxy, onuNumber int) (*ac.OpenOLT, error) {
+ log.Info("starting-open-olt")
+ var err error
+ sOLT := ac.NewOpenOLT(ctx, a.kip, cp, onuNumber)
+
+ if err = sOLT.Start(ctx); err != nil {
+ log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
+ return nil, err
+ }
+
+ log.Info("open-olt-started")
+ return sOLT, nil
+}
+
+func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter) error {
+ log.Info("setting-request-handler")
+ requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter, a.coreProxy)
+ if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
+ log.Errorw("request-handler-setup-failed", log.Fields{"error": err})
+ return err
+
+ }
+ log.Info("request-handler-setup-done")
+ return nil
+}
+
+func (a *adapter) registerWithCore(retries int) error {
+ log.Info("registering-with-core")
+ adapterDescription := &voltha.Adapter{Id: "openolt", Vendor: "simulation Enterprise Inc"}
+ types := []*voltha.DeviceType{{Id: "openolt", Adapter: "openolt"}}
+ deviceTypes := &voltha.DeviceTypes{Items: types}
+ count := 0
+ for {
+ if err := a.coreProxy.RegisterAdapter(nil, adapterDescription, deviceTypes); err != nil {
+ log.Warnw("registering-with-core-failed", log.Fields{"error": err})
+ if retries == count {
+ return err
+ }
+ count += 1
+ // Take a nap before retrying
+ time.Sleep(2 * time.Second)
+ } else {
+ break
+ }
+ }
+ log.Info("registered-with-core")
+ return nil
+}
+
+func waitForExit() int {
+ signalChannel := make(chan os.Signal, 1)
+ signal.Notify(signalChannel,
+ syscall.SIGHUP,
+ syscall.SIGINT,
+ syscall.SIGTERM,
+ syscall.SIGQUIT)
+
+ exitChannel := make(chan int)
+
+ go func() {
+ s := <-signalChannel
+ switch s {
+ case syscall.SIGHUP,
+ syscall.SIGINT,
+ syscall.SIGTERM,
+ syscall.SIGQUIT:
+ log.Infow("closing-signal-received", log.Fields{"signal": s})
+ exitChannel <- 0
+ default:
+ log.Infow("unexpected-signal-received", log.Fields{"signal": s})
+ exitChannel <- 1
+ }
+ }()
+
+ code := <-exitChannel
+ return code
+}
+
+func printBanner() {
+ fmt.Println(" ____ ____ _ _______ ")
+ fmt.Println(" / _ \\ / __\\| | |__ __|")
+ fmt.Println(" | | | |_ __ ___ _ __ | | | | | | | ")
+ fmt.Println(" | | | | '_\\ / _\\ '_\\ | | | | | | | ")
+ fmt.Println(" | |__| | |_) | __/ | | || |__| | |____| | ")
+ fmt.Println(" \\____/| .__/\\___|_| |_|\\____/|______|_| ")
+ fmt.Println(" | | ")
+ fmt.Println(" |_| ")
+ fmt.Println(" ")
+}
+
+func main() {
+ start := time.Now()
+
+ cf := config.NewAdapterFlags()
+ cf.ParseCommandArguments()
+
+ //// Setup logging
+
+ //Setup default logger - applies for packages that do not have specific logger set
+ if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
+
+ // Update all loggers (provisionned via init) with a common field
+ if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
+
+ log.SetPackageLogLevel("github.com/opencord/voltha-go/adapters/common", log.DebugLevel)
+
+ defer log.CleanUp()
+
+ // Print banner if specified
+ if cf.Banner {
+ printBanner()
+ }
+
+ log.Infow("config", log.Fields{"config": *cf})
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ ad := newAdapter(cf)
+ go ad.start(ctx)
+
+ code := waitForExit()
+ log.Infow("received-a-closing-signal", log.Fields{"code": code})
+
+ // Cleanup before leaving
+ ad.stop()
+
+ elapsed := time.Since(start)
+ log.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
+}
diff --git a/openolt.proto b/openolt.proto
new file mode 100644
index 0000000..709105b
--- /dev/null
+++ b/openolt.proto
@@ -0,0 +1,560 @@
+// Copyright (c) 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+option go_package = "github.com/opencord/voltha-go/protos/openolt";
+
+package openolt;
+
+import "google/api/annotations.proto";
+
+service Openolt {
+
+ rpc DisableOlt(Empty) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/Disable"
+ body: "*"
+ };
+ }
+
+ rpc ReenableOlt(Empty) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/Reenable"
+ body: "*"
+ };
+ }
+
+ rpc ActivateOnu(Onu) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/EnableOnu"
+ body: "*"
+ };
+ }
+
+ rpc DeactivateOnu(Onu) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/DisableOnu"
+ body: "*"
+ };
+ }
+
+ rpc DeleteOnu(Onu) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/DeleteOnu"
+ body: "*"
+ };
+ }
+
+ rpc OmciMsgOut(OmciMsg) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/OmciMsgOut"
+ body: "*"
+ };
+ }
+
+ rpc OnuPacketOut(OnuPacket) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/OnuPacketOut"
+ body: "*"
+ };
+ }
+
+ rpc UplinkPacketOut(UplinkPacket) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/UplinkPacketOut"
+ body: "*"
+ };
+ }
+
+ rpc FlowAdd(Flow) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/FlowAdd"
+ body: "*"
+ };
+ }
+
+ rpc FlowRemove(Flow) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/FlowRemove"
+ body: "*"
+ };
+ }
+
+ rpc HeartbeatCheck(Empty) returns (Heartbeat) {
+ option (google.api.http) = {
+ post: "/v1/HeartbeatCheck"
+ body: "*"
+ };
+ }
+
+ rpc EnablePonIf(Interface) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/EnablePonIf"
+ body: "*"
+ };
+ }
+
+ rpc DisablePonIf(Interface) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/DisablePonIf"
+ body: "*"
+ };
+ }
+
+ rpc GetDeviceInfo(Empty) returns (DeviceInfo) {
+ option (google.api.http) = {
+ post: "/v1/GetDeviceInfo"
+ body: "*"
+ };
+ }
+
+ rpc Reboot(Empty) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/Reboot"
+ body: "*"
+ };
+ }
+
+ rpc CollectStatistics(Empty) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/CollectStatistics"
+ body: "*"
+ };
+ }
+
+ rpc CreateTconts(Tconts) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/CreateTconts"
+ body: "*"
+ };
+ }
+
+ rpc RemoveTconts(Tconts) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/RemoveTconts"
+ body: "*"
+ };
+ }
+
+ rpc EnableIndication(Empty) returns (stream Indication) {}
+}
+
+message Indication {
+ oneof data {
+ OltIndication olt_ind = 1;
+ IntfIndication intf_ind = 2;
+ IntfOperIndication intf_oper_ind = 3;
+ OnuDiscIndication onu_disc_ind = 4;
+ OnuIndication onu_ind = 5;
+ OmciIndication omci_ind = 6;
+ PacketIndication pkt_ind = 7;
+ PortStatistics port_stats = 8;
+ FlowStatistics flow_stats = 9;
+ AlarmIndication alarm_ind= 10;
+ }
+}
+
+message AlarmIndication {
+ oneof data {
+ LosIndication los_ind = 1;
+ DyingGaspIndication dying_gasp_ind = 2;
+ OnuAlarmIndication onu_alarm_ind = 3;
+ OnuStartupFailureIndication onu_startup_fail_ind = 4;
+ OnuSignalDegradeIndication onu_signal_degrade_ind = 5;
+ OnuDriftOfWindowIndication onu_drift_of_window_ind = 6;
+ OnuLossOfOmciChannelIndication onu_loss_omci_ind = 7;
+ OnuSignalsFailureIndication onu_signals_fail_ind = 8;
+ OnuTransmissionInterferenceWarning onu_tiwi_ind = 9;
+ OnuActivationFailureIndication onu_activation_fail_ind = 10;
+ OnuProcessingErrorIndication onu_processing_error_ind = 11;
+ }
+}
+
+message OltIndication {
+ string oper_state = 1; // up, down
+}
+
+message IntfIndication {
+ fixed32 intf_id = 1;
+ string oper_state = 2; // up, down
+}
+
+message OnuDiscIndication {
+ fixed32 intf_id = 1;
+ SerialNumber serial_number = 2;
+}
+
+message OnuIndication {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+ string oper_state = 3; // up, down
+ string admin_state = 5; // up, down
+ SerialNumber serial_number = 4;
+}
+
+message IntfOperIndication {
+ string type = 1; // nni, pon
+ fixed32 intf_id = 2;
+ string oper_state = 3; // up, down
+}
+
+message OmciIndication {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+ bytes pkt = 3;
+}
+
+message PacketIndication {
+ string intf_type = 5; // nni, pon, unknown
+ fixed32 intf_id = 1;
+ fixed32 gemport_id = 2;
+ fixed32 flow_id = 3;
+ fixed32 port_no = 6;
+ fixed64 cookie = 7;
+ bytes pkt = 4;
+}
+
+message Interface {
+ fixed32 intf_id = 1;
+}
+
+message Heartbeat {
+ fixed32 heartbeat_signature = 1;
+}
+
+message Onu {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+ SerialNumber serial_number = 3;
+ fixed32 pir = 4; // peak information rate assigned to onu
+}
+
+message OmciMsg {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+ bytes pkt = 3;
+}
+
+message OnuPacket {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+ fixed32 port_no = 4;
+ bytes pkt = 3;
+}
+
+message UplinkPacket {
+ fixed32 intf_id = 1;
+ bytes pkt = 2;
+}
+
+message DeviceInfo {
+ string vendor = 1;
+ string model = 2;
+ string hardware_version = 3;
+ string firmware_version = 4;
+ string device_id = 16;
+ string device_serial_number = 17;
+
+ // Total number of pon intf ports on the device
+ fixed32 pon_ports = 12;
+
+ // If using global per-device technology profile. To be deprecated
+ string technology = 5;
+ fixed32 onu_id_start = 6;
+ fixed32 onu_id_end = 7;
+ fixed32 alloc_id_start = 8;
+ fixed32 alloc_id_end = 9;
+ fixed32 gemport_id_start = 10;
+ fixed32 gemport_id_end = 11;
+ fixed32 flow_id_start = 13;
+ fixed32 flow_id_end = 14;
+
+ message DeviceResourceRanges {
+
+ // List of 0 or more intf_ids that use the same technology and pools.
+ // If 0 intf_ids supplied, it implies ALL interfaces
+ repeated fixed32 intf_ids = 1;
+
+ // Technology profile for this pool
+ string technology = 2;
+
+ message Pool {
+ enum PoolType {
+ ONU_ID = 0;
+ ALLOC_ID = 1;
+ GEMPORT_ID = 2;
+ FLOW_ID = 3;
+ }
+
+ enum SharingType {
+ DEDICATED_PER_INTF = 0;
+ SHARED_BY_ALL_INTF_ALL_TECH = 1; // Shared across all interfaces in all technologies in all ranges
+ SHARED_BY_ALL_INTF_SAME_TECH = 2; // Shared across all interfaces of the same technology used in this range
+ }
+
+ PoolType type = 1;
+ SharingType sharing = 2;
+ fixed32 start = 3; // lower bound on IDs allocated from this pool
+ fixed32 end = 4; // upper bound on IDs allocated from this pool
+ }
+ repeated Pool pools = 3;
+ }
+ repeated DeviceResourceRanges ranges = 15;
+}
+
+message Classifier {
+ fixed32 o_tpid = 1;
+ fixed32 o_vid = 2;
+ fixed32 i_tpid = 3;
+ fixed32 i_vid = 4;
+ fixed32 o_pbits = 5;
+ fixed32 i_pbits = 6;
+ fixed32 eth_type = 7;
+ bytes dst_mac = 8;
+ bytes src_mac = 9;
+ fixed32 ip_proto = 10;
+ fixed32 dst_ip = 11;
+ fixed32 src_ip = 12;
+ fixed32 src_port = 13;
+ fixed32 dst_port = 14;
+ string pkt_tag_type = 15; // untagged, single_tag, double_tag
+}
+
+message ActionCmd {
+ bool add_outer_tag = 1;
+ bool remove_outer_tag = 2;
+ bool trap_to_host = 3;
+}
+
+message Action {
+ ActionCmd cmd = 1;
+ fixed32 o_vid = 2;
+ fixed32 o_pbits = 3;
+ fixed32 o_tpid = 4;
+ fixed32 i_vid = 5;
+ fixed32 i_pbits = 6;
+ fixed32 i_tpid = 7;
+}
+
+message Flow {
+ sfixed32 access_intf_id = 1;
+ sfixed32 onu_id = 2;
+ sfixed32 uni_id = 11;
+ fixed32 flow_id = 3;
+ string flow_type = 4; // upstream, downstream, broadcast, multicast
+ sfixed32 alloc_id = 10;
+ sfixed32 network_intf_id = 5;
+ sfixed32 gemport_id = 6;
+ Classifier classifier = 7;
+ Action action = 8;
+ sfixed32 priority = 9;
+ fixed64 cookie = 12; // must be provided for any flow with trap_to_host action. Returned in PacketIndication
+ fixed32 port_no = 13; // must be provided for any flow with trap_to_host action. Returned in PacketIndication
+}
+
+message SerialNumber {
+ bytes vendor_id = 1;
+ bytes vendor_specific = 2;
+}
+
+message PortStatistics {
+ fixed32 intf_id = 1;
+ fixed64 rx_bytes = 2;
+ fixed64 rx_packets = 3;
+ fixed64 rx_ucast_packets = 4;
+ fixed64 rx_mcast_packets = 5;
+ fixed64 rx_bcast_packets = 6;
+ fixed64 rx_error_packets = 7;
+ fixed64 tx_bytes = 8;
+ fixed64 tx_packets = 9;
+ fixed64 tx_ucast_packets = 10;
+ fixed64 tx_mcast_packets = 11;
+ fixed64 tx_bcast_packets = 12;
+ fixed64 tx_error_packets = 13;
+ fixed64 rx_crc_errors = 14;
+ fixed64 bip_errors = 15;
+ fixed32 timestamp = 16;
+}
+
+message FlowStatistics {
+ fixed32 flow_id = 1;
+ fixed64 rx_bytes = 2;
+ fixed64 rx_packets = 3;
+ fixed64 tx_bytes = 8;
+ fixed64 tx_packets = 9;
+ fixed32 timestamp = 16;
+}
+
+message LosIndication {
+ fixed32 intf_id = 1;
+ string status = 2;
+}
+
+message DyingGaspIndication {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+ string status = 3;
+}
+
+message OnuAlarmIndication {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+ string los_status = 3;
+ string lob_status = 4;
+ string lopc_miss_status = 5;
+ string lopc_mic_error_status = 6;
+}
+
+message OnuStartupFailureIndication {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+ string status = 3;
+}
+
+message OnuSignalDegradeIndication {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+ string status = 3;
+ fixed32 inverse_bit_error_rate = 4;
+}
+
+message OnuDriftOfWindowIndication {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+ string status = 3;
+ fixed32 drift = 4;
+ fixed32 new_eqd = 5;
+}
+
+message OnuLossOfOmciChannelIndication {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+ string status = 3;
+}
+
+message OnuSignalsFailureIndication {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+ string status = 3;
+ fixed32 inverse_bit_error_rate = 4;
+}
+
+message OnuTransmissionInterferenceWarning {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+ string status = 3;
+ fixed32 drift = 4;
+}
+
+message OnuActivationFailureIndication {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+}
+
+message OnuProcessingErrorIndication {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+}
+
+enum Direction {
+ UPSTREAM = 0;
+ DOWNSTREAM = 1;
+ BIDIRECTIONAL = 2;
+}
+
+enum SchedulingPolicy {
+ WRR = 0;
+ StrictPriority = 1;
+ Hybrid = 2;
+}
+
+enum AdditionalBW {
+ AdditionalBW_None = 0;
+ AdditionalBW_NA = 1;
+ AdditionalBW_BestEffort = 2;
+ AdditionalBW_Auto = 3;
+}
+
+enum DiscardPolicy {
+ TailDrop = 0;
+ WTailDrop = 1;
+ Red = 2;
+ WRed = 3;
+}
+
+enum InferredAdditionBWIndication {
+ InferredAdditionBWIndication_None = 0;
+ InferredAdditionBWIndication_Assured = 1;
+ InferredAdditionBWIndication_BestEffort = 2;
+}
+
+message Scheduler {
+ Direction direction = 1;
+ AdditionalBW additional_bw = 2; // Valid on for “direction == Upstream”.
+ fixed32 priority = 3;
+ fixed32 weight = 4;
+ SchedulingPolicy sched_policy = 5;
+}
+
+message TrafficShapingInfo {
+ fixed32 cir = 1;
+ fixed32 cbs = 2;
+ fixed32 pir = 3;
+ fixed32 pbs = 4;
+ fixed32 gir = 5; // only if “direction == Upstream ”
+ InferredAdditionBWIndication add_bw_ind = 6; // only if “direction == Upstream”
+}
+
+message Tcont {
+ Direction direction = 1;
+ fixed32 alloc_id = 2; // valid only if “direction == Upstream ”
+ Scheduler scheduler = 3;
+ TrafficShapingInfo traffic_shaping_info = 4;
+}
+
+message Tconts {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+ fixed32 uni_id = 4;
+ fixed32 port_no = 5;
+ repeated Tcont tconts = 3;
+}
+
+message TailDropDiscardConfig {
+ fixed32 queue_size = 1;
+}
+
+message RedDiscardConfig {
+ fixed32 min_threshold = 1;
+ fixed32 max_threshold = 2;
+ fixed32 max_probability = 3;
+}
+
+message WRedDiscardConfig {
+ RedDiscardConfig green = 1;
+ RedDiscardConfig yellow = 2;
+ RedDiscardConfig red = 3;
+}
+
+message DiscardConfig {
+ DiscardPolicy discard_policy = 1;
+ oneof discard_config {
+ TailDropDiscardConfig tail_drop_discard_config = 2;
+ RedDiscardConfig red_discard_config = 3;
+ WRedDiscardConfig wred_discard_config = 4;
+ }
+}
+
+message Empty {}