VOL-1510 inter adapter communication implementation in openolt adapter VOLTHA2.X

Change-Id: Icfd6dc65ee326aa01b38849d745d73ae8b378337
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 07ae169..e8d618c 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -16,317 +16,517 @@
 package adaptercore
 
 import (
-    "context"
-    "errors"
-    "io"
-    "strconv"
-    "strings"
-    "sync"
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
 
-    "github.com/gogo/protobuf/proto"
-    com "github.com/opencord/voltha-go/adapters/common"
-    "github.com/opencord/voltha-go/common/log"
-    "github.com/opencord/voltha-go/protos/common"
-    ic "github.com/opencord/voltha-go/protos/inter_container"
-    of "github.com/opencord/voltha-go/protos/openflow_13"
-    oop "github.com/opencord/voltha-go/protos/openolt"
-    "github.com/opencord/voltha-go/protos/voltha"
-    "google.golang.org/grpc"
+	"github.com/gogo/protobuf/proto"
+	"github.com/golang/protobuf/ptypes"
+	com "github.com/opencord/voltha-go/adapters/common"
+	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/protos/common"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
+	of "github.com/opencord/voltha-go/protos/openflow_13"
+	oop "github.com/opencord/voltha-go/protos/openolt"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"google.golang.org/grpc"
 )
 
 //DeviceHandler will interact with the OLT device.
 type DeviceHandler struct {
-    deviceId      string
-    deviceType    string
-    device        *voltha.Device
-    coreProxy     *com.CoreProxy
-    openOLT       *OpenOLT
-    nniPort       *voltha.Port
-    ponPort       *voltha.Port
-    exitChannel   chan int
-    lockDevice    sync.RWMutex
-    client        oop.OpenoltClient
-    transitionMap *TransitionMap
-    clientCon     *grpc.ClientConn
+	deviceId      string
+	deviceType    string
+	device        *voltha.Device
+	coreProxy     *com.CoreProxy
+	adapterProxy  *com.AdapterProxy
+	openOLT       *OpenOLT
+	nniPort       *voltha.Port
+	ponPort       *voltha.Port
+	exitChannel   chan int
+	lockDevice    sync.RWMutex
+	client        oop.OpenoltClient
+	transitionMap *TransitionMap
+	clientCon     *grpc.ClientConn
 }
 
 //NewDeviceHandler creates a new device handler
-func NewDeviceHandler(cp *com.CoreProxy, device *voltha.Device, adapter *OpenOLT) *DeviceHandler {
-    var dh DeviceHandler
-    dh.coreProxy = cp
-    cloned := (proto.Clone(device)).(*voltha.Device)
-    dh.deviceId = cloned.Id
-    dh.deviceType = cloned.Type
-    dh.device = cloned
-    dh.openOLT = adapter
-    dh.exitChannel = make(chan int, 1)
-    dh.lockDevice = sync.RWMutex{}
+func NewDeviceHandler(cp *com.CoreProxy, ap *com.AdapterProxy, device *voltha.Device, adapter *OpenOLT) *DeviceHandler {
+	var dh DeviceHandler
+	dh.coreProxy = cp
+	dh.adapterProxy = ap
+	cloned := (proto.Clone(device)).(*voltha.Device)
+	dh.deviceId = cloned.Id
+	dh.deviceType = cloned.Type
+	dh.device = cloned
+	dh.openOLT = adapter
+	dh.exitChannel = make(chan int, 1)
+	dh.lockDevice = sync.RWMutex{}
 
-    //TODO initialize the support classes.
-    return &dh
+	//TODO initialize the support classes.
+	return &dh
 }
 
 // start save the device to the data model
 func (dh *DeviceHandler) start(ctx context.Context) {
-    dh.lockDevice.Lock()
-    defer dh.lockDevice.Unlock()
-    log.Debugw("starting-device-agent", log.Fields{"device": dh.device})
-    // Add the initial device to the local model
-    log.Debug("device-agent-started")
+	dh.lockDevice.Lock()
+	defer dh.lockDevice.Unlock()
+	log.Debugw("starting-device-agent", log.Fields{"device": dh.device})
+	// Add the initial device to the local model
+	log.Debug("device-agent-started")
 }
 
 // stop stops the device dh.  Not much to do for now
 func (dh *DeviceHandler) stop(ctx context.Context) {
-    dh.lockDevice.Lock()
-    defer dh.lockDevice.Unlock()
-    log.Debug("stopping-device-agent")
-    dh.exitChannel <- 1
-    log.Debug("device-agent-stopped")
+	dh.lockDevice.Lock()
+	defer dh.lockDevice.Unlock()
+	log.Debug("stopping-device-agent")
+	dh.exitChannel <- 1
+	log.Debug("device-agent-stopped")
 }
 
 func macAddressToUint32Array(mac string) []uint32 {
-    slist := strings.Split(mac, ":")
-    result := make([]uint32, len(slist))
-    var err error
-    var tmp int64
-    for index, val := range slist {
-        if tmp, err = strconv.ParseInt(val, 16, 32); err != nil {
-            return []uint32{1, 2, 3, 4, 5, 6}
-        }
-        result[index] = uint32(tmp)
-    }
-    return result
+	slist := strings.Split(mac, ":")
+	result := make([]uint32, len(slist))
+	var err error
+	var tmp int64
+	for index, val := range slist {
+		if tmp, err = strconv.ParseInt(val, 16, 32); err != nil {
+			return []uint32{1, 2, 3, 4, 5, 6}
+		}
+		result[index] = uint32(tmp)
+	}
+	return result
 }
 
 func portName(portNum uint32, portType voltha.Port_PortType, intfId uint32) string {
 
-    if portType == voltha.Port_PON_OLT {
-        return "pon-" + string(portNum)
-    } else if portType == voltha.Port_ETHERNET_NNI {
-        return "nni-" + string(intfId)
-    } else if portType == voltha.Port_ETHERNET_UNI {
-        log.Errorw("local UNI management not supported", log.Fields{})
-    }
-    return ""
+	if portType == voltha.Port_PON_OLT {
+		//return "pon-" + string(portNum)
+		return "pon-" + strconv.FormatInt(int64(intfId), 10)
+	} else if portType == voltha.Port_ETHERNET_NNI {
+		//return "nni-" + string(intfId)
+		return "nni-" + strconv.FormatInt(int64(intfId), 10)
+	} else if portType == voltha.Port_ETHERNET_UNI {
+		log.Errorw("local UNI management not supported", log.Fields{})
+	}
+	return ""
 }
 
 func (dh *DeviceHandler) addPort(intfId uint32, portType voltha.Port_PortType, state string) {
-    var operStatus common.OperStatus_OperStatus
-    if state == "up" {
-        operStatus = voltha.OperStatus_ACTIVE
-    } else {
-        operStatus = voltha.OperStatus_DISCOVERED
-    }
+	var operStatus common.OperStatus_OperStatus
+	if state == "up" {
+		operStatus = voltha.OperStatus_ACTIVE
+	} else {
+		operStatus = voltha.OperStatus_DISCOVERED
+	}
 
-    // TODO
-    //portNum := platform.intfIdToPortNo(intfId, portType)
-    portNum := intfId
+	// TODO
+	//portNum := platform.intfIdToPortNo(intfId, portType)
+	//portNum := intfIdToPortNo(intfId, portType)
+	portNum := intfId
 
-    label := portName(portNum, portType, intfId)
-    //    Now create the PON Port
-    ponPort := &voltha.Port{
-        PortNo:     portNum,
-        Label:      label,
-        Type:       portType,
-        OperStatus: operStatus,
-    }
+	label := portName(portNum, portType, intfId)
+	//    Now create the PON Port
+	ponPort := &voltha.Port{
+		PortNo:     portNum,
+		Label:      label,
+		Type:       portType,
+		OperStatus: operStatus,
+	}
 
-    // Synchronous call to update device - this method is run in its own go routine
-    if err := dh.coreProxy.PortCreated(nil, dh.device.Id, ponPort); err != nil {
-        log.Errorw("error-creating-nni-port", log.Fields{"deviceId": dh.device.Id, "error": err})
-    }
+	// Synchronous call to update device - this method is run in its own go routine
+	if err := dh.coreProxy.PortCreated(nil, dh.device.Id, ponPort); err != nil {
+		log.Errorw("error-creating-nni-port", log.Fields{"deviceId": dh.device.Id, "error": err})
+	}
 }
 
 // readIndications to read the indications from the OLT device
 func (dh *DeviceHandler) readIndications() {
-    indications, err := dh.client.EnableIndication(context.Background(), new(oop.Empty))
-    if err != nil {
-        log.Errorw("Failed to read indications", log.Fields{"err": err})
-        return
-    }
-    if indications == nil {
-        log.Errorw("Indications is nil", log.Fields{})
-        return
-    }
-    for {
-        indication, err := indications.Recv()
-        if err == io.EOF {
-            break
-        }
-        if err != nil {
-            log.Infow("Failed to read from indications", log.Fields{"err": err})
-            continue
-        }
-        switch indication.Data.(type) {
-        case *oop.Indication_OltInd:
-            oltInd := indication.GetOltInd()
-            if oltInd.OperState == "up" {
-                dh.transitionMap.Handle(DeviceUpInd)
-            } else if oltInd.OperState == "down" {
-                dh.transitionMap.Handle(DeviceDownInd)
-            }
-        case *oop.Indication_IntfInd:
+	indications, err := dh.client.EnableIndication(context.Background(), new(oop.Empty))
+	if err != nil {
+		log.Errorw("Failed to read indications", log.Fields{"err": err})
+		return
+	}
+	if indications == nil {
+		log.Errorw("Indications is nil", log.Fields{})
+		return
+	}
+	for {
+		indication, err := indications.Recv()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			log.Infow("Failed to read from indications", log.Fields{"err": err})
+			continue
+		}
+		switch indication.Data.(type) {
+		case *oop.Indication_OltInd:
+			oltInd := indication.GetOltInd()
+			if oltInd.OperState == "up" {
+				dh.transitionMap.Handle(DeviceUpInd)
+			} else if oltInd.OperState == "down" {
+				dh.transitionMap.Handle(DeviceDownInd)
+			}
+		case *oop.Indication_IntfInd:
 
-            intfInd := indication.GetIntfInd()
-            go dh.addPort(intfInd.GetIntfId(), voltha.Port_PON_OLT, intfInd.GetOperState())
-            log.Infow("Received interface indication ", log.Fields{"InterfaceInd": intfInd})
-        case *oop.Indication_IntfOperInd:
-            intfOperInd := indication.GetIntfOperInd()
-            if intfOperInd.GetType() == "nni" {
-                go dh.addPort(intfOperInd.GetIntfId(), voltha.Port_ETHERNET_NNI, intfOperInd.GetOperState())
-            } else if intfOperInd.GetType() == "pon" {
-                // TODO: Check what needs to be handled here for When PON PORT down, ONU will be down
-                // Handle pon port update
-            }
-            log.Infow("Received interface oper indication ", log.Fields{"InterfaceOperInd": intfOperInd})
-        case *oop.Indication_OnuDiscInd:
-            onuDiscInd := indication.GetOnuDiscInd()
-            log.Infow("Received Onu discovery indication ", log.Fields{"OnuDiscInd": onuDiscInd})
-            // TODO Get onu ID from the resource manager
-            onuId := 0
-            go dh.coreProxy.ChildDeviceDetected(nil, dh.device.Id, int(onuDiscInd.GetIntfId()),
-                   "brcm_openomci_onu", int(onuDiscInd.GetIntfId()), string(onuDiscInd.SerialNumber.GetVendorId()), 
-                   string(onuDiscInd.SerialNumber.GetVendorSpecific()), int64(onuId))
-        case *oop.Indication_OnuInd:
-            onuInd := indication.GetOnuInd()
-            log.Infow("Received Onu indication ", log.Fields{"OnuInd": onuInd})
-        case *oop.Indication_OmciInd:
-            omciInd := indication.GetOmciInd()
-            log.Infow("Received Omci indication ", log.Fields{"OmciInd": omciInd})
-        case *oop.Indication_PktInd:
-            pktInd := indication.GetPktInd()
-            log.Infow("Received pakcet indication ", log.Fields{"PktInd": pktInd})
-        case *oop.Indication_PortStats:
-            portStats := indication.GetPortStats()
-            log.Infow("Received port stats indication", log.Fields{"PortStats": portStats})
-        case *oop.Indication_FlowStats:
-            flowStats := indication.GetFlowStats()
-            log.Infow("Received flow stats", log.Fields{"FlowStats": flowStats})
-        case *oop.Indication_AlarmInd:
-            alarmInd := indication.GetAlarmInd()
-            log.Infow("Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
-        }
-    }
+			intfInd := indication.GetIntfInd()
+			go dh.addPort(intfInd.GetIntfId(), voltha.Port_PON_OLT, intfInd.GetOperState())
+			log.Infow("Received interface indication ", log.Fields{"InterfaceInd": intfInd})
+		case *oop.Indication_IntfOperInd:
+			intfOperInd := indication.GetIntfOperInd()
+			if intfOperInd.GetType() == "nni" {
+				go dh.addPort(intfOperInd.GetIntfId(), voltha.Port_ETHERNET_NNI, intfOperInd.GetOperState())
+			} else if intfOperInd.GetType() == "pon" {
+				// TODO: Check what needs to be handled here for When PON PORT down, ONU will be down
+				// Handle pon port update
+			}
+			log.Infow("Received interface oper indication ", log.Fields{"InterfaceOperInd": intfOperInd})
+		case *oop.Indication_OnuDiscInd:
+			onuDiscInd := indication.GetOnuDiscInd()
+			log.Infow("Received Onu discovery indication ", log.Fields{"OnuDiscInd": onuDiscInd})
+			// TODO Get onu ID from the resource manager
+			var onuId uint32 = 1
+			sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
+			go dh.onuDiscIndication(onuDiscInd, onuId, sn)
+		case *oop.Indication_OnuInd:
+			onuInd := indication.GetOnuInd()
+			log.Infow("Received Onu indication ", log.Fields{"OnuInd": onuInd})
+			go dh.onuIndication(onuInd)
+		case *oop.Indication_OmciInd:
+			omciInd := indication.GetOmciInd()
+			log.Infow("Received Omci indication ", log.Fields{"OmciInd": omciInd})
+			if err := dh.omciIndication(omciInd); err != nil {
+				log.Errorw("send-omci-indication-errr", log.Fields{"error": err, "omciInd": omciInd})
+			}
+		case *oop.Indication_PktInd:
+			pktInd := indication.GetPktInd()
+			log.Infow("Received pakcet indication ", log.Fields{"PktInd": pktInd})
+		case *oop.Indication_PortStats:
+			portStats := indication.GetPortStats()
+			log.Infow("Received port stats indication", log.Fields{"PortStats": portStats})
+		case *oop.Indication_FlowStats:
+			flowStats := indication.GetFlowStats()
+			log.Infow("Received flow stats", log.Fields{"FlowStats": flowStats})
+		case *oop.Indication_AlarmInd:
+			alarmInd := indication.GetAlarmInd()
+			log.Infow("Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
+		}
+	}
 }
 
 // doStateUp handle the olt up indication and update to voltha core
 func (dh *DeviceHandler) doStateUp() error {
-    // Synchronous call to update device state - this method is run in its own go routine
-    if err := dh.coreProxy.DeviceStateUpdate(context.Background(), dh.device.Id, voltha.ConnectStatus_REACHABLE,
-        voltha.OperStatus_ACTIVE); err != nil {
-        log.Errorw("Failed to update device with OLT UP indication", log.Fields{"deviceId": dh.device.Id, "error": err})
-        return err
-    }
-    return nil
+	// Synchronous call to update device state - this method is run in its own go routine
+	if err := dh.coreProxy.DeviceStateUpdate(context.Background(), dh.device.Id, voltha.ConnectStatus_REACHABLE,
+		voltha.OperStatus_ACTIVE); err != nil {
+		log.Errorw("Failed to update device with OLT UP indication", log.Fields{"deviceId": dh.device.Id, "error": err})
+		return err
+	}
+	return nil
 }
 
 // doStateDown handle the olt down indication
 func (dh *DeviceHandler) doStateDown() error {
-    //TODO Handle oper state down
-    return nil
+	//TODO Handle oper state down
+	return nil
 }
 
 // doStateInit dial the grpc before going to init state
 func (dh *DeviceHandler) doStateInit() error {
-    var err error
-    dh.clientCon, err = grpc.Dial(dh.device.GetHostAndPort(), grpc.WithInsecure())
-    if err != nil {
-        log.Errorw("Failed to dial device", log.Fields{"DeviceId": dh.deviceId, "HostAndPort": dh.device.GetHostAndPort(), "err": err})
-        return err
-    }
-    return nil
+	var err error
+	dh.clientCon, err = grpc.Dial(dh.device.GetHostAndPort(), grpc.WithInsecure())
+	if err != nil {
+		log.Errorw("Failed to dial device", log.Fields{"DeviceId": dh.deviceId, "HostAndPort": dh.device.GetHostAndPort(), "err": err})
+		return err
+	}
+	return nil
 }
 
 // postInit create olt client instance to invoke RPC on the olt device
 func (dh *DeviceHandler) postInit() error {
-    dh.client = oop.NewOpenoltClient(dh.clientCon)
-    dh.transitionMap.Handle(GrpcConnected)
-    return nil
+	dh.client = oop.NewOpenoltClient(dh.clientCon)
+	dh.transitionMap.Handle(GrpcConnected)
+	return nil
 }
 
 // doStateConnected get the device info and update to voltha core
 func (dh *DeviceHandler) doStateConnected() error {
-    deviceInfo, err := dh.client.GetDeviceInfo(context.Background(), new(oop.Empty))
-    if err != nil {
-        log.Errorw("Failed to fetch device info", log.Fields{"err": err})
-        return err
-    }
-    if deviceInfo == nil {
-        log.Errorw("Device info is nil", log.Fields{})
-        return errors.New("Failed to get device info from OLT")
-    }
+	deviceInfo, err := dh.client.GetDeviceInfo(context.Background(), new(oop.Empty))
+	if err != nil {
+		log.Errorw("Failed to fetch device info", log.Fields{"err": err})
+		return err
+	}
+	if deviceInfo == nil {
+		log.Errorw("Device info is nil", log.Fields{})
+		return errors.New("Failed to get device info from OLT")
+	}
 
-    dh.device.Root = true
-    dh.device.Vendor = deviceInfo.Vendor
-    dh.device.Model = deviceInfo.Model
-    dh.device.ConnectStatus = voltha.ConnectStatus_REACHABLE
-    dh.device.SerialNumber = deviceInfo.DeviceSerialNumber
-    dh.device.HardwareVersion = deviceInfo.HardwareVersion
-    dh.device.FirmwareVersion = deviceInfo.FirmwareVersion
-    // TODO : Check whether this MAC address is learnt from SDPON or need to send from device
-    dh.device.MacAddress = "0a:0b:0c:0d:0e:0f"
+	dh.device.Root = true
+	dh.device.Vendor = deviceInfo.Vendor
+	dh.device.Model = deviceInfo.Model
+	dh.device.ConnectStatus = voltha.ConnectStatus_REACHABLE
+	dh.device.SerialNumber = deviceInfo.DeviceSerialNumber
+	dh.device.HardwareVersion = deviceInfo.HardwareVersion
+	dh.device.FirmwareVersion = deviceInfo.FirmwareVersion
+	// TODO : Check whether this MAC address is learnt from SDPON or need to send from device
+	dh.device.MacAddress = "0a:0b:0c:0d:0e:0f"
 
-    // Synchronous call to update device - this method is run in its own go routine
-    if err := dh.coreProxy.DeviceUpdate(nil, dh.device); err != nil {
-        log.Errorw("error-updating-device", log.Fields{"deviceId": dh.device.Id, "error": err})
-    }
+	// Synchronous call to update device - this method is run in its own go routine
+	if err := dh.coreProxy.DeviceUpdate(nil, dh.device); err != nil {
+		log.Errorw("error-updating-device", log.Fields{"deviceId": dh.device.Id, "error": err})
+	}
 
-    // Start reading indications
-    go dh.readIndications()
-    return nil
+	// Start reading indications
+	go dh.readIndications()
+	return nil
 }
 
 // AdoptDevice adopts the OLT device
 func (dh *DeviceHandler) AdoptDevice(device *voltha.Device) {
-    dh.transitionMap = NewTransitionMap(dh)
-    log.Infow("AdoptDevice", log.Fields{"deviceId": device.Id, "Address": device.GetHostAndPort()})
-    dh.transitionMap.Handle(DeviceInit)
+	dh.transitionMap = NewTransitionMap(dh)
+	log.Infow("AdoptDevice", log.Fields{"deviceId": device.Id, "Address": device.GetHostAndPort()})
+	dh.transitionMap.Handle(DeviceInit)
 }
 
 // GetOfpDeviceInfo Get the Ofp device information
 func (dh *DeviceHandler) GetOfpDeviceInfo(device *voltha.Device) (*ic.SwitchCapability, error) {
-    return &ic.SwitchCapability{
-        Desc: &of.OfpDesc{
-            HwDesc:    "open_pon",
-            SwDesc:    "open_pon",
-            SerialNum: dh.device.SerialNumber,
-        },
-        SwitchFeatures: &of.OfpSwitchFeatures{
-            NBuffers: 256,
-            NTables:  2,
-            Capabilities: uint32(of.OfpCapabilities_OFPC_FLOW_STATS |
-                of.OfpCapabilities_OFPC_TABLE_STATS |
-                of.OfpCapabilities_OFPC_PORT_STATS |
-                of.OfpCapabilities_OFPC_GROUP_STATS),
-        },
-    }, nil
+	return &ic.SwitchCapability{
+		Desc: &of.OfpDesc{
+			HwDesc:    "open_pon",
+			SwDesc:    "open_pon",
+			SerialNum: dh.device.SerialNumber,
+		},
+		SwitchFeatures: &of.OfpSwitchFeatures{
+			NBuffers: 256,
+			NTables:  2,
+			Capabilities: uint32(of.OfpCapabilities_OFPC_FLOW_STATS |
+				of.OfpCapabilities_OFPC_TABLE_STATS |
+				of.OfpCapabilities_OFPC_PORT_STATS |
+				of.OfpCapabilities_OFPC_GROUP_STATS),
+		},
+	}, nil
 }
 
 // GetOfpPortInfo Get Ofp port information
 func (dh *DeviceHandler) GetOfpPortInfo(device *voltha.Device, portNo int64) (*ic.PortCapability, error) {
-    cap := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
-    return &ic.PortCapability{
-        Port: &voltha.LogicalPort{
-            OfpPort: &of.OfpPort{
-                HwAddr:     macAddressToUint32Array(dh.device.MacAddress),
-                Config:     0,
-                State:      uint32(of.OfpPortState_OFPPS_LIVE),
-                Curr:       cap,
-                Advertised: cap,
-                Peer:       cap,
-                CurrSpeed:  uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
-                MaxSpeed:   uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
-            },
-            DeviceId:     dh.device.Id,
-            DevicePortNo: uint32(portNo),
-        },
-    }, nil
+	cap := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
+	return &ic.PortCapability{
+		Port: &voltha.LogicalPort{
+			OfpPort: &of.OfpPort{
+				HwAddr:     macAddressToUint32Array(dh.device.MacAddress),
+				Config:     0,
+				State:      uint32(of.OfpPortState_OFPPS_LIVE),
+				Curr:       cap,
+				Advertised: cap,
+				Peer:       cap,
+				CurrSpeed:  uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+				MaxSpeed:   uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+			},
+			DeviceId:     dh.device.Id,
+			DevicePortNo: uint32(portNo),
+		},
+	}, nil
+}
+
+func (dh *DeviceHandler) omciIndication(omciInd *oop.OmciIndication) error {
+	log.Debugw("omci indication", log.Fields{"intfId": omciInd.IntfId, "onuId": omciInd.OnuId})
+
+	kwargs := make(map[string]interface{})
+	kwargs["onu_id"] = omciInd.OnuId
+	kwargs["parent_port_no"] = omciInd.GetIntfId()
+
+	if onuDevice, err := dh.coreProxy.GetChildDevice(nil, dh.device.Id, kwargs); err != nil {
+		log.Errorw("onu not found", log.Fields{"intfId": omciInd.IntfId, "onuId": omciInd.OnuId})
+		return err
+	} else {
+		omciMsg := &ic.InterAdapterOmciMessage{Message: omciInd.Pkt}
+		if sendErr := dh.adapterProxy.SendInterAdapterMessage(context.Background(), omciMsg,
+			ic.InterAdapterMessageType_OMCI_REQUEST, dh.deviceType, onuDevice.Type,
+			onuDevice.Id, onuDevice.ProxyAddress.DeviceId, ""); sendErr != nil {
+			log.Errorw("send omci request error", log.Fields{"fromAdapter": dh.deviceType, "toAdapter": onuDevice.Type, "onuId": onuDevice.Id, "proxyDeviceId": onuDevice.ProxyAddress.DeviceId})
+			return sendErr
+		}
+		return nil
+	}
 }
 
 // Process_inter_adapter_message process inter adater message
 func (dh *DeviceHandler) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {
-    // TODO
-    log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
-    return nil
+	// TODO
+	log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
+	if msg.Header.Type == ic.InterAdapterMessageType_OMCI_REQUEST {
+		msgId := msg.Header.Id
+		fromTopic := msg.Header.FromTopic
+		toTopic := msg.Header.ToTopic
+		toDeviceId := msg.Header.ToDeviceId
+		proxyDeviceId := msg.Header.ProxyDeviceId
+
+		log.Debugw("omci request message header", log.Fields{"msgId": msgId, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceId": toDeviceId, "proxyDeviceId": proxyDeviceId})
+
+		msgBody := msg.GetBody()
+
+		omciMsg := &ic.InterAdapterOmciMessage{}
+		if err := ptypes.UnmarshalAny(msgBody, omciMsg); err != nil {
+			log.Warnw("cannot-unmarshal-omci-msg-body", log.Fields{"error": err})
+			return err
+		}
+
+		if onuDevice, err := dh.coreProxy.GetDevice(nil, dh.device.Id, toDeviceId); err != nil {
+			log.Errorw("onu not found", log.Fields{"onuDeviceId": toDeviceId, "error": err})
+			return err
+		} else {
+			dh.sendProxiedMessage(onuDevice, omciMsg)
+		}
+
+	} else {
+		log.Errorw("inter-adapter-unhandled-type", log.Fields{"msgType": msg.Header.Type})
+	}
+	return nil
 }
 
+func (dh *DeviceHandler) sendProxiedMessage(onuDevice *voltha.Device, omciMsg *ic.InterAdapterOmciMessage) {
+	if onuDevice.ConnectStatus != voltha.ConnectStatus_REACHABLE {
+		log.Debugw("ONU is not reachable, cannot send OMCI", log.Fields{"serialNumber": onuDevice.SerialNumber, "intfId": onuDevice.ProxyAddress.GetChannelId(), "onuId": onuDevice.ProxyAddress.GetOnuId()})
+		return
+	}
+
+	omciMessage := &oop.OmciMsg{IntfId: onuDevice.ProxyAddress.GetChannelId(), OnuId: onuDevice.ProxyAddress.GetOnuId(), Pkt: omciMsg.Message}
+
+	dh.client.OmciMsgOut(context.Background(), omciMessage)
+	log.Debugw("omci-message-sent", log.Fields{"serialNumber": onuDevice.SerialNumber, "intfId": onuDevice.ProxyAddress.GetChannelId(), "omciMsg": string(omciMsg.Message)})
+}
+
+func (dh *DeviceHandler) activateONU(intfId uint32, onuId int64, serialNum *oop.SerialNumber, serialNumber string) {
+	log.Debugw("activate-onu", log.Fields{"intfId": intfId, "onuId": onuId, "serialNum": serialNum, "serialNumber": serialNumber})
+	// TODO: need resource manager
+	var pir uint32 = 1000000
+	Onu := oop.Onu{IntfId: intfId, OnuId: uint32(onuId), SerialNumber: serialNum, Pir: pir}
+	if _, err := dh.client.ActivateOnu(context.Background(), &Onu); err != nil {
+		log.Errorw("activate-onu-failed", log.Fields{"Onu": Onu})
+	} else {
+		log.Infow("activated-onu", log.Fields{"SerialNumber": serialNumber})
+	}
+}
+
+func (dh *DeviceHandler) onuDiscIndication(onuDiscInd *oop.OnuDiscIndication, onuId uint32, sn string) error {
+	if err := dh.coreProxy.ChildDeviceDetected(nil, dh.device.Id, int(onuDiscInd.GetIntfId()), "brcm_openomci_onu", int(onuDiscInd.GetIntfId()), string(onuDiscInd.SerialNumber.GetVendorId()), sn, int64(onuId)); err != nil {
+		log.Errorw("Create onu error", log.Fields{"parent_id": dh.device.Id, "ponPort": onuDiscInd.GetIntfId(), "onuId": onuId, "sn": sn, "error": err})
+		return err
+	}
+
+	kwargs := make(map[string]interface{})
+	kwargs["onu_id"] = onuId
+	kwargs["parent_port_no"] = onuDiscInd.GetIntfId()
+
+	for i := 0; i < 10; i++ {
+		if onuDevice, _ := dh.coreProxy.GetChildDevice(nil, dh.device.Id, kwargs); onuDevice != nil {
+			dh.activateONU(onuDiscInd.IntfId, int64(onuId), onuDiscInd.SerialNumber, sn)
+			return nil
+		} else {
+			time.Sleep(1 * time.Second)
+			log.Debugln("Sleep 1 seconds to active onu, retry times ", i+1)
+		}
+	}
+	log.Errorw("Cannot query onu, dont activate it.", log.Fields{"parent_id": dh.device.Id, "ponPort": onuDiscInd.GetIntfId(), "onuId": onuId, "sn": sn})
+	return errors.New("Failed to activate onu")
+}
+
+func (dh *DeviceHandler) onuIndication(onuInd *oop.OnuIndication) {
+	serialNumber := dh.stringifySerialNumber(onuInd.SerialNumber)
+
+	kwargs := make(map[string]interface{})
+	if serialNumber != "" {
+		kwargs["serial_number"] = serialNumber
+	} else {
+		kwargs["onu_id"] = onuInd.OnuId
+		kwargs["parent_port_no"] = onuInd.GetIntfId()
+	}
+	if onuDevice, _ := dh.coreProxy.GetChildDevice(nil, dh.device.Id, kwargs); onuDevice != nil {
+		//if intfIdFromPortNo(onuDevice.ParentPortNo) != onuInd.GetIntfId() {
+		if onuDevice.ParentPortNo != onuInd.GetIntfId() {
+			//log.Warnw("ONU-is-on-a-different-intf-id-now", log.Fields{"previousIntfId": intfIdFromPortNo(onuDevice.ParentPortNo), "currentIntfId": onuInd.GetIntfId()})
+			log.Warnw("ONU-is-on-a-different-intf-id-now", log.Fields{"previousIntfId": onuDevice.ParentPortNo, "currentIntfId": onuInd.GetIntfId()})
+		}
+
+		if onuDevice.ProxyAddress.OnuId != onuInd.OnuId {
+			log.Warnw("ONU-id-mismatch, can happen if both voltha and the olt rebooted", log.Fields{"expected_onu_id": onuDevice.ProxyAddress.OnuId, "received_onu_id": onuInd.OnuId})
+		}
+
+		// adminState
+		if onuInd.AdminState == "down" {
+			if onuInd.OperState != "down" {
+				log.Errorw("ONU-admin-state-down-and-oper-status-not-down", log.Fields{"operState": onuInd.OperState})
+				// Forcing the oper state change code to execute
+				onuInd.OperState = "down"
+			}
+			// Port and logical port update is taken care of by oper state block
+		} else if onuInd.AdminState == "up" {
+			log.Debugln("received-onu-admin-state up")
+		} else {
+			log.Errorw("Invalid-or-not-implemented-admin-state", log.Fields{"received-admin-state": onuInd.AdminState})
+		}
+		log.Debugln("admin-state-dealt-with")
+
+		// operState
+		if onuInd.OperState == "down" {
+			if onuDevice.ConnectStatus != common.ConnectStatus_UNREACHABLE {
+				dh.coreProxy.DeviceStateUpdate(nil, onuDevice.Id, common.ConnectStatus_UNREACHABLE, onuDevice.OperStatus)
+				log.Debugln("onu-oper-state-is-down")
+			}
+			if onuDevice.OperStatus != common.OperStatus_DISCOVERED {
+				dh.coreProxy.DeviceStateUpdate(nil, onuDevice.Id, common.ConnectStatus_UNREACHABLE, common.OperStatus_DISCOVERED)
+			}
+			log.Debugw("inter-adapter-send-onu-ind", log.Fields{"onuIndication": onuInd})
+
+			// TODO NEW CORE do not hardcode adapter name. Handler needs Adapter reference
+			dh.adapterProxy.SendInterAdapterMessage(nil, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST, "openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
+		} else if onuInd.OperState == "up" {
+			if onuDevice.ConnectStatus != common.ConnectStatus_REACHABLE {
+				dh.coreProxy.DeviceStateUpdate(nil, onuDevice.Id, common.ConnectStatus_REACHABLE, onuDevice.OperStatus)
+
+			}
+			if onuDevice.OperStatus != common.OperStatus_DISCOVERED {
+				log.Warnw("ignore onu indication", log.Fields{"intfId": onuInd.IntfId, "onuId": onuInd.OnuId, "operStatus": onuDevice.OperStatus, "msgOperStatus": onuInd.OperState})
+				return
+			}
+			dh.adapterProxy.SendInterAdapterMessage(nil, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST, "openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
+		} else {
+			log.Warnw("Not-implemented-or-invalid-value-of-oper-state", log.Fields{"operState": onuInd.OperState})
+		}
+	} else {
+		log.Errorw("onu not found", log.Fields{"intfId": onuInd.IntfId, "onuId": onuInd.OnuId})
+		return
+	}
+
+}
+
+func (dh *DeviceHandler) stringifySerialNumber(serialNum *oop.SerialNumber) string {
+	if serialNum != nil {
+		return string(serialNum.VendorId) + dh.stringifyVendorSpecific(serialNum.VendorSpecific)
+	} else {
+		return ""
+	}
+}
+
+func (dh *DeviceHandler) stringifyVendorSpecific(vendorSpecific []byte) string {
+	tmp := fmt.Sprintf("%x", (uint32(vendorSpecific[0])>>4)&0x0f) +
+		fmt.Sprintf("%x", (uint32(vendorSpecific[0]&0x0f))) +
+		fmt.Sprintf("%x", (uint32(vendorSpecific[1])>>4)&0x0f) +
+		fmt.Sprintf("%x", (uint32(vendorSpecific[1]))&0x0f) +
+		fmt.Sprintf("%x", (uint32(vendorSpecific[2])>>4)&0x0f) +
+		fmt.Sprintf("%x", (uint32(vendorSpecific[2]))&0x0f) +
+		fmt.Sprintf("%x", (uint32(vendorSpecific[3])>>4)&0x0f) +
+		fmt.Sprintf("%x", (uint32(vendorSpecific[3]))&0x0f)
+	return tmp
+}
+
+// flows
+func (dh *DeviceHandler) Update_flows_bulk() error {
+	return errors.New("UnImplemented")
+}
diff --git a/adaptercore/openolt.go b/adaptercore/openolt.go
index 18ef08f..9a6afc3 100644
--- a/adaptercore/openolt.go
+++ b/adaptercore/openolt.go
@@ -16,144 +16,146 @@
 package adaptercore

 

 import (

-    "context"

-    "errors"

-    "fmt"

-    "sync"

+	"context"

+	"errors"

+	"fmt"

+	"sync"

 

-    com "github.com/opencord/voltha-go/adapters/common"

-    "github.com/opencord/voltha-go/common/log"

-    "github.com/opencord/voltha-go/kafka"

-    ic "github.com/opencord/voltha-go/protos/inter_container"

-    "github.com/opencord/voltha-go/protos/openflow_13"

-    "github.com/opencord/voltha-go/protos/voltha"

+	com "github.com/opencord/voltha-go/adapters/common"

+	"github.com/opencord/voltha-go/common/log"

+	"github.com/opencord/voltha-go/kafka"

+	ic "github.com/opencord/voltha-go/protos/inter_container"

+	"github.com/opencord/voltha-go/protos/openflow_13"

+	"github.com/opencord/voltha-go/protos/voltha"

 )

 

 type OpenOLT struct {

-    deviceHandlers        map[string]*DeviceHandler

-    coreProxy             *com.CoreProxy

-    kafkaICProxy          *kafka.InterContainerProxy

-    numOnus               int

-    exitChannel           chan int

-    lockDeviceHandlersMap sync.RWMutex

+	deviceHandlers        map[string]*DeviceHandler

+	coreProxy             *com.CoreProxy

+	adapterProxy          *com.AdapterProxy

+	kafkaICProxy          *kafka.InterContainerProxy

+	numOnus               int

+	exitChannel           chan int

+	lockDeviceHandlersMap sync.RWMutex

 }

 

-func NewOpenOLT(ctx context.Context, kafkaICProxy *kafka.InterContainerProxy, coreProxy *com.CoreProxy, onuNumber int) *OpenOLT {

-    var openOLT OpenOLT

-    openOLT.exitChannel = make(chan int, 1)

-    openOLT.deviceHandlers = make(map[string]*DeviceHandler)

-    openOLT.kafkaICProxy = kafkaICProxy

-    openOLT.numOnus = onuNumber

-    openOLT.coreProxy = coreProxy

-    openOLT.lockDeviceHandlersMap = sync.RWMutex{}

-    return &openOLT

+func NewOpenOLT(ctx context.Context, kafkaICProxy *kafka.InterContainerProxy, coreProxy *com.CoreProxy, adapterProxy *com.AdapterProxy, onuNumber int) *OpenOLT {

+	var openOLT OpenOLT

+	openOLT.exitChannel = make(chan int, 1)

+	openOLT.deviceHandlers = make(map[string]*DeviceHandler)

+	openOLT.kafkaICProxy = kafkaICProxy

+	openOLT.numOnus = onuNumber

+	openOLT.coreProxy = coreProxy

+	openOLT.adapterProxy = adapterProxy

+	openOLT.lockDeviceHandlersMap = sync.RWMutex{}

+	return &openOLT

 }

 

 func (oo *OpenOLT) Start(ctx context.Context) error {

-    log.Info("starting-device-manager")

-    log.Info("device-manager-started")

-    return nil

+	log.Info("starting-device-manager")

+	log.Info("device-manager-started")

+	return nil

 }

 

 func (oo *OpenOLT) Stop(ctx context.Context) error {

-    log.Info("stopping-device-manager")

-    oo.exitChannel <- 1

-    log.Info("device-manager-stopped")

-    return nil

+	log.Info("stopping-device-manager")

+	oo.exitChannel <- 1

+	log.Info("device-manager-stopped")

+	return nil

 }

 

 func sendResponse(ctx context.Context, ch chan interface{}, result interface{}) {

-    if ctx.Err() == nil {

-        // Returned response only of the ctx has not been cancelled/timeout/etc

-        // Channel is automatically closed when a context is Done

-        ch <- result

-        log.Debugw("sendResponse", log.Fields{"result": result})

-    } else {

-        // Should the transaction be reverted back?

-        log.Debugw("sendResponse-context-error", log.Fields{"context-error": ctx.Err()})

-    }

+	if ctx.Err() == nil {

+		// Returned response only of the ctx has not been cancelled/timeout/etc

+		// Channel is automatically closed when a context is Done

+		ch <- result

+		log.Debugw("sendResponse", log.Fields{"result": result})

+	} else {

+		// Should the transaction be reverted back?

+		log.Debugw("sendResponse-context-error", log.Fields{"context-error": ctx.Err()})

+	}

 }

 

 func (oo *OpenOLT) addDeviceHandlerToMap(agent *DeviceHandler) {

-    oo.lockDeviceHandlersMap.Lock()

-    defer oo.lockDeviceHandlersMap.Unlock()

-    if _, exist := oo.deviceHandlers[agent.deviceId]; !exist {

-        oo.deviceHandlers[agent.deviceId] = agent

-    }

+	oo.lockDeviceHandlersMap.Lock()

+	defer oo.lockDeviceHandlersMap.Unlock()

+	if _, exist := oo.deviceHandlers[agent.deviceId]; !exist {

+		oo.deviceHandlers[agent.deviceId] = agent

+	}

 }

 

 func (oo *OpenOLT) deleteDeviceHandlerToMap(agent *DeviceHandler) {

-    oo.lockDeviceHandlersMap.Lock()

-    defer oo.lockDeviceHandlersMap.Unlock()

-    delete(oo.deviceHandlers, agent.deviceId)

+	oo.lockDeviceHandlersMap.Lock()

+	defer oo.lockDeviceHandlersMap.Unlock()

+	delete(oo.deviceHandlers, agent.deviceId)

 }

 

 func (oo *OpenOLT) getDeviceHandler(deviceId string) *DeviceHandler {

-    oo.lockDeviceHandlersMap.Lock()

-    defer oo.lockDeviceHandlersMap.Unlock()

-    if agent, ok := oo.deviceHandlers[deviceId]; ok {

-        return agent

-    }

-    return nil

+	oo.lockDeviceHandlersMap.Lock()

+	defer oo.lockDeviceHandlersMap.Unlock()

+	if agent, ok := oo.deviceHandlers[deviceId]; ok {

+		return agent

+	}

+	return nil

 }

 

 func (oo *OpenOLT) createDeviceTopic(device *voltha.Device) error {

-    log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})

-    deviceTopic := kafka.Topic{Name: oo.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}

-    // TODO for the offset

-    if err := oo.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic, 0); err != nil {

-        log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})

-        return err

-    }

-    return nil

+	log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})

+	deviceTopic := kafka.Topic{Name: oo.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}

+	// TODO for the offset

+	if err := oo.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic, 0); err != nil {

+		log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})

+		return err

+	}

+	return nil

 }

 

 func (oo *OpenOLT) Adopt_device(device *voltha.Device) error {

-    if device == nil {

-        log.Warn("device-is-nil")

-        return errors.New("nil-device")

-    }

-    log.Infow("adopt-device", log.Fields{"deviceId": device.Id})

-    var handler *DeviceHandler

-    if handler = oo.getDeviceHandler(device.Id); handler == nil {

-        handler := NewDeviceHandler(oo.coreProxy, device, oo)

-        oo.addDeviceHandlerToMap(handler)

-        go handler.AdoptDevice(device)

-        // Launch the creation of the device topic

-        go oo.createDeviceTopic(device)

-    }

-    return nil

+	if device == nil {

+		log.Warn("device-is-nil")

+		return errors.New("nil-device")

+	}

+	log.Infow("adopt-device", log.Fields{"deviceId": device.Id})

+	var handler *DeviceHandler

+	if handler = oo.getDeviceHandler(device.Id); handler == nil {

+		handler := NewDeviceHandler(oo.coreProxy, oo.adapterProxy, device, oo)

+		oo.addDeviceHandlerToMap(handler)

+		go handler.AdoptDevice(device)

+		// Launch the creation of the device topic

+		// go oo.createDeviceTopic(device)

+	}

+	return nil

 }

 

 func (oo *OpenOLT) Get_ofp_device_info(device *voltha.Device) (*ic.SwitchCapability, error) {

-    log.Infow("Get_ofp_device_info", log.Fields{"deviceId": device.Id})

-    if handler := oo.getDeviceHandler(device.Id); handler != nil {

-        return handler.GetOfpDeviceInfo(device)

-    }

-    log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})

-    return nil, errors.New("device-handler-not-set")

+	log.Infow("Get_ofp_device_info", log.Fields{"deviceId": device.Id})

+	if handler := oo.getDeviceHandler(device.Id); handler != nil {

+		return handler.GetOfpDeviceInfo(device)

+	}

+	log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})

+	return nil, errors.New("device-handler-not-set")

 }

 

 func (oo *OpenOLT) Get_ofp_port_info(device *voltha.Device, port_no int64) (*ic.PortCapability, error) {

-    log.Infow("Get_ofp_port_info", log.Fields{"deviceId": device.Id})

-    if handler := oo.getDeviceHandler(device.Id); handler != nil {

-        return handler.GetOfpPortInfo(device, port_no)

-    }

-    log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})

-    return nil, errors.New("device-handler-not-set")

+	log.Infow("Get_ofp_port_info", log.Fields{"deviceId": device.Id})

+	if handler := oo.getDeviceHandler(device.Id); handler != nil {

+		return handler.GetOfpPortInfo(device, port_no)

+	}

+	log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})

+	return nil, errors.New("device-handler-not-set")

 }

 

 func (oo *OpenOLT) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {

-    log.Infow("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})

-    targetDevice := msg.Header.ProxyDeviceId // Request?

-    if targetDevice == "" && msg.Header.ToDeviceId != "" {

-        // Typical response

-        targetDevice = msg.Header.ToDeviceId

-    }

-    if handler := oo.getDeviceHandler(targetDevice); handler != nil {

-        return handler.Process_inter_adapter_message(msg)

-    }

-    return errors.New(fmt.Sprintf("handler-not-found-%s", targetDevice))

+	log.Infow("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})

+	targetDevice := msg.Header.ProxyDeviceId // Request?

+	if targetDevice == "" && msg.Header.ToDeviceId != "" {

+		// Typical response

+		targetDevice = msg.Header.ToDeviceId

+	}

+	if handler := oo.getDeviceHandler(targetDevice); handler != nil {

+		return handler.Process_inter_adapter_message(msg)

+	}

+	return errors.New(fmt.Sprintf("handler-not-found-%s", targetDevice))

 }

 

 func (oo *OpenOLT) Adapter_descriptor() error {

diff --git a/main.go b/main.go
index 12c5ff6..ca62a01 100644
--- a/main.go
+++ b/main.go
@@ -1,340 +1,344 @@
-/*

- * Copyright 2018-present Open Networking Foundation

-

- * Licensed under the Apache License, Version 2.0 (the "License");

- * you may not use this file except in compliance with the License.

- * You may obtain a copy of the License at

-

- * http://www.apache.org/licenses/LICENSE-2.0

-

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

- */

-package main

-

-import (

-    "context"

-    "errors"

-    "fmt"

-    "github.com/opencord/voltha-go/adapters"

-    com "github.com/opencord/voltha-go/adapters/common"

-    ac "github.com/opencord/voltha-go/adapters/openolt/adaptercore"

-    "github.com/opencord/voltha-go/adapters/openolt/config"

-    "github.com/opencord/voltha-go/common/log"

-    "github.com/opencord/voltha-go/db/kvstore"

-    "github.com/opencord/voltha-go/kafka"

-    ic "github.com/opencord/voltha-go/protos/inter_container"

-    "github.com/opencord/voltha-go/protos/voltha"

-    "os"

-    "os/signal"

-    "strconv"

-    "syscall"

-    "time"

-)

-

-type adapter struct {

-    instanceId       string

-    config           *config.AdapterFlags

-    iAdapter         adapters.IAdapter

-    kafkaClient      kafka.Client

-    kvClient         kvstore.Client

-    kip              *kafka.InterContainerProxy

-    coreProxy        *com.CoreProxy

-    halted           bool

-    exitChannel      chan int

-    receiverChannels []<-chan *ic.InterContainerMessage

-}

-

-func init() {

-    log.AddPackage(log.JSON, log.DebugLevel, nil)

-}

-

-func newAdapter(cf *config.AdapterFlags) *adapter {

-    var a adapter

-    a.instanceId = cf.InstanceID

-    a.config = cf

-    a.halted = false

-    a.exitChannel = make(chan int, 1)

-    a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)

-    return &a

-}

-

-func (a *adapter) start(ctx context.Context) {

-    log.Info("Starting Core Adapter components")

-    var err error

-

-    // Setup KV Client

-    log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})

-    if err := a.setKVClient(); err != nil {

-        log.Fatal("error-setting-kv-client")

-    }

-

-    // Setup Kafka Client

-    if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {

-        log.Fatal("Unsupported-common-client")

-    }

-

-    // Start the common InterContainer Proxy - retries indefinitely

-    if a.kip, err = a.startInterContainerProxy(-1); err != nil {

-        log.Fatal("error-starting-inter-container-proxy")

-    }

-

-    // Create the core proxy to handle requests to the Core

-    a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)

-

-    // Create the open OLT adapter

-    if a.iAdapter, err = a.startOpenOLT(ctx, a.kip, a.coreProxy, a.config.OnuNumber); err != nil {

-        log.Fatal("error-starting-inter-container-proxy")

-    }

-

-    // Register the core request handler

-    if err = a.setupRequestHandler(a.instanceId, a.iAdapter); err != nil {

-        log.Fatal("error-setting-core-request-handler")

-    }

-

-    //    Register this adapter to the Core - retries indefinitely

-    if err = a.registerWithCore(-1); err != nil {

-        log.Fatal("error-registering-with-core")

-    }

-}

-

-func (rw *adapter) stop() {

-    // Stop leadership tracking

-    rw.halted = true

-

-    // send exit signal

-    rw.exitChannel <- 0

-

-    // Cleanup - applies only if we had a kvClient

-    if rw.kvClient != nil {

-        // Release all reservations

-        if err := rw.kvClient.ReleaseAllReservations(); err != nil {

-            log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})

-        }

-        // Close the DB connection

-        rw.kvClient.Close()

-    }

-

-    // TODO:  More cleanup

-}

-

-func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {

-

-    log.Infow("kv-store-type", log.Fields{"store": storeType})

-    switch storeType {

-    case "consul":

-        return kvstore.NewConsulClient(address, timeout)

-    case "etcd":

-        return kvstore.NewEtcdClient(address, timeout)

-    }

-    return nil, errors.New("unsupported-kv-store")

-}

-

-func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {

-

-    log.Infow("common-client-type", log.Fields{"client": clientType})

-    switch clientType {

-    case "sarama":

-        return kafka.NewSaramaClient(

-            kafka.Host(host),

-            kafka.Port(port),

-            kafka.ProducerReturnOnErrors(true),

-            kafka.ProducerReturnOnSuccess(true),

-            kafka.ProducerMaxRetries(6),

-            kafka.ProducerRetryBackoff(time.Millisecond*30)), nil

-    }

-    return nil, errors.New("unsupported-client-type")

-}

-

-func (a *adapter) setKVClient() error {

-    addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)

-    client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)

-    if err != nil {

-        a.kvClient = nil

-        log.Error(err)

-        return err

-    }

-    a.kvClient = client

-    return nil

-}

-

-func toString(value interface{}) (string, error) {

-    switch t := value.(type) {

-    case []byte:

-        return string(value.([]byte)), nil

-    case string:

-        return value.(string), nil

-    default:

-        return "", fmt.Errorf("unexpected-type-%T", t)

-    }

-}

-

-func (a *adapter) startInterContainerProxy(retries int) (*kafka.InterContainerProxy, error) {

-    log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,

-        "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})

-    var err error

-    var kip *kafka.InterContainerProxy

-    if kip, err = kafka.NewInterContainerProxy(

-        kafka.InterContainerHost(a.config.KafkaAdapterHost),

-        kafka.InterContainerPort(a.config.KafkaAdapterPort),

-        kafka.MsgClient(a.kafkaClient),

-        kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic})); err != nil {

-        log.Errorw("fail-to-create-common-proxy", log.Fields{"error": err})

-        return nil, err

-    }

-    count := 0

-    for {

-        if err = kip.Start(); err != nil {

-            log.Warnw("error-starting-messaging-proxy", log.Fields{"error": err})

-            if retries == count {

-                return nil, err

-            }

-            count = +1

-            //    Take a nap before retrying

-            time.Sleep(2 * time.Second)

-        } else {

-            break

-        }

-    }

-

-    log.Info("common-messaging-proxy-created")

-    return kip, nil

-}

-

-func (a *adapter) startOpenOLT(ctx context.Context, kip *kafka.InterContainerProxy, cp *com.CoreProxy, onuNumber int) (*ac.OpenOLT, error) {

-    log.Info("starting-open-olt")

-    var err error

-    sOLT := ac.NewOpenOLT(ctx, a.kip, cp, onuNumber)

-

-    if err = sOLT.Start(ctx); err != nil {

-        log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})

-        return nil, err

-    }

-

-    log.Info("open-olt-started")

-    return sOLT, nil

-}

-

-func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter) error {

-    log.Info("setting-request-handler")

-    requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter, a.coreProxy)

-    if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {

-        log.Errorw("request-handler-setup-failed", log.Fields{"error": err})

-        return err

-

-    }

-    log.Info("request-handler-setup-done")

-    return nil

-}

-

-func (a *adapter) registerWithCore(retries int) error {

-    log.Info("registering-with-core")

-    adapterDescription := &voltha.Adapter{Id: "openolt", Vendor: "simulation Enterprise Inc"}

-    types := []*voltha.DeviceType{{Id: "openolt", Adapter: "openolt"}}

-    deviceTypes := &voltha.DeviceTypes{Items: types}

-    count := 0

-    for {

-        if err := a.coreProxy.RegisterAdapter(nil, adapterDescription, deviceTypes); err != nil {

-            log.Warnw("registering-with-core-failed", log.Fields{"error": err})

-            if retries == count {

-                return err

-            }

-            count += 1

-            //    Take a nap before retrying

-            time.Sleep(2 * time.Second)

-        } else {

-            break

-        }

-    }

-    log.Info("registered-with-core")

-    return nil

-}

-

-func waitForExit() int {

-    signalChannel := make(chan os.Signal, 1)

-    signal.Notify(signalChannel,

-        syscall.SIGHUP,

-        syscall.SIGINT,

-        syscall.SIGTERM,

-        syscall.SIGQUIT)

-

-    exitChannel := make(chan int)

-

-    go func() {

-        s := <-signalChannel

-        switch s {

-        case syscall.SIGHUP,

-            syscall.SIGINT,

-            syscall.SIGTERM,

-            syscall.SIGQUIT:

-            log.Infow("closing-signal-received", log.Fields{"signal": s})

-            exitChannel <- 0

-        default:

-            log.Infow("unexpected-signal-received", log.Fields{"signal": s})

-            exitChannel <- 1

-        }

-    }()

-

-    code := <-exitChannel

-    return code

-}

-

-func printBanner() {

-    fmt.Println("   ____                     ____  _   _______ ")

-    fmt.Println("  / _ \\                   / __\\| | |__   __|")

-    fmt.Println(" | |  | |_ __   ___ _ __  | |  | | |    | |   ")

-    fmt.Println(" | |  | | '_\\ / _\\ '_\\ | |  | | |    | |   ")

-    fmt.Println(" | |__| | |_) |  __/ | | || |__| | |____| |   ")

-    fmt.Println(" \\____/| .__/\\___|_| |_|\\____/|______|_|   ")

-    fmt.Println("        | |                                   ")

-    fmt.Println("        |_|                                   ")

-    fmt.Println("                                              ")

-}

-

-func main() {

-    start := time.Now()

-

-    cf := config.NewAdapterFlags()

-    cf.ParseCommandArguments()

-

-    //// Setup logging

-

-    //Setup default logger - applies for packages that do not have specific logger set

-    if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {

-        log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")

-    }

-

-    // Update all loggers (provisionned via init) with a common field

-    if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {

-        log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")

-    }

-

-    log.SetPackageLogLevel("github.com/opencord/voltha-go/adapters/common", log.DebugLevel)

-

-    defer log.CleanUp()

-

-    // Print banner if specified

-    if cf.Banner {

-        printBanner()

-    }

-

-    log.Infow("config", log.Fields{"config": *cf})

-

-    ctx, cancel := context.WithCancel(context.Background())

-    defer cancel()

-

-    ad := newAdapter(cf)

-    go ad.start(ctx)

-

-    code := waitForExit()

-    log.Infow("received-a-closing-signal", log.Fields{"code": code})

-

-    // Cleanup before leaving

-    ad.stop()

-

-    elapsed := time.Since(start)

-    log.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})

-}

+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package main
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"github.com/opencord/voltha-go/adapters"
+	com "github.com/opencord/voltha-go/adapters/common"
+	ac "github.com/opencord/voltha-go/adapters/openolt/adaptercore"
+	"github.com/opencord/voltha-go/adapters/openolt/config"
+	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/db/kvstore"
+	"github.com/opencord/voltha-go/kafka"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"os"
+	"os/signal"
+	"strconv"
+	"syscall"
+	"time"
+)
+
+type adapter struct {
+	instanceId       string
+	config           *config.AdapterFlags
+	iAdapter         adapters.IAdapter
+	kafkaClient      kafka.Client
+	kvClient         kvstore.Client
+	kip              *kafka.InterContainerProxy
+	coreProxy        *com.CoreProxy
+	adapterProxy     *com.AdapterProxy
+	halted           bool
+	exitChannel      chan int
+	receiverChannels []<-chan *ic.InterContainerMessage
+}
+
+func init() {
+	log.AddPackage(log.JSON, log.DebugLevel, nil)
+}
+
+func newAdapter(cf *config.AdapterFlags) *adapter {
+	var a adapter
+	a.instanceId = cf.InstanceID
+	a.config = cf
+	a.halted = false
+	a.exitChannel = make(chan int, 1)
+	a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
+	return &a
+}
+
+func (a *adapter) start(ctx context.Context) {
+	log.Info("Starting Core Adapter components")
+	var err error
+
+	// Setup KV Client
+	log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
+	if err := a.setKVClient(); err != nil {
+		log.Fatal("error-setting-kv-client")
+	}
+
+	// Setup Kafka Client
+	if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
+		log.Fatal("Unsupported-common-client")
+	}
+
+	// Start the common InterContainer Proxy - retries indefinitely
+	if a.kip, err = a.startInterContainerProxy(-1); err != nil {
+		log.Fatal("error-starting-inter-container-proxy")
+	}
+
+	// Create the core proxy to handle requests to the Core
+	a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
+
+	// Create the adaptor proxy to handle request between olt and onu
+	a.adapterProxy = com.NewAdapterProxy(a.kip, "brcm_openomci_onu", a.config.CoreTopic)
+
+	// Create the open OLT adapter
+	if a.iAdapter, err = a.startOpenOLT(ctx, a.kip, a.coreProxy, a.adapterProxy, a.config.OnuNumber); err != nil {
+		log.Fatal("error-starting-inter-container-proxy")
+	}
+
+	// Register the core request handler
+	if err = a.setupRequestHandler(a.instanceId, a.iAdapter); err != nil {
+		log.Fatal("error-setting-core-request-handler")
+	}
+
+	//    Register this adapter to the Core - retries indefinitely
+	if err = a.registerWithCore(-1); err != nil {
+		log.Fatal("error-registering-with-core")
+	}
+}
+
+func (rw *adapter) stop() {
+	// Stop leadership tracking
+	rw.halted = true
+
+	// send exit signal
+	rw.exitChannel <- 0
+
+	// Cleanup - applies only if we had a kvClient
+	if rw.kvClient != nil {
+		// Release all reservations
+		if err := rw.kvClient.ReleaseAllReservations(); err != nil {
+			log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
+		}
+		// Close the DB connection
+		rw.kvClient.Close()
+	}
+
+	// TODO:  More cleanup
+}
+
+func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
+
+	log.Infow("kv-store-type", log.Fields{"store": storeType})
+	switch storeType {
+	case "consul":
+		return kvstore.NewConsulClient(address, timeout)
+	case "etcd":
+		return kvstore.NewEtcdClient(address, timeout)
+	}
+	return nil, errors.New("unsupported-kv-store")
+}
+
+func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {
+
+	log.Infow("common-client-type", log.Fields{"client": clientType})
+	switch clientType {
+	case "sarama":
+		return kafka.NewSaramaClient(
+			kafka.Host(host),
+			kafka.Port(port),
+			kafka.ProducerReturnOnErrors(true),
+			kafka.ProducerReturnOnSuccess(true),
+			kafka.ProducerMaxRetries(6),
+			kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
+	}
+	return nil, errors.New("unsupported-client-type")
+}
+
+func (a *adapter) setKVClient() error {
+	addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
+	client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)
+	if err != nil {
+		a.kvClient = nil
+		log.Error(err)
+		return err
+	}
+	a.kvClient = client
+	return nil
+}
+
+func toString(value interface{}) (string, error) {
+	switch t := value.(type) {
+	case []byte:
+		return string(value.([]byte)), nil
+	case string:
+		return value.(string), nil
+	default:
+		return "", fmt.Errorf("unexpected-type-%T", t)
+	}
+}
+
+func (a *adapter) startInterContainerProxy(retries int) (*kafka.InterContainerProxy, error) {
+	log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
+		"port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
+	var err error
+	var kip *kafka.InterContainerProxy
+	if kip, err = kafka.NewInterContainerProxy(
+		kafka.InterContainerHost(a.config.KafkaAdapterHost),
+		kafka.InterContainerPort(a.config.KafkaAdapterPort),
+		kafka.MsgClient(a.kafkaClient),
+		kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic})); err != nil {
+		log.Errorw("fail-to-create-common-proxy", log.Fields{"error": err})
+		return nil, err
+	}
+	count := 0
+	for {
+		if err = kip.Start(); err != nil {
+			log.Warnw("error-starting-messaging-proxy", log.Fields{"error": err})
+			if retries == count {
+				return nil, err
+			}
+			count = +1
+			//    Take a nap before retrying
+			time.Sleep(2 * time.Second)
+		} else {
+			break
+		}
+	}
+
+	log.Info("common-messaging-proxy-created")
+	return kip, nil
+}
+
+func (a *adapter) startOpenOLT(ctx context.Context, kip *kafka.InterContainerProxy, cp *com.CoreProxy, ap *com.AdapterProxy, onuNumber int) (*ac.OpenOLT, error) {
+	log.Info("starting-open-olt")
+	var err error
+	sOLT := ac.NewOpenOLT(ctx, a.kip, cp, ap, onuNumber)
+
+	if err = sOLT.Start(ctx); err != nil {
+		log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
+		return nil, err
+	}
+
+	log.Info("open-olt-started")
+	return sOLT, nil
+}
+
+func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter) error {
+	log.Info("setting-request-handler")
+	requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter, a.coreProxy)
+	if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
+		log.Errorw("request-handler-setup-failed", log.Fields{"error": err})
+		return err
+
+	}
+	log.Info("request-handler-setup-done")
+	return nil
+}
+
+func (a *adapter) registerWithCore(retries int) error {
+	log.Info("registering-with-core")
+	adapterDescription := &voltha.Adapter{Id: "openolt", Vendor: "simulation Enterprise Inc"}
+	types := []*voltha.DeviceType{{Id: "openolt", Adapter: "openolt"}}
+	deviceTypes := &voltha.DeviceTypes{Items: types}
+	count := 0
+	for {
+		if err := a.coreProxy.RegisterAdapter(nil, adapterDescription, deviceTypes); err != nil {
+			log.Warnw("registering-with-core-failed", log.Fields{"error": err})
+			if retries == count {
+				return err
+			}
+			count += 1
+			//    Take a nap before retrying
+			time.Sleep(2 * time.Second)
+		} else {
+			break
+		}
+	}
+	log.Info("registered-with-core")
+	return nil
+}
+
+func waitForExit() int {
+	signalChannel := make(chan os.Signal, 1)
+	signal.Notify(signalChannel,
+		syscall.SIGHUP,
+		syscall.SIGINT,
+		syscall.SIGTERM,
+		syscall.SIGQUIT)
+
+	exitChannel := make(chan int)
+
+	go func() {
+		s := <-signalChannel
+		switch s {
+		case syscall.SIGHUP,
+			syscall.SIGINT,
+			syscall.SIGTERM,
+			syscall.SIGQUIT:
+			log.Infow("closing-signal-received", log.Fields{"signal": s})
+			exitChannel <- 0
+		default:
+			log.Infow("unexpected-signal-received", log.Fields{"signal": s})
+			exitChannel <- 1
+		}
+	}()
+
+	code := <-exitChannel
+	return code
+}
+
+func printBanner() {
+	fmt.Println("   ____                     ____  _   _______ ")
+	fmt.Println("  / _ \\                   / __\\| | |__   __|")
+	fmt.Println(" | |  | |_ __   ___ _ __  | |  | | |    | |   ")
+	fmt.Println(" | |  | | '_\\ / _\\ '_\\ | |  | | |    | |   ")
+	fmt.Println(" | |__| | |_) |  __/ | | || |__| | |____| |   ")
+	fmt.Println(" \\____/| .__/\\___|_| |_|\\____/|______|_|   ")
+	fmt.Println("        | |                                   ")
+	fmt.Println("        |_|                                   ")
+	fmt.Println("                                              ")
+}
+
+func main() {
+	start := time.Now()
+
+	cf := config.NewAdapterFlags()
+	cf.ParseCommandArguments()
+
+	//// Setup logging
+
+	//Setup default logger - applies for packages that do not have specific logger set
+	if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+	}
+
+	// Update all loggers (provisionned via init) with a common field
+	if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+	}
+
+	log.SetPackageLogLevel("github.com/opencord/voltha-go/adapters/common", log.DebugLevel)
+
+	defer log.CleanUp()
+
+	// Print banner if specified
+	if cf.Banner {
+		printBanner()
+	}
+
+	log.Infow("config", log.Fields{"config": *cf})
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	ad := newAdapter(cf)
+	go ad.start(ctx)
+
+	code := waitForExit()
+	log.Infow("received-a-closing-signal", log.Fields{"code": code})
+
+	// Cleanup before leaving
+	ad.stop()
+
+	elapsed := time.Since(start)
+	log.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
+}