VOL-1374: OLT Activation with Edgecore asfvolt16

Change-Id: I61ce4b0a6a3666070d08a162251d42d90817f409
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..08fddff
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,87 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ifeq ($(TAG),)
+TAG := latest
+endif
+
+ifeq ($(TARGET_TAG),)
+TARGET_TAG := latest
+endif
+
+# If no DOCKER_HOST_IP is specified grab a v4 IP address associated with
+# the default gateway
+ifeq ($(DOCKER_HOST_IP),)
+DOCKER_HOST_IP := $(shell ifconfig $$(netstat -rn | grep -E '^(default|0.0.0.0)' | head -1 | awk '{print $$NF}') | grep inet | awk '{print $$2}' | sed -e 's/addr://g')
+endif
+
+
+ifneq ($(http_proxy)$(https_proxy),)
+# Include proxies from the environment
+DOCKER_PROXY_ARGS = \
+       --build-arg http_proxy=$(http_proxy) \
+       --build-arg https_proxy=$(https_proxy) \
+       --build-arg ftp_proxy=$(ftp_proxy) \
+       --build-arg no_proxy=$(no_proxy) \
+       --build-arg HTTP_PROXY=$(HTTP_PROXY) \
+       --build-arg HTTPS_PROXY=$(HTTPS_PROXY) \
+       --build-arg FTP_PROXY=$(FTP_PROXY) \
+       --build-arg NO_PROXY=$(NO_PROXY)
+endif
+
+DOCKER_BUILD_ARGS = \
+	--build-arg TAG=$(TAG) \
+	--build-arg REGISTRY=$(REGISTRY) \
+	--build-arg REPOSITORY=$(REPOSITORY) \
+	$(DOCKER_PROXY_ARGS) $(DOCKER_CACHE_ARG) \
+	 --rm --force-rm \
+	$(DOCKER_BUILD_EXTRA_ARGS)
+
+DOCKER_IMAGE_LIST = \
+	openolt-adapter-go
+
+
+.PHONY: $(DIRS) $(DIRS_CLEAN) $(DIRS_FLAKE8) adapters 
+
+# This should to be the first and default target in this Makefile
+help:
+	@echo "Usage: make [<target>]"
+	@echo "where available targets are:"
+	@echo
+	@echo "build        : Build the docker images.\n\
+               If this is the first time you are building, choose \"make build\" option."
+	@echo
+
+
+# Parallel Build
+$(DIRS):
+	@echo "    MK $@"
+	$(Q)$(MAKE) -C $@
+
+# Parallel Clean
+DIRS_CLEAN = $(addsuffix .clean,$(DIRS))
+$(DIRS_CLEAN):
+	@echo "    CLEAN $(basename $@)"
+	$(Q)$(MAKE) -C $(basename $@) clean
+
+build: containers
+
+containers: adapter_openolt_go
+
+adapter_openolt_go:
+	docker build $(DOCKER_BUILD_ARGS) -t ${REGISTRY}${REPOSITORY}voltha-openolt-adapter-go:${TAG} -f docker/Dockerfile.openolt .
+
+# end file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..0c3609b
--- /dev/null
+++ b/README.md
@@ -0,0 +1,32 @@
+
+# How to Build and Run a GOlang based OpenOLT Adapter 
+
+Assuming the VOLTHA2.0 environment is made using the quickstart.md in voltha-go.
+
+```
+cd ~/source/voltha-openolt-adapter
+```
+
+Get the latest code changes
+```
+git pull
+```
+To build the docker image
+```
+make build
+```
+This will create the voltha-openolt-adapter-go docker image
+```
+$ docker images | grep openolt
+voltha-openolt-adapter-go        latest              38688e697472        2 hours ago         37.3MB
+```
+In case the python voltha openolt adapter is started, stop the python voltha openolt docker container
+
+
+To start the GOlang based OpenOLT adapter 
+
+DOCKER_HOST_IP=<HOST-IP> docker-compose -f compose/adapters-openolt-go.yml up -d
+
+The functionality of OLT activation can be verified through BBSIM
+Follow the below steps to start BBSIM and provision it through VOLTHA-CLI
+https://github.com/opencord/voltha-bbsim/blob/master/README.md
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
new file mode 100644
index 0000000..07ae169
--- /dev/null
+++ b/adaptercore/device_handler.go
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adaptercore
+
+import (
+    "context"
+    "errors"
+    "io"
+    "strconv"
+    "strings"
+    "sync"
+
+    "github.com/gogo/protobuf/proto"
+    com "github.com/opencord/voltha-go/adapters/common"
+    "github.com/opencord/voltha-go/common/log"
+    "github.com/opencord/voltha-go/protos/common"
+    ic "github.com/opencord/voltha-go/protos/inter_container"
+    of "github.com/opencord/voltha-go/protos/openflow_13"
+    oop "github.com/opencord/voltha-go/protos/openolt"
+    "github.com/opencord/voltha-go/protos/voltha"
+    "google.golang.org/grpc"
+)
+
+//DeviceHandler will interact with the OLT device.
+type DeviceHandler struct {
+    deviceId      string
+    deviceType    string
+    device        *voltha.Device
+    coreProxy     *com.CoreProxy
+    openOLT       *OpenOLT
+    nniPort       *voltha.Port
+    ponPort       *voltha.Port
+    exitChannel   chan int
+    lockDevice    sync.RWMutex
+    client        oop.OpenoltClient
+    transitionMap *TransitionMap
+    clientCon     *grpc.ClientConn
+}
+
+//NewDeviceHandler creates a new device handler
+func NewDeviceHandler(cp *com.CoreProxy, device *voltha.Device, adapter *OpenOLT) *DeviceHandler {
+    var dh DeviceHandler
+    dh.coreProxy = cp
+    cloned := (proto.Clone(device)).(*voltha.Device)
+    dh.deviceId = cloned.Id
+    dh.deviceType = cloned.Type
+    dh.device = cloned
+    dh.openOLT = adapter
+    dh.exitChannel = make(chan int, 1)
+    dh.lockDevice = sync.RWMutex{}
+
+    //TODO initialize the support classes.
+    return &dh
+}
+
+// start save the device to the data model
+func (dh *DeviceHandler) start(ctx context.Context) {
+    dh.lockDevice.Lock()
+    defer dh.lockDevice.Unlock()
+    log.Debugw("starting-device-agent", log.Fields{"device": dh.device})
+    // Add the initial device to the local model
+    log.Debug("device-agent-started")
+}
+
+// stop stops the device dh.  Not much to do for now
+func (dh *DeviceHandler) stop(ctx context.Context) {
+    dh.lockDevice.Lock()
+    defer dh.lockDevice.Unlock()
+    log.Debug("stopping-device-agent")
+    dh.exitChannel <- 1
+    log.Debug("device-agent-stopped")
+}
+
+func macAddressToUint32Array(mac string) []uint32 {
+    slist := strings.Split(mac, ":")
+    result := make([]uint32, len(slist))
+    var err error
+    var tmp int64
+    for index, val := range slist {
+        if tmp, err = strconv.ParseInt(val, 16, 32); err != nil {
+            return []uint32{1, 2, 3, 4, 5, 6}
+        }
+        result[index] = uint32(tmp)
+    }
+    return result
+}
+
+func portName(portNum uint32, portType voltha.Port_PortType, intfId uint32) string {
+
+    if portType == voltha.Port_PON_OLT {
+        return "pon-" + string(portNum)
+    } else if portType == voltha.Port_ETHERNET_NNI {
+        return "nni-" + string(intfId)
+    } else if portType == voltha.Port_ETHERNET_UNI {
+        log.Errorw("local UNI management not supported", log.Fields{})
+    }
+    return ""
+}
+
+func (dh *DeviceHandler) addPort(intfId uint32, portType voltha.Port_PortType, state string) {
+    var operStatus common.OperStatus_OperStatus
+    if state == "up" {
+        operStatus = voltha.OperStatus_ACTIVE
+    } else {
+        operStatus = voltha.OperStatus_DISCOVERED
+    }
+
+    // TODO
+    //portNum := platform.intfIdToPortNo(intfId, portType)
+    portNum := intfId
+
+    label := portName(portNum, portType, intfId)
+    //    Now create the PON Port
+    ponPort := &voltha.Port{
+        PortNo:     portNum,
+        Label:      label,
+        Type:       portType,
+        OperStatus: operStatus,
+    }
+
+    // Synchronous call to update device - this method is run in its own go routine
+    if err := dh.coreProxy.PortCreated(nil, dh.device.Id, ponPort); err != nil {
+        log.Errorw("error-creating-nni-port", log.Fields{"deviceId": dh.device.Id, "error": err})
+    }
+}
+
+// readIndications to read the indications from the OLT device
+func (dh *DeviceHandler) readIndications() {
+    indications, err := dh.client.EnableIndication(context.Background(), new(oop.Empty))
+    if err != nil {
+        log.Errorw("Failed to read indications", log.Fields{"err": err})
+        return
+    }
+    if indications == nil {
+        log.Errorw("Indications is nil", log.Fields{})
+        return
+    }
+    for {
+        indication, err := indications.Recv()
+        if err == io.EOF {
+            break
+        }
+        if err != nil {
+            log.Infow("Failed to read from indications", log.Fields{"err": err})
+            continue
+        }
+        switch indication.Data.(type) {
+        case *oop.Indication_OltInd:
+            oltInd := indication.GetOltInd()
+            if oltInd.OperState == "up" {
+                dh.transitionMap.Handle(DeviceUpInd)
+            } else if oltInd.OperState == "down" {
+                dh.transitionMap.Handle(DeviceDownInd)
+            }
+        case *oop.Indication_IntfInd:
+
+            intfInd := indication.GetIntfInd()
+            go dh.addPort(intfInd.GetIntfId(), voltha.Port_PON_OLT, intfInd.GetOperState())
+            log.Infow("Received interface indication ", log.Fields{"InterfaceInd": intfInd})
+        case *oop.Indication_IntfOperInd:
+            intfOperInd := indication.GetIntfOperInd()
+            if intfOperInd.GetType() == "nni" {
+                go dh.addPort(intfOperInd.GetIntfId(), voltha.Port_ETHERNET_NNI, intfOperInd.GetOperState())
+            } else if intfOperInd.GetType() == "pon" {
+                // TODO: Check what needs to be handled here for When PON PORT down, ONU will be down
+                // Handle pon port update
+            }
+            log.Infow("Received interface oper indication ", log.Fields{"InterfaceOperInd": intfOperInd})
+        case *oop.Indication_OnuDiscInd:
+            onuDiscInd := indication.GetOnuDiscInd()
+            log.Infow("Received Onu discovery indication ", log.Fields{"OnuDiscInd": onuDiscInd})
+            // TODO Get onu ID from the resource manager
+            onuId := 0
+            go dh.coreProxy.ChildDeviceDetected(nil, dh.device.Id, int(onuDiscInd.GetIntfId()),
+                   "brcm_openomci_onu", int(onuDiscInd.GetIntfId()), string(onuDiscInd.SerialNumber.GetVendorId()), 
+                   string(onuDiscInd.SerialNumber.GetVendorSpecific()), int64(onuId))
+        case *oop.Indication_OnuInd:
+            onuInd := indication.GetOnuInd()
+            log.Infow("Received Onu indication ", log.Fields{"OnuInd": onuInd})
+        case *oop.Indication_OmciInd:
+            omciInd := indication.GetOmciInd()
+            log.Infow("Received Omci indication ", log.Fields{"OmciInd": omciInd})
+        case *oop.Indication_PktInd:
+            pktInd := indication.GetPktInd()
+            log.Infow("Received pakcet indication ", log.Fields{"PktInd": pktInd})
+        case *oop.Indication_PortStats:
+            portStats := indication.GetPortStats()
+            log.Infow("Received port stats indication", log.Fields{"PortStats": portStats})
+        case *oop.Indication_FlowStats:
+            flowStats := indication.GetFlowStats()
+            log.Infow("Received flow stats", log.Fields{"FlowStats": flowStats})
+        case *oop.Indication_AlarmInd:
+            alarmInd := indication.GetAlarmInd()
+            log.Infow("Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
+        }
+    }
+}
+
+// doStateUp handle the olt up indication and update to voltha core
+func (dh *DeviceHandler) doStateUp() error {
+    // Synchronous call to update device state - this method is run in its own go routine
+    if err := dh.coreProxy.DeviceStateUpdate(context.Background(), dh.device.Id, voltha.ConnectStatus_REACHABLE,
+        voltha.OperStatus_ACTIVE); err != nil {
+        log.Errorw("Failed to update device with OLT UP indication", log.Fields{"deviceId": dh.device.Id, "error": err})
+        return err
+    }
+    return nil
+}
+
+// doStateDown handle the olt down indication
+func (dh *DeviceHandler) doStateDown() error {
+    //TODO Handle oper state down
+    return nil
+}
+
+// doStateInit dial the grpc before going to init state
+func (dh *DeviceHandler) doStateInit() error {
+    var err error
+    dh.clientCon, err = grpc.Dial(dh.device.GetHostAndPort(), grpc.WithInsecure())
+    if err != nil {
+        log.Errorw("Failed to dial device", log.Fields{"DeviceId": dh.deviceId, "HostAndPort": dh.device.GetHostAndPort(), "err": err})
+        return err
+    }
+    return nil
+}
+
+// postInit create olt client instance to invoke RPC on the olt device
+func (dh *DeviceHandler) postInit() error {
+    dh.client = oop.NewOpenoltClient(dh.clientCon)
+    dh.transitionMap.Handle(GrpcConnected)
+    return nil
+}
+
+// doStateConnected get the device info and update to voltha core
+func (dh *DeviceHandler) doStateConnected() error {
+    deviceInfo, err := dh.client.GetDeviceInfo(context.Background(), new(oop.Empty))
+    if err != nil {
+        log.Errorw("Failed to fetch device info", log.Fields{"err": err})
+        return err
+    }
+    if deviceInfo == nil {
+        log.Errorw("Device info is nil", log.Fields{})
+        return errors.New("Failed to get device info from OLT")
+    }
+
+    dh.device.Root = true
+    dh.device.Vendor = deviceInfo.Vendor
+    dh.device.Model = deviceInfo.Model
+    dh.device.ConnectStatus = voltha.ConnectStatus_REACHABLE
+    dh.device.SerialNumber = deviceInfo.DeviceSerialNumber
+    dh.device.HardwareVersion = deviceInfo.HardwareVersion
+    dh.device.FirmwareVersion = deviceInfo.FirmwareVersion
+    // TODO : Check whether this MAC address is learnt from SDPON or need to send from device
+    dh.device.MacAddress = "0a:0b:0c:0d:0e:0f"
+
+    // Synchronous call to update device - this method is run in its own go routine
+    if err := dh.coreProxy.DeviceUpdate(nil, dh.device); err != nil {
+        log.Errorw("error-updating-device", log.Fields{"deviceId": dh.device.Id, "error": err})
+    }
+
+    // Start reading indications
+    go dh.readIndications()
+    return nil
+}
+
+// AdoptDevice adopts the OLT device
+func (dh *DeviceHandler) AdoptDevice(device *voltha.Device) {
+    dh.transitionMap = NewTransitionMap(dh)
+    log.Infow("AdoptDevice", log.Fields{"deviceId": device.Id, "Address": device.GetHostAndPort()})
+    dh.transitionMap.Handle(DeviceInit)
+}
+
+// GetOfpDeviceInfo Get the Ofp device information
+func (dh *DeviceHandler) GetOfpDeviceInfo(device *voltha.Device) (*ic.SwitchCapability, error) {
+    return &ic.SwitchCapability{
+        Desc: &of.OfpDesc{
+            HwDesc:    "open_pon",
+            SwDesc:    "open_pon",
+            SerialNum: dh.device.SerialNumber,
+        },
+        SwitchFeatures: &of.OfpSwitchFeatures{
+            NBuffers: 256,
+            NTables:  2,
+            Capabilities: uint32(of.OfpCapabilities_OFPC_FLOW_STATS |
+                of.OfpCapabilities_OFPC_TABLE_STATS |
+                of.OfpCapabilities_OFPC_PORT_STATS |
+                of.OfpCapabilities_OFPC_GROUP_STATS),
+        },
+    }, nil
+}
+
+// GetOfpPortInfo Get Ofp port information
+func (dh *DeviceHandler) GetOfpPortInfo(device *voltha.Device, portNo int64) (*ic.PortCapability, error) {
+    cap := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
+    return &ic.PortCapability{
+        Port: &voltha.LogicalPort{
+            OfpPort: &of.OfpPort{
+                HwAddr:     macAddressToUint32Array(dh.device.MacAddress),
+                Config:     0,
+                State:      uint32(of.OfpPortState_OFPPS_LIVE),
+                Curr:       cap,
+                Advertised: cap,
+                Peer:       cap,
+                CurrSpeed:  uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+                MaxSpeed:   uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+            },
+            DeviceId:     dh.device.Id,
+            DevicePortNo: uint32(portNo),
+        },
+    }, nil
+}
+
+// Process_inter_adapter_message process inter adater message
+func (dh *DeviceHandler) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {
+    // TODO
+    log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
+    return nil
+}
+
diff --git a/adaptercore/olt_state_transitions.go b/adaptercore/olt_state_transitions.go
new file mode 100644
index 0000000..45e5f5e
--- /dev/null
+++ b/adaptercore/olt_state_transitions.go
@@ -0,0 +1,184 @@
+/*

+ * Copyright 2018-present Open Networking Foundation

+

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+

+ * http://www.apache.org/licenses/LICENSE-2.0

+

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+package adaptercore

+

+import (

+    "reflect"

+    "runtime"

+

+    "github.com/opencord/voltha-go/common/log"

+)

+

+// DeviceState OLT Device state

+type DeviceState int

+

+const (

+    // deviceStateNull OLT is not instantiated

+    deviceStateNull DeviceState = iota

+    // deviceStateInit OLT is instantiated

+    deviceStateInit

+    // deviceStateConnected Grpc session established with OLT

+    deviceStateConnected

+    // deviceStateUp Admin state of OLT is UP

+    deviceStateUp

+    // deviceStateDown Admin state of OLT is down

+    deviceStateDown

+)

+

+// Trigger for changing the state

+type Trigger int

+

+const (

+    // DeviceInit Go to Device init state

+    DeviceInit Trigger = iota

+    // GrpcConnected Go to connected state

+    GrpcConnected

+    // DeviceUpInd Go to Device up state

+    DeviceUpInd

+    // DeviceDownInd Go to Device down state

+    DeviceDownInd

+    // GrpcDisconnected Go to Device init state

+    GrpcDisconnected

+)

+

+// TransitionHandler function type for handling transition

+type TransitionHandler func() error

+

+// Transition to store state machine

+type Transition struct {

+    previousState []DeviceState

+    currentState  DeviceState

+    before        []TransitionHandler

+    after         []TransitionHandler

+}

+

+// TransitionMap to store all the states and current device state

+type TransitionMap struct {

+    transitions        map[Trigger]Transition

+    currentDeviceState DeviceState

+}

+

+//    OpenoltDevice state machine:

+//

+//        null ----> init ------> connected -----> up -----> down

+//                   ^ ^             |             ^         | |

+//                   | |             |             |         | |

+//                   | +-------------+             +---------+ |

+//                   |                                         |

+//                   +-----------------------------------------+

+

+// NewTransitionMap create a new state machine with all the transitions

+func NewTransitionMap(dh *DeviceHandler) *TransitionMap {

+    var transitionMap TransitionMap

+    transitionMap.currentDeviceState = deviceStateNull

+    transitionMap.transitions = make(map[Trigger]Transition)

+    // In doInit establish the grpc session

+    transitionMap.transitions[DeviceInit] =

+        Transition{

+            previousState: []DeviceState{deviceStateNull},

+            currentState:  deviceStateInit,

+            before:        []TransitionHandler{dh.doStateInit},

+            after:         []TransitionHandler{dh.postInit}}

+    // If gRpc session fails, re-establish the grpc session

+    transitionMap.transitions[GrpcDisconnected] =

+        Transition{

+            previousState: []DeviceState{deviceStateConnected, deviceStateDown},

+            currentState:  deviceStateInit,

+            before:        []TransitionHandler{dh.doStateInit},

+            after:         []TransitionHandler{dh.postInit}}

+    // in doConnected, create logical device and read the indications

+    transitionMap.transitions[GrpcConnected] =

+        Transition{

+            previousState: []DeviceState{deviceStateInit},

+            currentState:  deviceStateConnected,

+            before:        []TransitionHandler{dh.doStateConnected}}

+

+    // Once the olt UP is indication received, then do state up

+    transitionMap.transitions[DeviceUpInd] =

+        Transition{

+            previousState: []DeviceState{deviceStateConnected, deviceStateDown},

+            currentState:  deviceStateUp,

+            before:        []TransitionHandler{dh.doStateUp}}

+    // If olt DOWN indication comes then do sate down

+    transitionMap.transitions[DeviceDownInd] =

+        Transition{

+            previousState: []DeviceState{deviceStateUp},

+            currentState:  deviceStateDown,

+            before:        []TransitionHandler{dh.doStateDown}}

+

+    return &transitionMap

+}

+

+// funcName gets the handler function name

+func funcName(f interface{}) string {

+    p := reflect.ValueOf(f).Pointer()

+    rf := runtime.FuncForPC(p)

+    return rf.Name()

+}

+

+// isValidTransition checks for the new state transition is valid from current state

+func (tMap *TransitionMap) isValidTransition(trigger Trigger) bool {

+    // Validate the state transition

+    for _, state := range tMap.transitions[trigger].previousState {

+        if tMap.currentDeviceState == state {

+            return true

+        }

+    }

+    return false

+}

+

+// Handle moves the state machine to next state based on the trigger and invokes the before and

+//         after handlers

+func (tMap *TransitionMap) Handle(trigger Trigger) {

+

+    // Check whether the transtion is valid from current state

+    if !tMap.isValidTransition(trigger) {

+        log.Errorw("Invalid transition triggered ", log.Fields{"CurrentState": tMap.currentDeviceState, "Trigger": trigger})

+        return

+    }

+

+    // Invoke the before handlers

+    beforeHandlers := tMap.transitions[trigger].before

+    if beforeHandlers != nil {

+        for _, handler := range beforeHandlers {

+            log.Debugw("running-before-handler", log.Fields{"handler": funcName(handler)})

+            if err := handler(); err != nil {

+                // TODO handle error

+                return

+            }

+        }

+    } else {

+        log.Debugw("No handlers for before", log.Fields{"trigger": trigger})

+    }

+

+    // Update the state

+    tMap.currentDeviceState = tMap.transitions[trigger].currentState

+    log.Debugw("Updated device state ", log.Fields{"CurrentDeviceState": tMap.currentDeviceState})

+

+    // Invoke the after handlers

+    afterHandlers := tMap.transitions[trigger].after

+    if afterHandlers != nil {

+        for _, handler := range afterHandlers {

+            log.Debugw("running-after-handler", log.Fields{"handler": funcName(handler)})

+            if err := handler(); err != nil {

+                // TODO handle error

+                return

+            }

+        }

+    } else {

+        log.Debugw("No handlers for after", log.Fields{"trigger": trigger})

+    }

+}

diff --git a/adaptercore/openolt.go b/adaptercore/openolt.go
new file mode 100644
index 0000000..18ef08f
--- /dev/null
+++ b/adaptercore/openolt.go
@@ -0,0 +1,245 @@
+/*

+ * Copyright 2018-present Open Networking Foundation

+

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+

+ * http://www.apache.org/licenses/LICENSE-2.0

+

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+package adaptercore

+

+import (

+    "context"

+    "errors"

+    "fmt"

+    "sync"

+

+    com "github.com/opencord/voltha-go/adapters/common"

+    "github.com/opencord/voltha-go/common/log"

+    "github.com/opencord/voltha-go/kafka"

+    ic "github.com/opencord/voltha-go/protos/inter_container"

+    "github.com/opencord/voltha-go/protos/openflow_13"

+    "github.com/opencord/voltha-go/protos/voltha"

+)

+

+type OpenOLT struct {

+    deviceHandlers        map[string]*DeviceHandler

+    coreProxy             *com.CoreProxy

+    kafkaICProxy          *kafka.InterContainerProxy

+    numOnus               int

+    exitChannel           chan int

+    lockDeviceHandlersMap sync.RWMutex

+}

+

+func NewOpenOLT(ctx context.Context, kafkaICProxy *kafka.InterContainerProxy, coreProxy *com.CoreProxy, onuNumber int) *OpenOLT {

+    var openOLT OpenOLT

+    openOLT.exitChannel = make(chan int, 1)

+    openOLT.deviceHandlers = make(map[string]*DeviceHandler)

+    openOLT.kafkaICProxy = kafkaICProxy

+    openOLT.numOnus = onuNumber

+    openOLT.coreProxy = coreProxy

+    openOLT.lockDeviceHandlersMap = sync.RWMutex{}

+    return &openOLT

+}

+

+func (oo *OpenOLT) Start(ctx context.Context) error {

+    log.Info("starting-device-manager")

+    log.Info("device-manager-started")

+    return nil

+}

+

+func (oo *OpenOLT) Stop(ctx context.Context) error {

+    log.Info("stopping-device-manager")

+    oo.exitChannel <- 1

+    log.Info("device-manager-stopped")

+    return nil

+}

+

+func sendResponse(ctx context.Context, ch chan interface{}, result interface{}) {

+    if ctx.Err() == nil {

+        // Returned response only of the ctx has not been cancelled/timeout/etc

+        // Channel is automatically closed when a context is Done

+        ch <- result

+        log.Debugw("sendResponse", log.Fields{"result": result})

+    } else {

+        // Should the transaction be reverted back?

+        log.Debugw("sendResponse-context-error", log.Fields{"context-error": ctx.Err()})

+    }

+}

+

+func (oo *OpenOLT) addDeviceHandlerToMap(agent *DeviceHandler) {

+    oo.lockDeviceHandlersMap.Lock()

+    defer oo.lockDeviceHandlersMap.Unlock()

+    if _, exist := oo.deviceHandlers[agent.deviceId]; !exist {

+        oo.deviceHandlers[agent.deviceId] = agent

+    }

+}

+

+func (oo *OpenOLT) deleteDeviceHandlerToMap(agent *DeviceHandler) {

+    oo.lockDeviceHandlersMap.Lock()

+    defer oo.lockDeviceHandlersMap.Unlock()

+    delete(oo.deviceHandlers, agent.deviceId)

+}

+

+func (oo *OpenOLT) getDeviceHandler(deviceId string) *DeviceHandler {

+    oo.lockDeviceHandlersMap.Lock()

+    defer oo.lockDeviceHandlersMap.Unlock()

+    if agent, ok := oo.deviceHandlers[deviceId]; ok {

+        return agent

+    }

+    return nil

+}

+

+func (oo *OpenOLT) createDeviceTopic(device *voltha.Device) error {

+    log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})

+    deviceTopic := kafka.Topic{Name: oo.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}

+    // TODO for the offset

+    if err := oo.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic, 0); err != nil {

+        log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})

+        return err

+    }

+    return nil

+}

+

+func (oo *OpenOLT) Adopt_device(device *voltha.Device) error {

+    if device == nil {

+        log.Warn("device-is-nil")

+        return errors.New("nil-device")

+    }

+    log.Infow("adopt-device", log.Fields{"deviceId": device.Id})

+    var handler *DeviceHandler

+    if handler = oo.getDeviceHandler(device.Id); handler == nil {

+        handler := NewDeviceHandler(oo.coreProxy, device, oo)

+        oo.addDeviceHandlerToMap(handler)

+        go handler.AdoptDevice(device)

+        // Launch the creation of the device topic

+        go oo.createDeviceTopic(device)

+    }

+    return nil

+}

+

+func (oo *OpenOLT) Get_ofp_device_info(device *voltha.Device) (*ic.SwitchCapability, error) {

+    log.Infow("Get_ofp_device_info", log.Fields{"deviceId": device.Id})

+    if handler := oo.getDeviceHandler(device.Id); handler != nil {

+        return handler.GetOfpDeviceInfo(device)

+    }

+    log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})

+    return nil, errors.New("device-handler-not-set")

+}

+

+func (oo *OpenOLT) Get_ofp_port_info(device *voltha.Device, port_no int64) (*ic.PortCapability, error) {

+    log.Infow("Get_ofp_port_info", log.Fields{"deviceId": device.Id})

+    if handler := oo.getDeviceHandler(device.Id); handler != nil {

+        return handler.GetOfpPortInfo(device, port_no)

+    }

+    log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})

+    return nil, errors.New("device-handler-not-set")

+}

+

+func (oo *OpenOLT) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {

+    log.Infow("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})

+    targetDevice := msg.Header.ProxyDeviceId // Request?

+    if targetDevice == "" && msg.Header.ToDeviceId != "" {

+        // Typical response

+        targetDevice = msg.Header.ToDeviceId

+    }

+    if handler := oo.getDeviceHandler(targetDevice); handler != nil {

+        return handler.Process_inter_adapter_message(msg)

+    }

+    return errors.New(fmt.Sprintf("handler-not-found-%s", targetDevice))

+}

+

+func (oo *OpenOLT) Adapter_descriptor() error {

+    return errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Device_types() (*voltha.DeviceTypes, error) {

+    return nil, errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Health() (*voltha.HealthStatus, error) {

+    return nil, errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Reconcile_device(device *voltha.Device) error {

+    return errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Abandon_device(device *voltha.Device) error {

+    return errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Disable_device(device *voltha.Device) error {

+    return errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Reenable_device(device *voltha.Device) error {

+    return errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Reboot_device(device *voltha.Device) error {

+    return errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Self_test_device(device *voltha.Device) error {

+    return errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Gelete_device(device *voltha.Device) error {

+    return errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Get_device_details(device *voltha.Device) error {

+    return errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {

+    return errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges) error {

+    return errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {

+    return errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Receive_packet_out(device *voltha.Device, egress_port_no int, msg openflow_13.PacketOut) error {

+    return errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Suppress_alarm(filter *voltha.AlarmFilter) error {

+    return errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Unsuppress_alarm(filter *voltha.AlarmFilter) error {

+    return errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Download_image(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {

+    return nil, errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {

+    return nil, errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {

+    return nil, errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {

+    return nil, errors.New("UnImplemented")

+}

+

+func (oo *OpenOLT) Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {

+    return nil, errors.New("UnImplemented")

+}

diff --git a/compose/adapters-openolt-go.yml b/compose/adapters-openolt-go.yml
new file mode 100644
index 0000000..a3475d9
--- /dev/null
+++ b/compose/adapters-openolt-go.yml
@@ -0,0 +1,42 @@
+---
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2'
+services:
+  adapter_openolt:
+    image: "${REGISTRY}${REPOSITORY}voltha-openolt-adapter-go${TAG}"
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "10m"
+        max-file: "3"
+    command: [
+      "/app/openolt",
+      "--kafka_adapter_host=${DOCKER_HOST_IP}",
+      "--kafka_adapter_port=9092",
+      "--kafka_cluster_host=${DOCKER_HOST_IP}",
+      "--kafka_cluster_port=9092",
+      "--core_topic=rwcore",
+      "--kv_store_host=${DOCKER_HOST_IP}",
+      "--kv_store_port=2379"
+    ]
+    ports:
+      - "50062:50062"
+    networks:
+    - default
+
+networks:
+  default:
+    driver: bridge
diff --git a/config/config.go b/config/config.go
new file mode 100644
index 0000000..4f30e65
--- /dev/null
+++ b/config/config.go
@@ -0,0 +1,143 @@
+/*

+ * Copyright 2018-present Open Networking Foundation

+

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+

+ * http://www.apache.org/licenses/LICENSE-2.0

+

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+package config

+

+import (

+    "flag"

+    "fmt"

+    "github.com/opencord/voltha-go/common/log"

+    "os"

+)

+

+// Open OLT default constants

+const (

+    EtcdStoreName            = "etcd"

+    default_InstanceID       = "openOlt001"

+    default_KafkaAdapterHost = "127.0.0.1"

+    default_KafkaAdapterPort = 9092

+    default_KafkaClusterHost = "127.0.0.1"

+    default_KafkaClusterPort = 9094

+    default_KVStoreType      = EtcdStoreName

+    default_KVStoreTimeout   = 5 //in seconds

+    default_KVStoreHost      = "127.0.0.1"

+    default_KVStorePort      = 2379 // Consul = 8500; Etcd = 2379

+    default_LogLevel         = 0

+    default_Banner           = false

+    default_Topic            = "openolt"

+    default_CoreTopic        = "rwcore"

+    default_OnuNumber        = 1

+)

+

+// AdapterFlags represents the set of configurations used by the read-write adaptercore service

+type AdapterFlags struct {

+    // Command line parameters

+    InstanceID       string

+    KafkaAdapterHost string

+    KafkaAdapterPort int

+    KafkaClusterHost string

+    KafkaClusterPort int

+    KVStoreType      string

+    KVStoreTimeout   int // in seconds

+    KVStoreHost      string

+    KVStorePort      int

+    Topic            string

+    CoreTopic        string

+    LogLevel         int

+    OnuNumber        int

+    Banner           bool

+}

+

+func init() {

+    log.AddPackage(log.JSON, log.WarnLevel, nil)

+}

+

+// NewRWCoreFlags returns a new RWCore config

+func NewAdapterFlags() *AdapterFlags {

+    var adapterFlags = AdapterFlags{ // Default values

+        InstanceID:       default_InstanceID,

+        KafkaAdapterHost: default_KafkaAdapterHost,

+        KafkaAdapterPort: default_KafkaAdapterPort,

+        KafkaClusterHost: default_KafkaClusterHost,

+        KafkaClusterPort: default_KafkaClusterPort,

+        KVStoreType:      default_KVStoreType,

+        KVStoreTimeout:   default_KVStoreTimeout,

+        KVStoreHost:      default_KVStoreHost,

+        KVStorePort:      default_KVStorePort,

+        Topic:            default_Topic,

+        CoreTopic:        default_CoreTopic,

+        LogLevel:         default_LogLevel,

+        OnuNumber:        default_OnuNumber,

+        Banner:           default_Banner,

+    }

+    return &adapterFlags

+}

+

+// ParseCommandArguments parses the arguments when running read-write adaptercore service

+func (so *AdapterFlags) ParseCommandArguments() {

+

+    var help string

+

+    help = fmt.Sprintf("Kafka - Adapter messaging host")

+    flag.StringVar(&(so.KafkaAdapterHost), "kafka_adapter_host", default_KafkaAdapterHost, help)

+

+    help = fmt.Sprintf("Kafka - Adapter messaging port")

+    flag.IntVar(&(so.KafkaAdapterPort), "kafka_adapter_port", default_KafkaAdapterPort, help)

+

+    help = fmt.Sprintf("Kafka - Cluster messaging host")

+    flag.StringVar(&(so.KafkaClusterHost), "kafka_cluster_host", default_KafkaClusterHost, help)

+

+    help = fmt.Sprintf("Kafka - Cluster messaging port")

+    flag.IntVar(&(so.KafkaClusterPort), "kafka_cluster_port", default_KafkaClusterPort, help)

+

+    help = fmt.Sprintf("Open OLT topic")

+    flag.StringVar(&(so.Topic), "adapter_topic", default_Topic, help)

+

+    help = fmt.Sprintf("Core topic")

+    flag.StringVar(&(so.CoreTopic), "core_topic", default_CoreTopic, help)

+

+    help = fmt.Sprintf("KV store type")

+    flag.StringVar(&(so.KVStoreType), "kv_store_type", default_KVStoreType, help)

+

+    help = fmt.Sprintf("The default timeout when making a kv store request")

+    flag.IntVar(&(so.KVStoreTimeout), "kv_store_request_timeout", default_KVStoreTimeout, help)

+

+    help = fmt.Sprintf("KV store host")

+    flag.StringVar(&(so.KVStoreHost), "kv_store_host", default_KVStoreHost, help)

+

+    help = fmt.Sprintf("KV store port")

+    flag.IntVar(&(so.KVStorePort), "kv_store_port", default_KVStorePort, help)

+

+    help = fmt.Sprintf("Log level")

+    flag.IntVar(&(so.LogLevel), "log_level", default_LogLevel, help)

+

+    help = fmt.Sprintf("Number of ONUs")

+    flag.IntVar(&(so.OnuNumber), "onu_number", default_OnuNumber, help)

+

+    help = fmt.Sprintf("Show startup banner log lines")

+    flag.BoolVar(&so.Banner, "banner", default_Banner, help)

+

+    flag.Parse()

+

+    containerName := getContainerInfo()

+    if len(containerName) > 0 {

+        so.InstanceID = containerName

+    }

+

+}

+

+func getContainerInfo() string {

+    return os.Getenv("HOSTNAME")

+}

diff --git a/docker/Dockerfile.openolt b/docker/Dockerfile.openolt
new file mode 100644
index 0000000..c975591
--- /dev/null
+++ b/docker/Dockerfile.openolt
@@ -0,0 +1,58 @@
+# -------------
+# Build stage
+
+FROM golang:1.10.7-alpine AS build-env
+
+# Install required packages
+RUN apk add --no-cache wget git make build-base protobuf protobuf-dev
+
+# Install protobuf requirements
+RUN git clone https://github.com/googleapis/googleapis.git /usr/local/include/googleapis
+RUN go get google.golang.org/genproto/googleapis/api/annotations
+
+# Prepare directory structure
+RUN ["mkdir", "-p", "/src", "src/protos"]
+RUN ["mkdir", "-p", "$GOPATH/src", "$GOPATH/pkg", "$GOPATH/bin"]
+RUN ["mkdir", "-p", "$GOPATH/src/github.com/opencord"]
+RUN ["mkdir", "-p", "$GOPATH/src/github.com/opencord/voltha-go"]
+
+WORKDIR $GOPATH/src/github.com/opencord
+
+RUN git clone https://gerrit.opencord.org/voltha-go.git
+
+WORKDIR $GOPATH/src/github.com/opencord/voltha-go
+
+
+# Copy files
+ADD adaptercore ./adapters/openolt/adaptercore
+ADD config      ./adapters/openolt/config
+ADD *.go        ./adapters/openolt
+
+RUN ls ./adapters/openolt
+
+# Copy required proto files
+ADD openolt.proto ./protos
+
+# Install the protoc-gen-go
+RUN go install ./vendor/github.com/golang/protobuf/protoc-gen-go
+
+# Compile protobuf files
+RUN sh protos/scripts/build_protos.sh protos
+
+RUN protoc --go_out=Mgoogle/protobuf/descriptor.proto=github.com/golang/protobuf/protoc-gen-go/descriptor,plugins=grpc:$GOPATH/src -I protos -I /usr/local/include/googleapis  protos/openolt.proto
+
+# Build openolt
+
+RUN cd adapters/openolt && go build -o /src/openolt
+
+# -------------
+# Image creation stage
+
+FROM alpine:3.8
+
+# Set the working directory
+WORKDIR /app
+
+# Copy required files
+COPY --from=build-env /src/openolt /app/
+
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..12c5ff6
--- /dev/null
+++ b/main.go
@@ -0,0 +1,340 @@
+/*

+ * Copyright 2018-present Open Networking Foundation

+

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+

+ * http://www.apache.org/licenses/LICENSE-2.0

+

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+package main

+

+import (

+    "context"

+    "errors"

+    "fmt"

+    "github.com/opencord/voltha-go/adapters"

+    com "github.com/opencord/voltha-go/adapters/common"

+    ac "github.com/opencord/voltha-go/adapters/openolt/adaptercore"

+    "github.com/opencord/voltha-go/adapters/openolt/config"

+    "github.com/opencord/voltha-go/common/log"

+    "github.com/opencord/voltha-go/db/kvstore"

+    "github.com/opencord/voltha-go/kafka"

+    ic "github.com/opencord/voltha-go/protos/inter_container"

+    "github.com/opencord/voltha-go/protos/voltha"

+    "os"

+    "os/signal"

+    "strconv"

+    "syscall"

+    "time"

+)

+

+type adapter struct {

+    instanceId       string

+    config           *config.AdapterFlags

+    iAdapter         adapters.IAdapter

+    kafkaClient      kafka.Client

+    kvClient         kvstore.Client

+    kip              *kafka.InterContainerProxy

+    coreProxy        *com.CoreProxy

+    halted           bool

+    exitChannel      chan int

+    receiverChannels []<-chan *ic.InterContainerMessage

+}

+

+func init() {

+    log.AddPackage(log.JSON, log.DebugLevel, nil)

+}

+

+func newAdapter(cf *config.AdapterFlags) *adapter {

+    var a adapter

+    a.instanceId = cf.InstanceID

+    a.config = cf

+    a.halted = false

+    a.exitChannel = make(chan int, 1)

+    a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)

+    return &a

+}

+

+func (a *adapter) start(ctx context.Context) {

+    log.Info("Starting Core Adapter components")

+    var err error

+

+    // Setup KV Client

+    log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})

+    if err := a.setKVClient(); err != nil {

+        log.Fatal("error-setting-kv-client")

+    }

+

+    // Setup Kafka Client

+    if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {

+        log.Fatal("Unsupported-common-client")

+    }

+

+    // Start the common InterContainer Proxy - retries indefinitely

+    if a.kip, err = a.startInterContainerProxy(-1); err != nil {

+        log.Fatal("error-starting-inter-container-proxy")

+    }

+

+    // Create the core proxy to handle requests to the Core

+    a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)

+

+    // Create the open OLT adapter

+    if a.iAdapter, err = a.startOpenOLT(ctx, a.kip, a.coreProxy, a.config.OnuNumber); err != nil {

+        log.Fatal("error-starting-inter-container-proxy")

+    }

+

+    // Register the core request handler

+    if err = a.setupRequestHandler(a.instanceId, a.iAdapter); err != nil {

+        log.Fatal("error-setting-core-request-handler")

+    }

+

+    //    Register this adapter to the Core - retries indefinitely

+    if err = a.registerWithCore(-1); err != nil {

+        log.Fatal("error-registering-with-core")

+    }

+}

+

+func (rw *adapter) stop() {

+    // Stop leadership tracking

+    rw.halted = true

+

+    // send exit signal

+    rw.exitChannel <- 0

+

+    // Cleanup - applies only if we had a kvClient

+    if rw.kvClient != nil {

+        // Release all reservations

+        if err := rw.kvClient.ReleaseAllReservations(); err != nil {

+            log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})

+        }

+        // Close the DB connection

+        rw.kvClient.Close()

+    }

+

+    // TODO:  More cleanup

+}

+

+func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {

+

+    log.Infow("kv-store-type", log.Fields{"store": storeType})

+    switch storeType {

+    case "consul":

+        return kvstore.NewConsulClient(address, timeout)

+    case "etcd":

+        return kvstore.NewEtcdClient(address, timeout)

+    }

+    return nil, errors.New("unsupported-kv-store")

+}

+

+func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {

+

+    log.Infow("common-client-type", log.Fields{"client": clientType})

+    switch clientType {

+    case "sarama":

+        return kafka.NewSaramaClient(

+            kafka.Host(host),

+            kafka.Port(port),

+            kafka.ProducerReturnOnErrors(true),

+            kafka.ProducerReturnOnSuccess(true),

+            kafka.ProducerMaxRetries(6),

+            kafka.ProducerRetryBackoff(time.Millisecond*30)), nil

+    }

+    return nil, errors.New("unsupported-client-type")

+}

+

+func (a *adapter) setKVClient() error {

+    addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)

+    client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)

+    if err != nil {

+        a.kvClient = nil

+        log.Error(err)

+        return err

+    }

+    a.kvClient = client

+    return nil

+}

+

+func toString(value interface{}) (string, error) {

+    switch t := value.(type) {

+    case []byte:

+        return string(value.([]byte)), nil

+    case string:

+        return value.(string), nil

+    default:

+        return "", fmt.Errorf("unexpected-type-%T", t)

+    }

+}

+

+func (a *adapter) startInterContainerProxy(retries int) (*kafka.InterContainerProxy, error) {

+    log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,

+        "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})

+    var err error

+    var kip *kafka.InterContainerProxy

+    if kip, err = kafka.NewInterContainerProxy(

+        kafka.InterContainerHost(a.config.KafkaAdapterHost),

+        kafka.InterContainerPort(a.config.KafkaAdapterPort),

+        kafka.MsgClient(a.kafkaClient),

+        kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic})); err != nil {

+        log.Errorw("fail-to-create-common-proxy", log.Fields{"error": err})

+        return nil, err

+    }

+    count := 0

+    for {

+        if err = kip.Start(); err != nil {

+            log.Warnw("error-starting-messaging-proxy", log.Fields{"error": err})

+            if retries == count {

+                return nil, err

+            }

+            count = +1

+            //    Take a nap before retrying

+            time.Sleep(2 * time.Second)

+        } else {

+            break

+        }

+    }

+

+    log.Info("common-messaging-proxy-created")

+    return kip, nil

+}

+

+func (a *adapter) startOpenOLT(ctx context.Context, kip *kafka.InterContainerProxy, cp *com.CoreProxy, onuNumber int) (*ac.OpenOLT, error) {

+    log.Info("starting-open-olt")

+    var err error

+    sOLT := ac.NewOpenOLT(ctx, a.kip, cp, onuNumber)

+

+    if err = sOLT.Start(ctx); err != nil {

+        log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})

+        return nil, err

+    }

+

+    log.Info("open-olt-started")

+    return sOLT, nil

+}

+

+func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter) error {

+    log.Info("setting-request-handler")

+    requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter, a.coreProxy)

+    if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {

+        log.Errorw("request-handler-setup-failed", log.Fields{"error": err})

+        return err

+

+    }

+    log.Info("request-handler-setup-done")

+    return nil

+}

+

+func (a *adapter) registerWithCore(retries int) error {

+    log.Info("registering-with-core")

+    adapterDescription := &voltha.Adapter{Id: "openolt", Vendor: "simulation Enterprise Inc"}

+    types := []*voltha.DeviceType{{Id: "openolt", Adapter: "openolt"}}

+    deviceTypes := &voltha.DeviceTypes{Items: types}

+    count := 0

+    for {

+        if err := a.coreProxy.RegisterAdapter(nil, adapterDescription, deviceTypes); err != nil {

+            log.Warnw("registering-with-core-failed", log.Fields{"error": err})

+            if retries == count {

+                return err

+            }

+            count += 1

+            //    Take a nap before retrying

+            time.Sleep(2 * time.Second)

+        } else {

+            break

+        }

+    }

+    log.Info("registered-with-core")

+    return nil

+}

+

+func waitForExit() int {

+    signalChannel := make(chan os.Signal, 1)

+    signal.Notify(signalChannel,

+        syscall.SIGHUP,

+        syscall.SIGINT,

+        syscall.SIGTERM,

+        syscall.SIGQUIT)

+

+    exitChannel := make(chan int)

+

+    go func() {

+        s := <-signalChannel

+        switch s {

+        case syscall.SIGHUP,

+            syscall.SIGINT,

+            syscall.SIGTERM,

+            syscall.SIGQUIT:

+            log.Infow("closing-signal-received", log.Fields{"signal": s})

+            exitChannel <- 0

+        default:

+            log.Infow("unexpected-signal-received", log.Fields{"signal": s})

+            exitChannel <- 1

+        }

+    }()

+

+    code := <-exitChannel

+    return code

+}

+

+func printBanner() {

+    fmt.Println("   ____                     ____  _   _______ ")

+    fmt.Println("  / _ \\                   / __\\| | |__   __|")

+    fmt.Println(" | |  | |_ __   ___ _ __  | |  | | |    | |   ")

+    fmt.Println(" | |  | | '_\\ / _\\ '_\\ | |  | | |    | |   ")

+    fmt.Println(" | |__| | |_) |  __/ | | || |__| | |____| |   ")

+    fmt.Println(" \\____/| .__/\\___|_| |_|\\____/|______|_|   ")

+    fmt.Println("        | |                                   ")

+    fmt.Println("        |_|                                   ")

+    fmt.Println("                                              ")

+}

+

+func main() {

+    start := time.Now()

+

+    cf := config.NewAdapterFlags()

+    cf.ParseCommandArguments()

+

+    //// Setup logging

+

+    //Setup default logger - applies for packages that do not have specific logger set

+    if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {

+        log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")

+    }

+

+    // Update all loggers (provisionned via init) with a common field

+    if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {

+        log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")

+    }

+

+    log.SetPackageLogLevel("github.com/opencord/voltha-go/adapters/common", log.DebugLevel)

+

+    defer log.CleanUp()

+

+    // Print banner if specified

+    if cf.Banner {

+        printBanner()

+    }

+

+    log.Infow("config", log.Fields{"config": *cf})

+

+    ctx, cancel := context.WithCancel(context.Background())

+    defer cancel()

+

+    ad := newAdapter(cf)

+    go ad.start(ctx)

+

+    code := waitForExit()

+    log.Infow("received-a-closing-signal", log.Fields{"code": code})

+

+    // Cleanup before leaving

+    ad.stop()

+

+    elapsed := time.Since(start)

+    log.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})

+}

diff --git a/openolt.proto b/openolt.proto
new file mode 100644
index 0000000..709105b
--- /dev/null
+++ b/openolt.proto
@@ -0,0 +1,560 @@
+// Copyright (c) 2018 Open Networking Foundation

+//

+// Licensed under the Apache License, Version 2.0 (the "License");

+// you may not use this file except in compliance with the License.

+// You may obtain a copy of the License at:

+//

+//     http://www.apache.org/licenses/LICENSE-2.0

+//

+// Unless required by applicable law or agreed to in writing, software

+// distributed under the License is distributed on an "AS IS" BASIS,

+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+// See the License for the specific language governing permissions and

+// limitations under the License.

+

+syntax = "proto3";

+

+option go_package = "github.com/opencord/voltha-go/protos/openolt";

+

+package openolt;

+

+import "google/api/annotations.proto";

+

+service Openolt {

+

+    rpc DisableOlt(Empty) returns (Empty) {

+        option (google.api.http) = {

+          post: "/v1/Disable"

+          body: "*"

+        };

+    }

+

+    rpc ReenableOlt(Empty) returns (Empty) {

+        option (google.api.http) = {

+          post: "/v1/Reenable"

+          body: "*"

+        };

+    }

+

+    rpc ActivateOnu(Onu) returns (Empty) {

+        option (google.api.http) = {

+          post: "/v1/EnableOnu"

+          body: "*"

+        };

+    }

+

+    rpc DeactivateOnu(Onu) returns (Empty) {

+        option (google.api.http) = {

+          post: "/v1/DisableOnu"

+          body: "*"

+        };

+    }

+

+    rpc DeleteOnu(Onu) returns (Empty) {

+        option (google.api.http) = {

+          post: "/v1/DeleteOnu"

+          body: "*"

+        };

+    }

+

+    rpc OmciMsgOut(OmciMsg) returns (Empty) {

+        option (google.api.http) = {

+          post: "/v1/OmciMsgOut"

+          body: "*"

+        };

+    }

+

+    rpc OnuPacketOut(OnuPacket) returns (Empty) {

+        option (google.api.http) = {

+          post: "/v1/OnuPacketOut"

+          body: "*"

+        };

+    }

+

+    rpc UplinkPacketOut(UplinkPacket) returns (Empty) {

+        option (google.api.http) = {

+          post: "/v1/UplinkPacketOut"

+          body: "*"

+        };

+    }

+

+    rpc FlowAdd(Flow) returns (Empty) {

+        option (google.api.http) = {

+          post: "/v1/FlowAdd"

+          body: "*"

+        };

+    }

+

+    rpc FlowRemove(Flow) returns (Empty) {

+        option (google.api.http) = {

+          post: "/v1/FlowRemove"

+          body: "*"

+        };

+    }

+

+    rpc HeartbeatCheck(Empty) returns (Heartbeat) {

+        option (google.api.http) = {

+          post: "/v1/HeartbeatCheck"

+          body: "*"

+        };

+    }

+

+    rpc EnablePonIf(Interface) returns (Empty) {

+        option (google.api.http) = {

+            post: "/v1/EnablePonIf"

+            body: "*"

+        };

+    }

+

+    rpc DisablePonIf(Interface) returns (Empty) {

+        option (google.api.http) = {

+            post: "/v1/DisablePonIf"

+            body: "*"

+        };

+    }

+

+    rpc GetDeviceInfo(Empty) returns (DeviceInfo) {

+        option (google.api.http) = {

+            post: "/v1/GetDeviceInfo"

+            body: "*"

+        };

+    }

+

+    rpc Reboot(Empty) returns (Empty) {

+         option (google.api.http) = {

+            post: "/v1/Reboot"

+            body: "*"

+        };

+    }

+

+    rpc CollectStatistics(Empty) returns (Empty) {

+        option (google.api.http) = {

+            post: "/v1/CollectStatistics"

+            body: "*"

+        };

+    }

+

+    rpc CreateTconts(Tconts) returns (Empty) {

+        option (google.api.http) = {

+            post: "/v1/CreateTconts"

+            body: "*"

+        };

+    }

+

+    rpc RemoveTconts(Tconts) returns (Empty) {

+        option (google.api.http) = {

+            post: "/v1/RemoveTconts"

+            body: "*"

+        };

+    }

+

+    rpc EnableIndication(Empty) returns (stream Indication) {}

+}

+

+message Indication {

+    oneof data {

+        OltIndication olt_ind = 1;

+        IntfIndication intf_ind = 2;

+        IntfOperIndication intf_oper_ind = 3;

+        OnuDiscIndication onu_disc_ind = 4;

+        OnuIndication onu_ind = 5;

+        OmciIndication omci_ind = 6;

+        PacketIndication pkt_ind = 7;

+        PortStatistics port_stats = 8;

+        FlowStatistics flow_stats = 9;

+        AlarmIndication alarm_ind= 10;

+    }

+}

+

+message AlarmIndication {

+    oneof data {

+        LosIndication los_ind = 1;

+        DyingGaspIndication dying_gasp_ind = 2;

+        OnuAlarmIndication onu_alarm_ind = 3;

+        OnuStartupFailureIndication onu_startup_fail_ind = 4;

+        OnuSignalDegradeIndication onu_signal_degrade_ind = 5;

+        OnuDriftOfWindowIndication onu_drift_of_window_ind = 6;

+        OnuLossOfOmciChannelIndication onu_loss_omci_ind = 7;

+        OnuSignalsFailureIndication onu_signals_fail_ind = 8;

+        OnuTransmissionInterferenceWarning onu_tiwi_ind = 9;

+        OnuActivationFailureIndication onu_activation_fail_ind = 10;

+        OnuProcessingErrorIndication onu_processing_error_ind = 11;

+    }

+}

+

+message OltIndication {

+    string oper_state = 1;	// up, down

+}

+

+message IntfIndication {

+    fixed32 intf_id = 1;

+    string oper_state = 2;      // up, down

+}

+

+message OnuDiscIndication {

+    fixed32 intf_id = 1;

+    SerialNumber serial_number = 2;

+}

+

+message OnuIndication {

+    fixed32 intf_id = 1;

+    fixed32 onu_id = 2;

+    string oper_state = 3;      // up, down

+    string admin_state = 5;     // up, down

+    SerialNumber serial_number = 4;

+}

+

+message IntfOperIndication {

+    string type = 1;		// nni, pon

+    fixed32 intf_id = 2;

+    string oper_state = 3;      // up, down

+}

+

+message OmciIndication {

+    fixed32 intf_id = 1;

+    fixed32 onu_id = 2;

+    bytes pkt = 3;

+}

+

+message PacketIndication {

+    string intf_type = 5;		// nni, pon, unknown

+    fixed32 intf_id = 1;

+    fixed32 gemport_id = 2;

+    fixed32 flow_id = 3;

+    fixed32 port_no = 6;

+    fixed64 cookie = 7;

+    bytes pkt = 4;

+}

+

+message Interface {

+    fixed32 intf_id = 1;

+}

+

+message Heartbeat {

+    fixed32 heartbeat_signature = 1;

+}

+

+message Onu {

+    fixed32 intf_id = 1;

+    fixed32 onu_id = 2;

+    SerialNumber serial_number = 3;

+    fixed32 pir = 4;   // peak information rate assigned to onu

+}

+

+message OmciMsg {

+    fixed32 intf_id = 1;

+    fixed32 onu_id = 2;

+    bytes pkt = 3;

+}

+

+message OnuPacket {

+    fixed32 intf_id = 1;

+    fixed32 onu_id = 2;

+    fixed32 port_no = 4;

+    bytes pkt = 3;

+}

+

+message UplinkPacket {

+    fixed32 intf_id = 1;

+    bytes pkt = 2;

+}

+

+message DeviceInfo {

+    string vendor = 1;

+    string model = 2;

+    string hardware_version = 3;

+    string firmware_version = 4;

+    string device_id = 16;

+    string device_serial_number = 17;

+

+    // Total number of pon intf ports on the device

+    fixed32 pon_ports = 12;

+

+    // If using global per-device technology profile. To be deprecated

+    string technology = 5;

+    fixed32 onu_id_start = 6;

+    fixed32 onu_id_end = 7;

+    fixed32 alloc_id_start = 8;

+    fixed32 alloc_id_end = 9;

+    fixed32 gemport_id_start = 10;

+    fixed32 gemport_id_end = 11;

+    fixed32 flow_id_start = 13;

+    fixed32 flow_id_end = 14;

+

+    message DeviceResourceRanges {

+

+        // List of 0 or more intf_ids that use the same technology and pools.

+        // If 0 intf_ids supplied, it implies ALL interfaces

+        repeated fixed32 intf_ids = 1;

+

+        // Technology profile for this pool

+        string technology = 2;

+

+        message Pool {

+            enum PoolType {

+                ONU_ID = 0;

+                ALLOC_ID = 1;

+                GEMPORT_ID = 2;

+                FLOW_ID = 3;

+            }

+

+            enum SharingType {

+                DEDICATED_PER_INTF = 0;

+                SHARED_BY_ALL_INTF_ALL_TECH = 1; // Shared across all interfaces in all technologies in all ranges

+                SHARED_BY_ALL_INTF_SAME_TECH = 2; // Shared across all interfaces of the same technology used in this range

+            }

+

+            PoolType type = 1;

+	    SharingType sharing = 2;

+	    fixed32 start = 3; // lower bound on IDs allocated from this pool

+	    fixed32 end = 4; // upper bound on IDs allocated from this pool

+	}

+        repeated Pool pools = 3;

+    }

+    repeated DeviceResourceRanges ranges = 15;

+}

+

+message Classifier {

+    fixed32 o_tpid = 1;

+    fixed32 o_vid = 2;

+    fixed32 i_tpid = 3;

+    fixed32 i_vid = 4;

+    fixed32 o_pbits = 5;

+    fixed32 i_pbits = 6;

+    fixed32 eth_type = 7;

+    bytes dst_mac = 8;

+    bytes src_mac = 9;

+    fixed32 ip_proto = 10;

+    fixed32 dst_ip = 11;

+    fixed32 src_ip = 12;

+    fixed32 src_port = 13;

+    fixed32 dst_port = 14;

+    string pkt_tag_type = 15;	// untagged, single_tag, double_tag

+}

+

+message ActionCmd {

+    bool add_outer_tag = 1;

+    bool remove_outer_tag = 2;

+    bool trap_to_host = 3;

+}

+

+message Action {

+    ActionCmd cmd = 1;

+    fixed32 o_vid = 2;

+    fixed32 o_pbits = 3;

+    fixed32 o_tpid = 4;

+    fixed32 i_vid = 5;

+    fixed32 i_pbits = 6;

+    fixed32 i_tpid = 7;

+}

+

+message Flow {

+    sfixed32 access_intf_id = 1;

+    sfixed32 onu_id = 2;

+    sfixed32 uni_id = 11;

+    fixed32 flow_id = 3;

+    string flow_type = 4;	// upstream, downstream, broadcast, multicast

+    sfixed32 alloc_id = 10;

+    sfixed32 network_intf_id = 5;

+    sfixed32 gemport_id = 6;

+    Classifier classifier = 7;

+    Action action = 8;

+    sfixed32 priority = 9;

+    fixed64 cookie = 12; // must be provided for any flow with trap_to_host action. Returned in PacketIndication

+    fixed32 port_no = 13; // must be provided for any flow with trap_to_host action. Returned in PacketIndication

+}

+

+message SerialNumber {

+    bytes vendor_id = 1;

+    bytes vendor_specific = 2;

+}

+

+message PortStatistics {

+    fixed32 intf_id = 1;

+    fixed64 rx_bytes = 2;

+    fixed64 rx_packets = 3;

+    fixed64 rx_ucast_packets = 4;

+    fixed64 rx_mcast_packets = 5;

+    fixed64 rx_bcast_packets = 6;

+    fixed64 rx_error_packets = 7;

+    fixed64 tx_bytes = 8;

+    fixed64 tx_packets = 9;

+    fixed64 tx_ucast_packets = 10;

+    fixed64 tx_mcast_packets = 11;

+    fixed64 tx_bcast_packets = 12;

+    fixed64 tx_error_packets = 13;

+    fixed64 rx_crc_errors = 14;

+    fixed64 bip_errors = 15;

+    fixed32 timestamp = 16;

+}

+

+message FlowStatistics {

+    fixed32 flow_id = 1;

+    fixed64 rx_bytes = 2;

+    fixed64 rx_packets = 3;

+    fixed64 tx_bytes = 8;

+    fixed64 tx_packets = 9;

+    fixed32 timestamp = 16;

+}

+

+message LosIndication {

+    fixed32 intf_id = 1;

+    string status = 2;

+}

+

+message DyingGaspIndication {

+    fixed32 intf_id = 1;

+    fixed32 onu_id = 2;

+    string status = 3;

+}

+

+message OnuAlarmIndication {

+    fixed32 intf_id = 1;

+    fixed32 onu_id = 2;

+    string los_status = 3;

+    string lob_status = 4;

+    string lopc_miss_status = 5;

+    string lopc_mic_error_status = 6;

+}

+

+message OnuStartupFailureIndication {

+    fixed32 intf_id = 1;

+    fixed32 onu_id = 2;

+    string status = 3;

+}

+

+message OnuSignalDegradeIndication {

+    fixed32 intf_id = 1;

+    fixed32 onu_id = 2;

+    string status = 3;

+    fixed32 inverse_bit_error_rate = 4;

+}

+

+message OnuDriftOfWindowIndication {

+    fixed32 intf_id = 1;

+    fixed32 onu_id = 2;

+    string status = 3;

+    fixed32 drift = 4;

+    fixed32 new_eqd = 5;

+}

+

+message OnuLossOfOmciChannelIndication {

+    fixed32 intf_id = 1;

+    fixed32 onu_id = 2;

+    string status = 3;

+}

+

+message OnuSignalsFailureIndication {

+    fixed32 intf_id = 1;

+    fixed32 onu_id = 2;

+    string status = 3;

+    fixed32 inverse_bit_error_rate = 4;

+}

+

+message OnuTransmissionInterferenceWarning {

+    fixed32 intf_id = 1;

+    fixed32 onu_id = 2;

+    string status = 3;

+    fixed32 drift = 4;

+}

+

+message OnuActivationFailureIndication {

+    fixed32 intf_id = 1;

+    fixed32 onu_id = 2;

+}

+

+message OnuProcessingErrorIndication {

+    fixed32 intf_id = 1;

+    fixed32 onu_id = 2;

+}

+

+enum Direction {

+    UPSTREAM = 0;

+    DOWNSTREAM = 1;

+    BIDIRECTIONAL = 2;

+}

+

+enum SchedulingPolicy {

+    WRR = 0;

+    StrictPriority = 1;

+    Hybrid = 2;

+}

+

+enum AdditionalBW {

+    AdditionalBW_None = 0;

+    AdditionalBW_NA = 1;

+    AdditionalBW_BestEffort = 2;

+    AdditionalBW_Auto = 3;

+}

+

+enum DiscardPolicy {

+    TailDrop = 0;

+    WTailDrop = 1;

+    Red = 2;

+    WRed = 3;

+}

+

+enum InferredAdditionBWIndication {

+    InferredAdditionBWIndication_None = 0;

+    InferredAdditionBWIndication_Assured = 1;

+    InferredAdditionBWIndication_BestEffort = 2;

+}

+

+message Scheduler {

+    Direction direction = 1;

+    AdditionalBW additional_bw = 2; // Valid on for “direction == Upstream”.

+    fixed32 priority = 3;

+    fixed32 weight = 4;

+    SchedulingPolicy sched_policy = 5;

+}

+

+message TrafficShapingInfo {

+    fixed32 cir = 1;

+    fixed32 cbs = 2;

+    fixed32 pir = 3;

+    fixed32 pbs = 4;

+    fixed32 gir = 5; // only if “direction == Upstream ”

+    InferredAdditionBWIndication add_bw_ind = 6; // only if “direction == Upstream”

+}

+

+message Tcont {

+    Direction direction = 1;

+    fixed32 alloc_id = 2; // valid only if “direction == Upstream ”

+    Scheduler scheduler = 3;

+    TrafficShapingInfo traffic_shaping_info = 4;

+}

+

+message Tconts {

+    fixed32 intf_id = 1;

+    fixed32 onu_id = 2;

+    fixed32 uni_id = 4;

+    fixed32 port_no = 5;

+    repeated Tcont tconts = 3;

+}

+

+message TailDropDiscardConfig {

+    fixed32 queue_size = 1;

+}

+

+message RedDiscardConfig {

+    fixed32 min_threshold = 1;

+    fixed32 max_threshold = 2;

+    fixed32 max_probability = 3;

+}

+

+message WRedDiscardConfig {

+    RedDiscardConfig green = 1;

+    RedDiscardConfig yellow = 2;

+    RedDiscardConfig red = 3;

+}

+

+message DiscardConfig {

+    DiscardPolicy discard_policy = 1;

+    oneof discard_config {

+        TailDropDiscardConfig tail_drop_discard_config = 2;

+        RedDiscardConfig red_discard_config = 3;

+        WRedDiscardConfig wred_discard_config = 4;

+    }

+}

+

+message Empty {}