[VOL-1359] This commit consists of the creation of the simulated
OLT and ONU adapters (in Go language).  This update also provides
the set of files to build and run these containers.

 protos/scripts/build_protos.sh protos
-### Building and running Voltha
+### Building Voltha Core
 A fatal error occurs if Voltha is built and executed at this stage:
 > go run rw_core/main.go
 make rw_core
+### Building and running Ponsim OLT and ONU Adapters
+Please refer to the README.md file under the ```python``` directory
+### Building Simulated OLT and ONU Adapters
+Simulated OLT, ONU and rw_core can be build together:
+make build
+or they via be individually built:
+make rw_core
+make simulated_olt
+make simulated_onu
+### Running rw_core, Simulated OLT and ONU Adapters
+In the example below we are using the docker-compose command to run these containers locally.
+DOCKER_HOST_IP=<Host IP> docker-compose -f compose/docker-compose-zk-kafka-test.yml up -d
+DOCKER_HOST_IP=<Host IP> docker-compose -f compose/docker-compose-etcd.yml up -d
+DOCKER_HOST_IP=<Host IP> docker-compose -f compose/rw_core.yml up -d
+DOCKER_HOST_IP=<Host IP> docker-compose -f compose/compose/adapters-simulated.yml up -d
+You should see the following containers up and running
+CONTAINER ID        IMAGE                           COMMAND                  CREATED              STATUS              PORTS                                                                      NAMES
+338fd67c2029        voltha-adapter-simulated-onu    "/app/simulated_onu …"   37 seconds ago       Up 36 seconds                                                                                  compose_adapter_simulated_onu_1_a39b1a9d27d5
+15b159bab626        voltha-adapter-simulated-olt    "/app/simulated_olt …"   37 seconds ago       Up 36 seconds                                                                                  compose_adapter_simulated_olt_1_b5407c23b483
+401128a1755f        voltha-rw-core                  "/app/rw_core -kv_st…"   About a minute ago   Up About a minute>50057/tcp                                                   compose_rw_core_1_36cd5e255edf
+ba4eb9384f5b        quay.io/coreos/etcd:v3.2.9      "etcd --name=etcd0 -…"   About a minute ago   Up About a minute>2379/tcp,>2380/tcp,>4001/tcp   compose_etcd_1_368cd0bc1421
+55f74277a530        wurstmeister/kafka:2.11-2.0.1   "start-kafka.sh"         2 minutes ago        Up 2 minutes>9092/tcp                                                     compose_kafka_1_a8631e438fe2
+fb60076d8b3e        wurstmeister/zookeeper:latest   "/bin/sh -c '/usr/sb…"   2 minutes ago        Up 2 minutes        22/tcp, 2888/tcp, 3888/tcp,>2181/tcp                         compose_zookeeper_1_7ff68af103cf
-.PHONY: $(DIRS) $(DIRS_CLEAN) $(DIRS_FLAKE8) rw_core ro_core protos kafka db tests adapters
+.PHONY: $(DIRS) $(DIRS_CLEAN) $(DIRS_FLAKE8) rw_core protos kafka db tests python simulators k8s
 # This should to be the first and default target in this Makefile
 build: containers
-containers: rw_core
+containers: rw_core simulated_olt simulated_onu
 ifneq ($(VOLTHA_BUILD),docker)
 	docker build $(DOCKER_BUILD_ARGS) -t ${REGISTRY}${REPOSITORY}voltha-rw-core:${TAG} -f docker/Dockerfile.rw_core_d .
+ifneq ($(VOLTHA_BUILD),docker)
+	docker build $(DOCKER_BUILD_ARGS) -t ${REGISTRY}${REPOSITORY}voltha-adapter-simulated-olt:${TAG} -f docker/Dockerfile.simulated_olt .
+	docker build $(DOCKER_BUILD_ARGS) -t ${REGISTRY}${REPOSITORY}voltha-adapter-simulated-olt:${TAG} -f docker/Dockerfile.simulated_olt_d .
+ifneq ($(VOLTHA_BUILD),docker)
+	docker build $(DOCKER_BUILD_ARGS) -t ${REGISTRY}${REPOSITORY}voltha-adapter-simulated-onu:${TAG} -f docker/Dockerfile.simulated_onu .
+	docker build $(DOCKER_BUILD_ARGS) -t ${REGISTRY}${REPOSITORY}voltha-adapter-simulated-onu:${TAG} -f docker/Dockerfile.simulated_onu_d .
 # end file
+## How to Build and Run a Voltha Go language Adapter
+This directory is a repo for all voltha adapters written in Go language.  At this time, the simulated_olt and 
+simulated_onu adapters are the only adapters using the Go language.  These adapters provide basic capabilities
+which will be used for high availability and capacity testing.
+### Building and running the Simulated OLT and ONU Adapters
+Please refer to the ```BUILD.md``` file under the voltha-go repo
+ * Copyright 2018-present Open Networking Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+import (
+	"context"
+	"github.com/golang/protobuf/proto"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/golang/protobuf/ptypes/any"
+	"github.com/google/uuid"
+	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/kafka"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
+	"time"
+type AdapterProxy struct {
+	kafkaICProxy *kafka.InterContainerProxy
+	adapterTopic string
+	coreTopic    string
+func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *AdapterProxy {
+	var proxy AdapterProxy
+	proxy.kafkaICProxy = kafkaProxy
+	proxy.adapterTopic = adapterTopic
+	proxy.coreTopic = coreTopic
+	log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+	return &proxy
+func (ap *AdapterProxy) SendInterAdapterMessage(ctx context.Context,
+	msg proto.Message,
+	msgType ic.InterAdapterMessageType_Types,
+	fromAdapter string,
+	toAdapter string,
+	toDeviceId string,
+	proxyDeviceId string,
+	messageId string) error {
+	log.Debugw("sending-inter-adapter-message", log.Fields{"type": msgType, "from": fromAdapter,
+		"to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
+	//Marshal the message
+	var marshalledMsg *any.Any
+	var err error
+	if marshalledMsg, err = ptypes.MarshalAny(msg); err != nil {
+		log.Warnw("cannot-marshal-msg", log.Fields{"error": err})
+		return err
+	}
+	//Build the inter adapter message
+	header := &ic.InterAdapterHeader{
+		Type:          msgType,
+		FromTopic:     fromAdapter,
+		ToTopic:       toAdapter,
+		ToDeviceId:    toDeviceId,
+		ProxyDeviceId: proxyDeviceId,
+	}
+	if messageId != "" {
+		header.Id = messageId
+	} else {
+		header.Id = uuid.New().String()
+	}
+	header.Timestamp = time.Now().Unix()
+	iaMsg := &ic.InterAdapterMessage{
+		Header: header,
+		Body:   marshalledMsg,
+	}
+	args := make([]*kafka.KVArg, 1)
+	args[0] = &kafka.KVArg{
+		Key:   "msg",
+		Value: iaMsg,
+	}
+	// Set up the required rpc arguments
+	topic := kafka.Topic{Name: fromAdapter}
+	replyToTopic := kafka.Topic{Name: toAdapter}
+	rpc := "Process_inter_adapter_message"
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
+	log.Debugw("inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
+	return unPackResponse(rpc, "", success, result)
+ * Copyright 2018-present Open Networking Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+import (
+	"context"
+	"github.com/golang/protobuf/ptypes"
+	a "github.com/golang/protobuf/ptypes/any"
+	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/kafka"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+type CoreProxy struct {
+	kafkaICProxy *kafka.InterContainerProxy
+	adapterTopic string
+	coreTopic    string
+func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
+	var proxy CoreProxy
+	proxy.kafkaICProxy = kafkaProxy
+	proxy.adapterTopic = adapterTopic
+	proxy.coreTopic = coreTopic
+	log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+	return &proxy
+func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
+	if success {
+		return nil
+	} else {
+		unpackResult := &ic.Error{}
+		var err error
+		if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
+			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+		}
+		log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
+		// TODO:  Need to get the real error code
+		return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
+	}
+func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
+	log.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
+	rpc := "Register"
+	topic := kafka.Topic{Name: ap.coreTopic}
+	replyToTopic := kafka.Topic{Name: ap.adapterTopic}
+	args := make([]*kafka.KVArg, 2)
+	args[0] = &kafka.KVArg{
+		Key:   "adapter",
+		Value: adapter,
+	}
+	args[1] = &kafka.KVArg{
+		Key:   "deviceTypes",
+		Value: deviceTypes,
+	}
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
+	log.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
+	return unPackResponse(rpc, "", success, result)
+func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
+	log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
+	rpc := "DeviceUpdate"
+	// Use a device specific topic to send the request.  The adapter handling the device creates a device
+	// specific topic
+	toTopic := kafka.CreateSubTopic(ap.coreTopic, device.Id)
+	args := make([]*kafka.KVArg, 1)
+	args[0] = &kafka.KVArg{
+		Key:   "device",
+		Value: device,
+	}
+	// Use a device specific topic as we are the only adaptercore handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+	log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
+	return unPackResponse(rpc, device.Id, success, result)
+func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
+	log.Debugw("PortCreated", log.Fields{"portNo": port.PortNo})
+	rpc := "PortCreated"
+	// Use a device specific topic to send the request.  The adapter handling the device creates a device
+	// specific topic
+	toTopic := kafka.CreateSubTopic(ap.coreTopic, deviceId)
+	args := make([]*kafka.KVArg, 2)
+	id := &voltha.ID{Id: deviceId}
+	args[0] = &kafka.KVArg{
+		Key:   "device_id",
+		Value: id,
+	}
+	args[1] = &kafka.KVArg{
+		Key:   "port",
+		Value: port,
+	}
+	// Use a device specific topic as we are the only adaptercore handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, deviceId)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+	log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
+	return unPackResponse(rpc, deviceId, success, result)
+func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
+	connStatus voltha.ConnectStatus_ConnectStatus, operStatus voltha.OperStatus_OperStatus) error {
+	log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
+	rpc := "DeviceStateUpdate"
+	// Use a device specific topic to send the request.  The adapter handling the device creates a device
+	// specific topic
+	toTopic := kafka.CreateSubTopic(ap.coreTopic, deviceId)
+	args := make([]*kafka.KVArg, 3)
+	id := &voltha.ID{Id: deviceId}
+	oStatus := &ic.IntType{Val: int64(operStatus)}
+	cStatus := &ic.IntType{Val: int64(connStatus)}
+	args[0] = &kafka.KVArg{
+		Key:   "device_id",
+		Value: id,
+	}
+	args[1] = &kafka.KVArg{
+		Key:   "oper_status",
+		Value: oStatus,
+	}
+	args[2] = &kafka.KVArg{
+		Key:   "connect_status",
+		Value: cStatus,
+	}
+	// Use a device specific topic as we are the only adaptercore handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, deviceId)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+	log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
+	return unPackResponse(rpc, deviceId, success, result)
+func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
+	childDeviceType string, channelId int) error {
+	log.Debugw("ChildDeviceDetected", log.Fields{"pPeviceId": parentDeviceId, "channelId": channelId})
+	rpc := "ChildDeviceDetected"
+	// Use a device specific topic to send the request.  The adapter handling the device creates a device
+	// specific topic
+	toTopic := kafka.CreateSubTopic(ap.coreTopic, parentDeviceId)
+	replyToTopic := kafka.CreateSubTopic(ap.adapterTopic, parentDeviceId)
+	args := make([]*kafka.KVArg, 4)
+	id := &voltha.ID{Id: parentDeviceId}
+	args[0] = &kafka.KVArg{
+		Key:   "parent_device_id",
+		Value: id,
+	}
+	ppn := &ic.IntType{Val: int64(parentPortNo)}
+	args[1] = &kafka.KVArg{
+		Key:   "parent_port_no",
+		Value: ppn,
+	}
+	cdt := &ic.StrType{Val: childDeviceType}
+	args[2] = &kafka.KVArg{
+		Key:   "child_device_type",
+		Value: cdt,
+	}
+	channel := &ic.IntType{Val: int64(channelId)}
+	args[3] = &kafka.KVArg{
+		Key:   "channel_id",
+		Value: channel,
+	}
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+	log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+	return unPackResponse(rpc, parentDeviceId, success, result)
+ * Copyright 2018-present Open Networking Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+import (
+	"errors"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/opencord/voltha-go/adapters"
+	"github.com/opencord/voltha-go/common/log"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+type RequestHandlerProxy struct {
+	TestMode       bool
+	coreInstanceId string
+	adapter        adapters.IAdapter
+func NewRequestHandlerProxy(coreInstanceId string, iadapter adapters.IAdapter) *RequestHandlerProxy {
+	var proxy RequestHandlerProxy
+	proxy.coreInstanceId = coreInstanceId
+	proxy.adapter = iadapter
+	return &proxy
+func (rhp *RequestHandlerProxy) Adapter_descriptor() (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Device_types() (*voltha.DeviceTypes, error) {
+	return nil, nil
+func (rhp *RequestHandlerProxy) Health() (*voltha.HealthStatus, error) {
+	return nil, nil
+func (rhp *RequestHandlerProxy) Adopt_device(args []*ic.Argument) (*empty.Empty, error) {
+	if len(args) != 1 {
+		log.Warn("invalid-number-of-args", log.Fields{"args": args})
+		err := errors.New("invalid-number-of-args")
+		return nil, err
+	}
+	device := &voltha.Device{}
+	if err := ptypes.UnmarshalAny(args[0].Value, device); err != nil {
+		log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+		return nil, err
+	}
+	log.Debugw("Adopt_device", log.Fields{"deviceId": device.Id})
+	//Invoke the adopt device on the adapter
+	if err := rhp.adapter.Adopt_device(device); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Reconcile_device(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Abandon_device(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Disable_device(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Reenable_device(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Reboot_device(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Self_test_device(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Delete_device(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Get_device_details(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Update_flows_bulk(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Update_flows_incrementally(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Update_pm_config(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Receive_packet_out(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Suppress_alarm(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Unsuppress_alarm(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Get_ofp_device_info(args []*ic.Argument) (*ic.SwitchCapability, error) {
+	if len(args) != 1 {
+		log.Warn("invalid-number-of-args", log.Fields{"args": args})
+		err := errors.New("invalid-number-of-args")
+		return nil, err
+	}
+	device := &voltha.Device{}
+	if err := ptypes.UnmarshalAny(args[0].Value, device); err != nil {
+		log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+		return nil, err
+	}
+	log.Debugw("Get_ofp_device_info", log.Fields{"deviceId": device.Id})
+	var cap *ic.SwitchCapability
+	var err error
+	if cap, err = rhp.adapter.Get_ofp_device_info(device); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+	return cap, nil
+func (rhp *RequestHandlerProxy) Get_ofp_port_info(args []*ic.Argument) (*ic.PortCapability, error) {
+	if len(args) != 2 {
+		log.Warn("invalid-number-of-args", log.Fields{"args": args})
+		err := errors.New("invalid-number-of-args")
+		return nil, err
+	}
+	device := &voltha.Device{}
+	pNo := &ic.IntType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device":
+			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+				log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+				return nil, err
+			}
+		case "port_no":
+			if err := ptypes.UnmarshalAny(arg.Value, pNo); err != nil {
+				log.Warnw("cannot-unmarshal-port-no", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	log.Debugw("Get_ofp_port_info", log.Fields{"deviceId": device.Id, "portNo": pNo.Val})
+	var cap *ic.PortCapability
+	var err error
+	if cap, err = rhp.adapter.Get_ofp_port_info(device, pNo.Val); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+	return cap, nil
+func (rhp *RequestHandlerProxy) Process_inter_adapter_message(args []*ic.Argument) (*empty.Empty, error) {
+	if len(args) != 1 {
+		log.Warn("invalid-number-of-args", log.Fields{"args": args})
+		err := errors.New("invalid-number-of-args")
+		return nil, err
+	}
+	iaMsg := &ic.InterAdapterMessage{}
+	if err := ptypes.UnmarshalAny(args[0].Value, iaMsg); err != nil {
+		log.Warnw("cannot-unmarshal-message", log.Fields{"error": err})
+		return nil, err
+	}
+	log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": iaMsg.Header.Id})
+	//Invoke the inter adapter API on the handler
+	if err := rhp.adapter.Process_inter_adapter_message(iaMsg); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Download_image(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Get_image_download_status(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Cancel_image_download(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Activate_image_update(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Revert_image_update(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+ * Copyright 2018-present Open Networking Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+import (
+	"fmt"
+	"math/rand"
+	"time"
+//GetRandomSerialNumber returns a serial number formatted as "HOST:PORT"
+func GetRandomSerialNumber() string {
+	rand.Seed(time.Now().Unix())
+	return fmt.Sprintf("%d.%d.%d.%d:%d",
+		rand.Intn(255),
+		rand.Intn(255),
+		rand.Intn(255),
+		rand.Intn(255),
+		rand.Intn(9000)+1000,
+	)
+//GetRandomMacAddress returns a random mac address
+func GetRandomMacAddress() string {
+	rand.Seed(time.Now().Unix())
+	return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x",
+		rand.Intn(128),
+		rand.Intn(128),
+		rand.Intn(128),
+		rand.Intn(128),
+		rand.Intn(128),
+		rand.Intn(128),
+	)
+ * Copyright 2018-present Open Networking Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adapters
+import (
+	ic "github.com/opencord/voltha-go/protos/inter_container"
+	"github.com/opencord/voltha-go/protos/openflow_13"
+	"github.com/opencord/voltha-go/protos/voltha"
+//IAdapter represents the set of APIs a voltha adapter has to support.
+type IAdapter interface {
+	Adapter_descriptor() error
+	Device_types() (*voltha.DeviceTypes, error)
+	Health() (*voltha.HealthStatus, error)
+	Adopt_device(device *voltha.Device) error
+	Reconcile_device(device *voltha.Device) error
+	Abandon_device(device *voltha.Device) error
+	Disable_device(device *voltha.Device) error
+	Reenable_device(device *voltha.Device) error
+	Reboot_device(device *voltha.Device) error
+	Self_test_device(device *voltha.Device) error
+	Gelete_device(device *voltha.Device) error
+	Get_device_details(device *voltha.Device) error
+	Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error
+	Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges) error
+	Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error
+	Receive_packet_out(device *voltha.Device, egress_port_no int, msg openflow_13.PacketOut) error
+	Suppress_alarm(filter *voltha.AlarmFilter) error
+	Unsuppress_alarm(filter *voltha.AlarmFilter) error
+	Get_ofp_device_info(device *voltha.Device) (*ic.SwitchCapability, error)
+	Get_ofp_port_info(device *voltha.Device, port_no int64) (*ic.PortCapability, error)
+	Process_inter_adapter_message(msg *ic.InterAdapterMessage) error
+	Download_image(device *voltha.Device, request *voltha.ImageDownload) error
+	Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) error
+	Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) error
+	Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) error
+	Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) error
+ * Copyright 2018-present Open Networking Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adaptercore
+import (
+	"context"
+	"github.com/gogo/protobuf/proto"
+	com "github.com/opencord/voltha-go/adapters/common"
+	"github.com/opencord/voltha-go/common/log"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
+	of "github.com/opencord/voltha-go/protos/openflow_13"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"strconv"
+	"strings"
+	"sync"
+//DeviceHandler follows the same patterns as ponsim_olt.  The only difference is that it does not
+// interact with an OLT device.
+type DeviceHandler struct {
+	deviceId     string
+	deviceType   string
+	device       *voltha.Device
+	coreProxy    *com.CoreProxy
+	simulatedOLT *SimulatedOLT
+	nniPort      *voltha.Port
+	ponPort      *voltha.Port
+	exitChannel  chan int
+	lockDevice   sync.RWMutex
+//NewDeviceHandler creates a new device handler
+func NewDeviceHandler(cp *com.CoreProxy, device *voltha.Device, adapter *SimulatedOLT) *DeviceHandler {
+	var dh DeviceHandler
+	dh.coreProxy = cp
+	cloned := (proto.Clone(device)).(*voltha.Device)
+	dh.deviceId = cloned.Id
+	dh.deviceType = cloned.Type
+	dh.device = cloned
+	dh.simulatedOLT = adapter
+	dh.exitChannel = make(chan int, 1)
+	dh.lockDevice = sync.RWMutex{}
+	return &dh
+// start save the device to the data model
+func (dh *DeviceHandler) start(ctx context.Context) {
+	dh.lockDevice.Lock()
+	defer dh.lockDevice.Unlock()
+	log.Debugw("starting-device-agent", log.Fields{"device": dh.device})
+	// Add the initial device to the local model
+	log.Debug("device-agent-started")
+// stop stops the device dh.  Not much to do for now
+func (dh *DeviceHandler) stop(ctx context.Context) {
+	dh.lockDevice.Lock()
+	defer dh.lockDevice.Unlock()
+	log.Debug("stopping-device-agent")
+	dh.exitChannel <- 1
+	log.Debug("device-agent-stopped")
+func macAddressToUint32Array(mac string) []uint32 {
+	slist := strings.Split(mac, ":")
+	result := make([]uint32, len(slist))
+	var err error
+	var tmp int64
+	for index, val := range slist {
+		if tmp, err = strconv.ParseInt(val, 16, 32); err != nil {
+			return []uint32{1, 2, 3, 4, 5, 6}
+		}
+		result[index] = uint32(tmp)
+	}
+	return result
+func (dh *DeviceHandler) AdoptDevice(device *voltha.Device) {
+	log.Debugw("AdoptDevice", log.Fields{"deviceId": device.Id})
+	//	Update the device info
+	cloned := proto.Clone(device).(*voltha.Device)
+	cloned.Root = true
+	cloned.Vendor = "simulators"
+	cloned.Model = "go-simulators"
+	cloned.SerialNumber = com.GetRandomSerialNumber()
+	cloned.MacAddress = strings.ToUpper(com.GetRandomMacAddress())
+	// Synchronous call to update device - this method is run in its own go routine
+	if err := dh.coreProxy.DeviceUpdate(nil, cloned); err != nil {
+		log.Errorw("error-updating-device", log.Fields{"deviceId": device.Id, "error": err})
+	}
+	//	Now create the NNI Port
+	dh.nniPort = &voltha.Port{
+		PortNo:     2,
+		Label:      "NNI facing Ethernet port",
+		Type:       voltha.Port_ETHERNET_NNI,
+		OperStatus: voltha.OperStatus_ACTIVE,
+	}
+	// Synchronous call to update device - this method is run in its own go routine
+	if err := dh.coreProxy.PortCreated(nil, cloned.Id, dh.nniPort); err != nil {
+		log.Errorw("error-creating-nni-port", log.Fields{"deviceId": device.Id, "error": err})
+	}
+	//	Now create the PON Port
+	dh.ponPort = &voltha.Port{
+		PortNo:     1,
+		Label:      "PON port",
+		Type:       voltha.Port_PON_OLT,
+		OperStatus: voltha.OperStatus_ACTIVE,
+	}
+	// Synchronous call to update device - this method is run in its own go routine
+	if err := dh.coreProxy.PortCreated(nil, cloned.Id, dh.ponPort); err != nil {
+		log.Errorw("error-creating-nni-port", log.Fields{"deviceId": device.Id, "error": err})
+	}
+	cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
+	cloned.OperStatus = voltha.OperStatus_ACTIVE
+	//	Update the device state
+	if err := dh.coreProxy.DeviceStateUpdate(nil, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+		log.Errorw("error-creating-nni-port", log.Fields{"deviceId": device.Id, "error": err})
+	}
+	//	Register Child device
+	initialUniPortNo := 100
+	log.Debugw("registering-onus", log.Fields{"total": dh.simulatedOLT.numOnus})
+	for i := 0; i < dh.simulatedOLT.numOnus; i++ {
+		go dh.coreProxy.ChildDeviceDetected(
+			nil,
+			cloned.Id,
+			1,
+			"simulated_onu",
+			initialUniPortNo+i)
+	}
+	dh.device = cloned
+func (dh *DeviceHandler) GetOfpDeviceInfo(device *voltha.Device) (*ic.SwitchCapability, error) {
+	return &ic.SwitchCapability{
+		Desc: &of.OfpDesc{
+			HwDesc:    "simulated_pon",
+			SwDesc:    "simulated_pon",
+			SerialNum: dh.device.SerialNumber,
+		},
+		SwitchFeatures: &of.OfpSwitchFeatures{
+			NBuffers: 256,
+			NTables:  2,
+			Capabilities: uint32(of.OfpCapabilities_OFPC_FLOW_STATS |
+				of.OfpCapabilities_OFPC_TABLE_STATS |
+				of.OfpCapabilities_OFPC_PORT_STATS |
+				of.OfpCapabilities_OFPC_GROUP_STATS),
+		},
+	}, nil
+func (dh *DeviceHandler) GetOfpPortInfo(device *voltha.Device, portNo int64) (*ic.PortCapability, error) {
+	cap := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
+	return &ic.PortCapability{
+		Port: &voltha.LogicalPort{
+			OfpPort: &of.OfpPort{
+				HwAddr:     macAddressToUint32Array(dh.device.MacAddress),
+				Config:     0,
+				State:      uint32(of.OfpPortState_OFPPS_LIVE),
+				Curr:       cap,
+				Advertised: cap,
+				Peer:       cap,
+				CurrSpeed:  uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+				MaxSpeed:   uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+			},
+			DeviceId:     dh.device.Id,
+			DevicePortNo: uint32(portNo),
+		},
+	}, nil
+func (dh *DeviceHandler) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {
+	log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
+	return nil
+ * Copyright 2018-present Open Networking Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adaptercore
+import (
+	"context"
+	"errors"
+	"fmt"
+	com "github.com/opencord/voltha-go/adapters/common"
+	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/kafka"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
+	"github.com/opencord/voltha-go/protos/openflow_13"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"sync"
+type SimulatedOLT struct {
+	deviceHandlers        map[string]*DeviceHandler
+	coreProxy             *com.CoreProxy
+	kafkaICProxy          *kafka.InterContainerProxy
+	numOnus               int
+	exitChannel           chan int
+	lockDeviceHandlersMap sync.RWMutex
+func NewSimulatedOLT(ctx context.Context, kafkaICProxy *kafka.InterContainerProxy, coreProxy *com.CoreProxy, onuNumber int) *SimulatedOLT {
+	var simulatedOLT SimulatedOLT
+	simulatedOLT.exitChannel = make(chan int, 1)
+	simulatedOLT.deviceHandlers = make(map[string]*DeviceHandler)
+	simulatedOLT.kafkaICProxy = kafkaICProxy
+	simulatedOLT.numOnus = onuNumber
+	simulatedOLT.coreProxy = coreProxy
+	simulatedOLT.lockDeviceHandlersMap = sync.RWMutex{}
+	return &simulatedOLT
+func (so *SimulatedOLT) Start(ctx context.Context) error {
+	log.Info("starting-device-manager")
+	log.Info("device-manager-started")
+	return nil
+func (so *SimulatedOLT) Stop(ctx context.Context) error {
+	log.Info("stopping-device-manager")
+	so.exitChannel <- 1
+	log.Info("device-manager-stopped")
+	return nil
+func sendResponse(ctx context.Context, ch chan interface{}, result interface{}) {
+	if ctx.Err() == nil {
+		// Returned response only of the ctx has not been cancelled/timeout/etc
+		// Channel is automatically closed when a context is Done
+		ch <- result
+		log.Debugw("sendResponse", log.Fields{"result": result})
+	} else {
+		// Should the transaction be reverted back?
+		log.Debugw("sendResponse-context-error", log.Fields{"context-error": ctx.Err()})
+	}
+func (so *SimulatedOLT) addDeviceHandlerToMap(agent *DeviceHandler) {
+	so.lockDeviceHandlersMap.Lock()
+	defer so.lockDeviceHandlersMap.Unlock()
+	if _, exist := so.deviceHandlers[agent.deviceId]; !exist {
+		so.deviceHandlers[agent.deviceId] = agent
+	}
+func (so *SimulatedOLT) deleteDeviceHandlerToMap(agent *DeviceHandler) {
+	so.lockDeviceHandlersMap.Lock()
+	defer so.lockDeviceHandlersMap.Unlock()
+	delete(so.deviceHandlers, agent.deviceId)
+func (so *SimulatedOLT) getDeviceHandler(deviceId string) *DeviceHandler {
+	so.lockDeviceHandlersMap.Lock()
+	defer so.lockDeviceHandlersMap.Unlock()
+	if agent, ok := so.deviceHandlers[deviceId]; ok {
+		return agent
+	}
+	return nil
+func (so *SimulatedOLT) createDeviceTopic(device *voltha.Device) error {
+	log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})
+	deviceTopic := kafka.Topic{Name: so.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}
+	if err := so.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic); err != nil {
+		log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})
+		return err
+	}
+	return nil
+func (so *SimulatedOLT) Adopt_device(device *voltha.Device) error {
+	if device == nil {
+		log.Warn("device-is-nil")
+		return errors.New("nil-device")
+	}
+	log.Infow("adopt-device", log.Fields{"deviceId": device.Id})
+	var handler *DeviceHandler
+	if handler = so.getDeviceHandler(device.Id); handler == nil {
+		handler := NewDeviceHandler(so.coreProxy, device, so)
+		so.addDeviceHandlerToMap(handler)
+		go handler.AdoptDevice(device)
+		// Launch the creation of the device topic
+		go so.createDeviceTopic(device)
+	}
+	return nil
+func (so *SimulatedOLT) Get_ofp_device_info(device *voltha.Device) (*ic.SwitchCapability, error) {
+	log.Infow("Get_ofp_device_info", log.Fields{"deviceId": device.Id})
+	if handler := so.getDeviceHandler(device.Id); handler != nil {
+		return handler.GetOfpDeviceInfo(device)
+	}
+	log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
+	return nil, errors.New("device-handler-not-set")
+func (so *SimulatedOLT) Get_ofp_port_info(device *voltha.Device, port_no int64) (*ic.PortCapability, error) {
+	log.Infow("Get_ofp_port_info", log.Fields{"deviceId": device.Id})
+	if handler := so.getDeviceHandler(device.Id); handler != nil {
+		return handler.GetOfpPortInfo(device, port_no)
+	}
+	log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
+	return nil, errors.New("device-handler-not-set")
+func (so *SimulatedOLT) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {
+	log.Infow("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
+	targetDevice := msg.Header.ProxyDeviceId // Request?
+	if targetDevice == "" && msg.Header.ToDeviceId != "" {
+		// Typical response
+		targetDevice = msg.Header.ToDeviceId
+	}
+	if handler := so.getDeviceHandler(targetDevice); handler != nil {
+		return handler.Process_inter_adapter_message(msg)
+	}
+	return errors.New(fmt.Sprintf("handler-not-found-%s", targetDevice))
+func (so *SimulatedOLT) Adapter_descriptor() error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Device_types() (*voltha.DeviceTypes, error) {
+	return nil, errors.New("UnImplemented")
+func (so *SimulatedOLT) Health() (*voltha.HealthStatus, error) {
+	return nil, errors.New("UnImplemented")
+func (so *SimulatedOLT) Reconcile_device(device *voltha.Device) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Abandon_device(device *voltha.Device) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Disable_device(device *voltha.Device) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Reenable_device(device *voltha.Device) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Reboot_device(device *voltha.Device) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Self_test_device(device *voltha.Device) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Gelete_device(device *voltha.Device) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Get_device_details(device *voltha.Device) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Receive_packet_out(device *voltha.Device, egress_port_no int, msg openflow_13.PacketOut) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Suppress_alarm(filter *voltha.AlarmFilter) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Unsuppress_alarm(filter *voltha.AlarmFilter) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Download_image(device *voltha.Device, request *voltha.ImageDownload) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) error {
+	return errors.New("UnImplemented")
+ * Copyright 2018-present Open Networking Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package config
+import (
+	"flag"
+	"fmt"
+	"github.com/opencord/voltha-go/common/log"
+	"os"
+// Simulated OLT default constants
+const (
+	EtcdStoreName            = "etcd"
+	default_InstanceID       = "simulatedOlt001"
+	default_KafkaAdapterHost = ""
+	default_KafkaAdapterPort = 9092
+	default_KafkaClusterHost = ""
+	default_KafkaClusterPort = 9094
+	default_KVStoreType      = EtcdStoreName
+	default_KVStoreTimeout   = 5 //in seconds
+	default_KVStoreHost      = ""
+	default_KVStorePort      = 2379 // Consul = 8500; Etcd = 2379
+	default_LogLevel         = 0
+	default_Banner           = false
+	default_Topic            = "simulated_olt"
+	default_CoreTopic        = "rwcore"
+	default_OnuNumber        = 1
+// AdapterFlags represents the set of configurations used by the read-write adaptercore service
+type AdapterFlags struct {
+	// Command line parameters
+	InstanceID       string
+	KafkaAdapterHost string
+	KafkaAdapterPort int
+	KafkaClusterHost string
+	KafkaClusterPort int
+	KVStoreType      string
+	KVStoreTimeout   int // in seconds
+	KVStoreHost      string
+	KVStorePort      int
+	Topic            string
+	CoreTopic        string
+	LogLevel         int
+	OnuNumber        int
+	Banner           bool
+func init() {
+	log.AddPackage(log.JSON, log.WarnLevel, nil)
+// NewRWCoreFlags returns a new RWCore config
+func NewAdapterFlags() *AdapterFlags {
+	var adapterFlags = AdapterFlags{ // Default values
+		InstanceID:       default_InstanceID,
+		KafkaAdapterHost: default_KafkaAdapterHost,
+		KafkaAdapterPort: default_KafkaAdapterPort,
+		KafkaClusterHost: default_KafkaClusterHost,
+		KafkaClusterPort: default_KafkaClusterPort,
+		KVStoreType:      default_KVStoreType,
+		KVStoreTimeout:   default_KVStoreTimeout,
+		KVStoreHost:      default_KVStoreHost,
+		KVStorePort:      default_KVStorePort,
+		Topic:            default_Topic,
+		CoreTopic:        default_CoreTopic,
+		LogLevel:         default_LogLevel,
+		OnuNumber:        default_OnuNumber,
+		Banner:           default_Banner,
+	}
+	return &adapterFlags
+// ParseCommandArguments parses the arguments when running read-write adaptercore service
+func (so *AdapterFlags) ParseCommandArguments() {
+	var help string
+	help = fmt.Sprintf("Kafka - Adapter messaging host")
+	flag.StringVar(&(so.KafkaAdapterHost), "kafka_adapter_host", default_KafkaAdapterHost, help)
+	help = fmt.Sprintf("Kafka - Adapter messaging port")
+	flag.IntVar(&(so.KafkaAdapterPort), "kafka_adapter_port", default_KafkaAdapterPort, help)
+	help = fmt.Sprintf("Kafka - Cluster messaging host")
+	flag.StringVar(&(so.KafkaClusterHost), "kafka_cluster_host", default_KafkaClusterHost, help)
+	help = fmt.Sprintf("Kafka - Cluster messaging port")
+	flag.IntVar(&(so.KafkaClusterPort), "kafka_cluster_port", default_KafkaClusterPort, help)
+	help = fmt.Sprintf("Simulated OLT topic")
+	flag.StringVar(&(so.Topic), "simulator_topic", default_Topic, help)
+	help = fmt.Sprintf("Core topic")
+	flag.StringVar(&(so.CoreTopic), "core_topic", default_CoreTopic, help)
+	help = fmt.Sprintf("KV store type")
+	flag.StringVar(&(so.KVStoreType), "kv_store_type", default_KVStoreType, help)
+	help = fmt.Sprintf("The default timeout when making a kv store request")
+	flag.IntVar(&(so.KVStoreTimeout), "kv_store_request_timeout", default_KVStoreTimeout, help)
+	help = fmt.Sprintf("KV store host")
+	flag.StringVar(&(so.KVStoreHost), "kv_store_host", default_KVStoreHost, help)
+	help = fmt.Sprintf("KV store port")
+	flag.IntVar(&(so.KVStorePort), "kv_store_port", default_KVStorePort, help)
+	help = fmt.Sprintf("Log level")
+	flag.IntVar(&(so.LogLevel), "log_level", default_LogLevel, help)
+	help = fmt.Sprintf("Number of ONUs")
+	flag.IntVar(&(so.OnuNumber), "onu_number", default_OnuNumber, help)
+	help = fmt.Sprintf("Show startup banner log lines")
+	flag.BoolVar(&so.Banner, "banner", default_Banner, help)
+	flag.Parse()
+	containerName := getContainerInfo()
+	if len(containerName) > 0 {
+		so.InstanceID = containerName
+	}
+func getContainerInfo() string {
+	return os.Getenv("HOSTNAME")
+ * Copyright 2018-present Open Networking Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package main
+import (
+	"context"
+	"errors"
+	"fmt"
+	"github.com/opencord/voltha-go/adapters"
+	com "github.com/opencord/voltha-go/adapters/common"
+	ac "github.com/opencord/voltha-go/adapters/simulated_olt/adaptercore"
+	"github.com/opencord/voltha-go/adapters/simulated_olt/config"
+	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/db/kvstore"
+	"github.com/opencord/voltha-go/kafka"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"os"
+	"os/signal"
+	"strconv"
+	"syscall"
+	"time"
+type adapter struct {
+	instanceId       string
+	config           *config.AdapterFlags
+	iAdapter         adapters.IAdapter
+	kafkaClient      kafka.Client
+	kvClient         kvstore.Client
+	kip              *kafka.InterContainerProxy
+	coreProxy        *com.CoreProxy
+	halted           bool
+	exitChannel      chan int
+	receiverChannels []<-chan *ic.InterContainerMessage
+func init() {
+	log.AddPackage(log.JSON, log.DebugLevel, nil)
+func newAdapter(cf *config.AdapterFlags) *adapter {
+	var a adapter
+	a.instanceId = cf.InstanceID
+	a.config = cf
+	a.halted = false
+	a.exitChannel = make(chan int, 1)
+	a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
+	return &a
+func (a *adapter) start(ctx context.Context) {
+	log.Info("Starting Core Adapter components")
+	var err error
+	// Setup KV Client
+	log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
+	if err := a.setKVClient(); err != nil {
+		log.Fatal("error-setting-kv-client")
+	}
+	// Setup Kafka Client
+	if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
+		log.Fatal("Unsupported-common-client")
+	}
+	// Start the common InterContainer Proxy - retries indefinitely
+	if a.kip, err = a.startInterContainerProxy(-1); err != nil {
+		log.Fatal("error-starting-inter-container-proxy")
+	}
+	// Create the core proxy to handle requests to the Core
+	a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
+	// Create the simulated OLT adapter
+	if a.iAdapter, err = a.startSimulatedOLT(ctx, a.kip, a.coreProxy, a.config.OnuNumber); err != nil {
+		log.Fatal("error-starting-inter-container-proxy")
+	}
+	// Register the core request handler
+	if err = a.setupRequestHandler(a.instanceId, a.iAdapter); err != nil {
+		log.Fatal("error-setting-core-request-handler")
+	}
+	//	Register this adapter to the Core - retries indefinitely
+	if err = a.registerWithCore(-1); err != nil {
+		log.Fatal("error-registering-with-core")
+	}
+func (rw *adapter) stop() {
+	// Stop leadership tracking
+	rw.halted = true
+	// send exit signal
+	rw.exitChannel <- 0
+	// Cleanup - applies only if we had a kvClient
+	if rw.kvClient != nil {
+		// Release all reservations
+		if err := rw.kvClient.ReleaseAllReservations(); err != nil {
+			log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
+		}
+		// Close the DB connection
+		rw.kvClient.Close()
+	}
+	// TODO:  More cleanup
+func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
+	log.Infow("kv-store-type", log.Fields{"store": storeType})
+	switch storeType {
+	case "consul":
+		return kvstore.NewConsulClient(address, timeout)
+	case "etcd":
+		return kvstore.NewEtcdClient(address, timeout)
+	}
+	return nil, errors.New("unsupported-kv-store")
+func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {
+	log.Infow("common-client-type", log.Fields{"client": clientType})
+	switch clientType {
+	case "sarama":
+		return kafka.NewSaramaClient(
+			kafka.Host(host),
+			kafka.Port(port),
+			kafka.ProducerReturnOnErrors(true),
+			kafka.ProducerReturnOnSuccess(true),
+			kafka.ProducerMaxRetries(6),
+			kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
+	}
+	return nil, errors.New("unsupported-client-type")
+func (a *adapter) setKVClient() error {
+	addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
+	client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)
+	if err != nil {
+		a.kvClient = nil
+		log.Error(err)
+		return err
+	}
+	a.kvClient = client
+	return nil
+func toString(value interface{}) (string, error) {
+	switch t := value.(type) {
+	case []byte:
+		return string(value.([]byte)), nil
+	case string:
+		return value.(string), nil
+	default:
+		return "", fmt.Errorf("unexpected-type-%T", t)
+	}
+func (a *adapter) startInterContainerProxy(retries int) (*kafka.InterContainerProxy, error) {
+	log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
+		"port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
+	var err error
+	var kip *kafka.InterContainerProxy
+	if kip, err = kafka.NewInterContainerProxy(
+		kafka.InterContainerHost(a.config.KafkaAdapterHost),
+		kafka.InterContainerPort(a.config.KafkaAdapterPort),
+		kafka.MsgClient(a.kafkaClient),
+		kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic})); err != nil {
+		log.Errorw("fail-to-create-common-proxy", log.Fields{"error": err})
+		return nil, err
+	}
+	count := 0
+	for {
+		if err = kip.Start(); err != nil {
+			log.Warnw("error-starting-messaging-proxy", log.Fields{"error": err})
+			if retries == count {
+				return nil, err
+			}
+			count = +1
+			//	Take a nap before retrying
+			time.Sleep(2 * time.Second)
+		} else {
+			break
+		}
+	}
+	log.Info("common-messaging-proxy-created")
+	return kip, nil
+func (a *adapter) startSimulatedOLT(ctx context.Context, kip *kafka.InterContainerProxy, cp *com.CoreProxy, onuNumber int) (*ac.SimulatedOLT, error) {
+	log.Info("starting-simulated-olt")
+	var err error
+	sOLT := ac.NewSimulatedOLT(ctx, a.kip, cp, onuNumber)
+	if err = sOLT.Start(ctx); err != nil {
+		log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
+		return nil, err
+	}
+	log.Info("simulated-olt-started")
+	return sOLT, nil
+func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter) error {
+	log.Info("setting-request-handler")
+	requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter)
+	if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
+		log.Errorw("request-handler-setup-failed", log.Fields{"error": err})
+		return err
+	}
+	log.Info("request-handler-setup-done")
+	return nil
+func (a *adapter) registerWithCore(retries int) error {
+	log.Info("registering-with-core")
+	adapterDescription := &voltha.Adapter{Id: "simulated_olt", Vendor: "simulation Enterprise Inc"}
+	types := []*voltha.DeviceType{{Id: "simulated_olt", Adapter: "simulated_olt"}}
+	deviceTypes := &voltha.DeviceTypes{Items: types}
+	count := 0
+	for {
+		if err := a.coreProxy.RegisterAdapter(nil, adapterDescription, deviceTypes); err != nil {
+			log.Warnw("registering-with-core-failed", log.Fields{"error": err})
+			if retries == count {
+				return err
+			}
+			count += 1
+			//	Take a nap before retrying
+			time.Sleep(2 * time.Second)
+		} else {
+			break
+		}
+	}
+	log.Info("registered-with-core")
+	return nil
+func waitForExit() int {
+	signalChannel := make(chan os.Signal, 1)
+	signal.Notify(signalChannel,
+		syscall.SIGHUP,
+		syscall.SIGINT,
+		syscall.SIGTERM,
+		syscall.SIGQUIT)
+	exitChannel := make(chan int)
+	go func() {
+		s := <-signalChannel
+		switch s {
+		case syscall.SIGHUP,
+			syscall.SIGINT,
+			syscall.SIGTERM,
+			syscall.SIGQUIT:
+			log.Infow("closing-signal-received", log.Fields{"signal": s})
+			exitChannel <- 0
+		default:
+			log.Infow("unexpected-signal-received", log.Fields{"signal": s})
+			exitChannel <- 1
+		}
+	}()
+	code := <-exitChannel
+	return code
+func printBanner() {
+	fmt.Println(" ____  _                 _       _           _  ___  _   _____		")
+	fmt.Println("/ ___|(_)_ __ ___  _   _| | __ _| |_ ___  __| |/ _ \\| | |_   _|	")
+	fmt.Println("\\___ \\| | '_ ` _ \\| | | | |/ _` | __/ _ \\/ _` | | | | |   | |	")
+	fmt.Println(" ___) | | | | | | | |_| | | (_| | ||  __/ (_| | |_| | |___| |		")
+	fmt.Println("|____/|_|_| |_| |_|\\__,_|_|\\__,_|\\__\\___|\\__,_|\\___/|_____|_|	")
+	fmt.Println("																	")
+func main() {
+	start := time.Now()
+	cf := config.NewAdapterFlags()
+	cf.ParseCommandArguments()
+	//// Setup logging
+	//Setup default logger - applies for packages that do not have specific logger set
+	if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+	}
+	// Update all loggers (provisionned via init) with a common field
+	if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+	}
+	log.SetPackageLogLevel("github.com/opencord/voltha-go/adapters/common", log.DebugLevel)
+	defer log.CleanUp()
+	// Print banner if specified
+	if cf.Banner {
+		printBanner()
+	}
+	log.Infow("config", log.Fields{"config": *cf})
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	ad := newAdapter(cf)
+	go ad.start(ctx)
+	code := waitForExit()
+	log.Infow("received-a-closing-signal", log.Fields{"code": code})
+	// Cleanup before leaving
+	ad.stop()
+	elapsed := time.Since(start)
+	log.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
+ * Copyright 2018-present Open Networking Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adaptercore
+import (
+	"context"
+	"github.com/gogo/protobuf/proto"
+	com "github.com/opencord/voltha-go/adapters/common"
+	"github.com/opencord/voltha-go/common/log"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
+	of "github.com/opencord/voltha-go/protos/openflow_13"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"strconv"
+	"strings"
+	"sync"
+//DeviceHandler follows the same patterns as ponsim_olt.  The only difference is that it does not
+// interact with an OLT device.
+type DeviceHandler struct {
+	deviceId     string
+	deviceType   string
+	device       *voltha.Device
+	coreProxy    *com.CoreProxy
+	simulatedOLT *SimulatedONU
+	uniPort      *voltha.Port
+	ponPort      *voltha.Port
+	exitChannel  chan int
+	lockDevice   sync.RWMutex
+//NewDeviceHandler creates a new device handler
+func NewDeviceHandler(cp *com.CoreProxy, device *voltha.Device, adapter *SimulatedONU) *DeviceHandler {
+	var dh DeviceHandler
+	dh.coreProxy = cp
+	cloned := (proto.Clone(device)).(*voltha.Device)
+	dh.deviceId = cloned.Id
+	dh.deviceType = cloned.Type
+	dh.device = cloned
+	dh.simulatedOLT = adapter
+	dh.exitChannel = make(chan int, 1)
+	dh.lockDevice = sync.RWMutex{}
+	return &dh
+// start save the device to the data model
+func (dh *DeviceHandler) start(ctx context.Context) {
+	dh.lockDevice.Lock()
+	defer dh.lockDevice.Unlock()
+	log.Debugw("starting-device-agent", log.Fields{"device": dh.device})
+	// Add the initial device to the local model
+	log.Debug("device-agent-started")
+// stop stops the device dh.  Not much to do for now
+func (dh *DeviceHandler) stop(ctx context.Context) {
+	dh.lockDevice.Lock()
+	defer dh.lockDevice.Unlock()
+	log.Debug("stopping-device-agent")
+	dh.exitChannel <- 1
+	log.Debug("device-agent-stopped")
+func macAddressToUint32Array(mac string) []uint32 {
+	slist := strings.Split(mac, ":")
+	result := make([]uint32, len(slist))
+	var err error
+	var tmp int64
+	for index, val := range slist {
+		if tmp, err = strconv.ParseInt(val, 16, 32); err != nil {
+			return []uint32{1, 2, 3, 4, 5, 6}
+		}
+		result[index] = uint32(tmp)
+	}
+	return result
+func (dh *DeviceHandler) AdoptDevice(device *voltha.Device) {
+	log.Debugw("AdoptDevice", log.Fields{"deviceId": device.Id})
+	//	Update the device info
+	cloned := proto.Clone(device).(*voltha.Device)
+	cloned.Root = false
+	cloned.Vendor = "simulators"
+	cloned.Model = "go-simulators"
+	cloned.SerialNumber = com.GetRandomSerialNumber()
+	cloned.MacAddress = strings.ToUpper(com.GetRandomMacAddress())
+	// Synchronous call to update device - this method is run in its own go routine
+	if err := dh.coreProxy.DeviceUpdate(nil, cloned); err != nil {
+		log.Errorw("error-updating-device", log.Fields{"deviceId": device.Id, "error": err})
+	}
+	//	Now create the NNI Port
+	dh.uniPort = &voltha.Port{
+		PortNo:     2,
+		Label:      "UNI facing Ethernet port",
+		Type:       voltha.Port_ETHERNET_UNI,
+		AdminState: voltha.AdminState_ENABLED,
+		OperStatus: voltha.OperStatus_ACTIVE,
+	}
+	// Synchronous call to update device - this method is run in its own go routine
+	if err := dh.coreProxy.PortCreated(nil, cloned.Id, dh.uniPort); err != nil {
+		log.Errorw("error-creating-nni-port", log.Fields{"deviceId": device.Id, "error": err})
+	}
+	//	Now create the PON Port
+	dh.ponPort = &voltha.Port{
+		PortNo:     1,
+		Label:      "PON port",
+		Type:       voltha.Port_PON_ONU,
+		AdminState: voltha.AdminState_ENABLED,
+		OperStatus: voltha.OperStatus_ACTIVE,
+		Peers: []*voltha.Port_PeerPort{{DeviceId: cloned.ParentId,
+			PortNo: cloned.ParentPortNo}},
+	}
+	// Synchronous call to update device - this method is run in its own go routine
+	if err := dh.coreProxy.PortCreated(nil, cloned.Id, dh.ponPort); err != nil {
+		log.Errorw("error-creating-nni-port", log.Fields{"deviceId": device.Id, "error": err})
+	}
+	cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
+	cloned.OperStatus = voltha.OperStatus_ACTIVE
+	//	Update the device state
+	if err := dh.coreProxy.DeviceStateUpdate(nil, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+		log.Errorw("error-creating-nni-port", log.Fields{"deviceId": device.Id, "error": err})
+	}
+	dh.device = cloned
+func (dh *DeviceHandler) GetOfpPortInfo(device *voltha.Device, portNo int64) (*ic.PortCapability, error) {
+	cap := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
+	return &ic.PortCapability{
+		Port: &voltha.LogicalPort{
+			OfpPort: &of.OfpPort{
+				HwAddr:     macAddressToUint32Array(dh.device.MacAddress),
+				Config:     0,
+				State:      uint32(of.OfpPortState_OFPPS_LIVE),
+				Curr:       cap,
+				Advertised: cap,
+				Peer:       cap,
+				CurrSpeed:  uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+				MaxSpeed:   uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+			},
+			DeviceId:     dh.device.Id,
+			DevicePortNo: uint32(portNo),
+		},
+	}, nil
+func (dh *DeviceHandler) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {
+	log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
+	return nil
+ * Copyright 2018-present Open Networking Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adaptercore
+import (
+	"context"
+	"errors"
+	"fmt"
+	com "github.com/opencord/voltha-go/adapters/common"
+	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/kafka"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
+	"github.com/opencord/voltha-go/protos/openflow_13"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"sync"
+type SimulatedONU struct {
+	deviceHandlers        map[string]*DeviceHandler
+	coreProxy             *com.CoreProxy
+	kafkaICProxy          *kafka.InterContainerProxy
+	exitChannel           chan int
+	lockDeviceHandlersMap sync.RWMutex
+func NewSimulatedONU(ctx context.Context, kafkaICProxy *kafka.InterContainerProxy, coreProxy *com.CoreProxy) *SimulatedONU {
+	var simulatedOLT SimulatedONU
+	simulatedOLT.exitChannel = make(chan int, 1)
+	simulatedOLT.deviceHandlers = make(map[string]*DeviceHandler)
+	simulatedOLT.kafkaICProxy = kafkaICProxy
+	simulatedOLT.coreProxy = coreProxy
+	simulatedOLT.lockDeviceHandlersMap = sync.RWMutex{}
+	return &simulatedOLT
+func (so *SimulatedONU) Start(ctx context.Context) error {
+	log.Info("starting-device-manager")
+	log.Info("device-manager-started")
+	return nil
+func (so *SimulatedONU) Stop(ctx context.Context) error {
+	log.Info("stopping-device-manager")
+	so.exitChannel <- 1
+	log.Info("device-manager-stopped")
+	return nil
+func sendResponse(ctx context.Context, ch chan interface{}, result interface{}) {
+	if ctx.Err() == nil {
+		// Returned response only of the ctx has not been cancelled/timeout/etc
+		// Channel is automatically closed when a context is Done
+		ch <- result
+		log.Debugw("sendResponse", log.Fields{"result": result})
+	} else {
+		// Should the transaction be reverted back?
+		log.Debugw("sendResponse-context-error", log.Fields{"context-error": ctx.Err()})
+	}
+func (so *SimulatedONU) addDeviceHandlerToMap(agent *DeviceHandler) {
+	so.lockDeviceHandlersMap.Lock()
+	defer so.lockDeviceHandlersMap.Unlock()
+	if _, exist := so.deviceHandlers[agent.deviceId]; !exist {
+		so.deviceHandlers[agent.deviceId] = agent
+	}
+func (so *SimulatedONU) deleteDeviceHandlerToMap(agent *DeviceHandler) {
+	so.lockDeviceHandlersMap.Lock()
+	defer so.lockDeviceHandlersMap.Unlock()
+	delete(so.deviceHandlers, agent.deviceId)
+func (so *SimulatedONU) getDeviceHandler(deviceId string) *DeviceHandler {
+	so.lockDeviceHandlersMap.Lock()
+	defer so.lockDeviceHandlersMap.Unlock()
+	if agent, ok := so.deviceHandlers[deviceId]; ok {
+		return agent
+	}
+	return nil
+func (so *SimulatedONU) createDeviceTopic(device *voltha.Device) error {
+	log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})
+	deviceTopic := kafka.Topic{Name: so.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}
+	if err := so.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic); err != nil {
+		log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})
+		return err
+	}
+	return nil
+func (so *SimulatedONU) Adopt_device(device *voltha.Device) error {
+	if device == nil {
+		log.Warn("device-is-nil")
+		return errors.New("nil-device")
+	}
+	log.Infow("adopt-device", log.Fields{"deviceId": device.Id})
+	var handler *DeviceHandler
+	if handler = so.getDeviceHandler(device.Id); handler == nil {
+		handler := NewDeviceHandler(so.coreProxy, device, so)
+		so.addDeviceHandlerToMap(handler)
+		go handler.AdoptDevice(device)
+		// Launch the creation of the device topic
+		go so.createDeviceTopic(device)
+	}
+	return nil
+func (so *SimulatedONU) Get_ofp_device_info(device *voltha.Device) (*ic.SwitchCapability, error) {
+	log.Infow("not-implemented-for-onu", log.Fields{"deviceId": device.Id})
+	return nil, nil
+func (so *SimulatedONU) Get_ofp_port_info(device *voltha.Device, port_no int64) (*ic.PortCapability, error) {
+	log.Infow("Get_ofp_port_info", log.Fields{"deviceId": device.Id})
+	if handler := so.getDeviceHandler(device.Id); handler != nil {
+		return handler.GetOfpPortInfo(device, port_no)
+	}
+	log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
+	return nil, errors.New("device-handler-not-set")
+func (so *SimulatedONU) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {
+	log.Infow("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
+	targetDevice := msg.Header.ProxyDeviceId // Request?
+	if targetDevice == "" && msg.Header.ToDeviceId != "" {
+		// Typical response
+		targetDevice = msg.Header.ToDeviceId
+	}
+	if handler := so.getDeviceHandler(targetDevice); handler != nil {
+		return handler.Process_inter_adapter_message(msg)
+	}
+	return errors.New(fmt.Sprintf("handler-not-found-%s", targetDevice))
+func (so *SimulatedONU) Adapter_descriptor() error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Device_types() (*voltha.DeviceTypes, error) {
+	return nil, errors.New("UnImplemented")
+func (so *SimulatedONU) Health() (*voltha.HealthStatus, error) {
+	return nil, errors.New("UnImplemented")
+func (so *SimulatedONU) Reconcile_device(device *voltha.Device) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Abandon_device(device *voltha.Device) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Disable_device(device *voltha.Device) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Reenable_device(device *voltha.Device) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Reboot_device(device *voltha.Device) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Self_test_device(device *voltha.Device) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Gelete_device(device *voltha.Device) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Get_device_details(device *voltha.Device) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Receive_packet_out(device *voltha.Device, egress_port_no int, msg openflow_13.PacketOut) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Suppress_alarm(filter *voltha.AlarmFilter) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Unsuppress_alarm(filter *voltha.AlarmFilter) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Download_image(device *voltha.Device, request *voltha.ImageDownload) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) error {
+	return errors.New("UnImplemented")
+func (so *SimulatedONU) Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) error {
+	return errors.New("UnImplemented")
+ * Copyright 2018-present Open Networking Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package config
+import (
+	"flag"
+	"fmt"
+	"github.com/opencord/voltha-go/common/log"
+	"os"
+// Simulated OLT default constants
+const (
+	EtcdStoreName            = "etcd"
+	default_InstanceID       = "simulatedOnu001"
+	default_KafkaAdapterHost = ""
+	default_KafkaAdapterPort = 9092
+	default_KafkaClusterHost = ""
+	default_KafkaClusterPort = 9094
+	default_KVStoreType      = EtcdStoreName
+	default_KVStoreTimeout   = 5 //in seconds
+	default_KVStoreHost      = ""
+	default_KVStorePort      = 2379 // Consul = 8500; Etcd = 2379
+	default_LogLevel         = 0
+	default_Banner           = false
+	default_Topic            = "simulated_onu"
+	default_CoreTopic        = "rwcore"
+// AdapterFlags represents the set of configurations used by the read-write adaptercore service
+type AdapterFlags struct {
+	// Command line parameters
+	InstanceID       string
+	KafkaAdapterHost string
+	KafkaAdapterPort int
+	KafkaClusterHost string
+	KafkaClusterPort int
+	KVStoreType      string
+	KVStoreTimeout   int // in seconds
+	KVStoreHost      string
+	KVStorePort      int
+	Topic            string
+	CoreTopic        string
+	LogLevel         int
+	Banner           bool
+func init() {
+	log.AddPackage(log.JSON, log.WarnLevel, nil)
+// NewRWCoreFlags returns a new RWCore config
+func NewAdapterFlags() *AdapterFlags {
+	var adapterFlags = AdapterFlags{ // Default values
+		InstanceID:       default_InstanceID,
+		KafkaAdapterHost: default_KafkaAdapterHost,
+		KafkaAdapterPort: default_KafkaAdapterPort,
+		KafkaClusterHost: default_KafkaClusterHost,
+		KafkaClusterPort: default_KafkaClusterPort,
+		KVStoreType:      default_KVStoreType,
+		KVStoreTimeout:   default_KVStoreTimeout,
+		KVStoreHost:      default_KVStoreHost,
+		KVStorePort:      default_KVStorePort,
+		Topic:            default_Topic,
+		CoreTopic:        default_CoreTopic,
+		LogLevel:         default_LogLevel,
+		Banner:           default_Banner,
+	}
+	return &adapterFlags
+// ParseCommandArguments parses the arguments when running read-write adaptercore service
+func (so *AdapterFlags) ParseCommandArguments() {
+	var help string
+	help = fmt.Sprintf("Kafka - Adapter messaging host")
+	flag.StringVar(&(so.KafkaAdapterHost), "kafka_adapter_host", default_KafkaAdapterHost, help)
+	help = fmt.Sprintf("Kafka - Adapter messaging port")
+	flag.IntVar(&(so.KafkaAdapterPort), "kafka_adapter_port", default_KafkaAdapterPort, help)
+	help = fmt.Sprintf("Kafka - Cluster messaging host")
+	flag.StringVar(&(so.KafkaClusterHost), "kafka_cluster_host", default_KafkaClusterHost, help)
+	help = fmt.Sprintf("Kafka - Cluster messaging port")
+	flag.IntVar(&(so.KafkaClusterPort), "kafka_cluster_port", default_KafkaClusterPort, help)
+	help = fmt.Sprintf("Simulated ONU topic")
+	flag.StringVar(&(so.Topic), "simulator_topic", default_Topic, help)
+	help = fmt.Sprintf("Core topic")
+	flag.StringVar(&(so.CoreTopic), "core_topic", default_CoreTopic, help)
+	help = fmt.Sprintf("KV store type")
+	flag.StringVar(&(so.KVStoreType), "kv_store_type", default_KVStoreType, help)
+	help = fmt.Sprintf("The default timeout when making a kv store request")
+	flag.IntVar(&(so.KVStoreTimeout), "kv_store_request_timeout", default_KVStoreTimeout, help)
+	help = fmt.Sprintf("KV store host")
+	flag.StringVar(&(so.KVStoreHost), "kv_store_host", default_KVStoreHost, help)
+	help = fmt.Sprintf("KV store port")
+	flag.IntVar(&(so.KVStorePort), "kv_store_port", default_KVStorePort, help)
+	help = fmt.Sprintf("Log level")
+	flag.IntVar(&(so.LogLevel), "log_level", default_LogLevel, help)
+	help = fmt.Sprintf("Show startup banner log lines")
+	flag.BoolVar(&so.Banner, "banner", default_Banner, help)
+	flag.Parse()
+	containerName := getContainerInfo()
+	if len(containerName) > 0 {
+		so.InstanceID = containerName
+	}
+func getContainerInfo() string {
+	return os.Getenv("HOSTNAME")
+ * Copyright 2018-present Open Networking Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package main
+import (
+	"context"
+	"errors"
+	"fmt"
+	"github.com/opencord/voltha-go/adapters"
+	com "github.com/opencord/voltha-go/adapters/common"
+	ac "github.com/opencord/voltha-go/adapters/simulated_onu/adaptercore"
+	"github.com/opencord/voltha-go/adapters/simulated_onu/config"
+	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/db/kvstore"
+	"github.com/opencord/voltha-go/kafka"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"os"
+	"os/signal"
+	"strconv"
+	"syscall"
+	"time"
+type adapter struct {
+	instanceId       string
+	config           *config.AdapterFlags
+	iAdapter         adapters.IAdapter
+	kafkaClient      kafka.Client
+	kvClient         kvstore.Client
+	kip              *kafka.InterContainerProxy
+	coreProxy        *com.CoreProxy
+	halted           bool
+	exitChannel      chan int
+	receiverChannels []<-chan *ic.InterContainerMessage
+func init() {
+	log.AddPackage(log.JSON, log.DebugLevel, nil)
+func newAdapter(cf *config.AdapterFlags) *adapter {
+	var a adapter
+	a.instanceId = cf.InstanceID
+	a.config = cf
+	a.halted = false
+	a.exitChannel = make(chan int, 1)
+	a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
+	return &a
+func (a *adapter) start(ctx context.Context) {
+	log.Info("Starting Core Adapter components")
+	var err error
+	// Setup KV Client
+	log.Debugw("create-kv-client", log.Fields{"kvstore": a.config.KVStoreType})
+	if err := a.setKVClient(); err != nil {
+		log.Fatal("error-setting-kv-client")
+	}
+	// Setup Kafka Client
+	if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
+		log.Fatal("Unsupported-common-client")
+	}
+	// Start the common InterContainer Proxy - retry indefinitely
+	if a.kip, err = a.startInterContainerProxy(-1); err != nil {
+		log.Fatal("error-starting-inter-container-proxy")
+	}
+	// Create the core proxy to handle requests to the Core
+	a.coreProxy = com.NewCoreProxy(a.kip, a.config.Topic, a.config.CoreTopic)
+	// Create the simulated OLT adapter
+	if a.iAdapter, err = a.startSimulatedONU(ctx, a.kip, a.coreProxy); err != nil {
+		log.Fatal("error-starting-inter-container-proxy")
+	}
+	// Register the core request handler
+	if err = a.setupRequestHandler(a.instanceId, a.iAdapter); err != nil {
+		log.Fatal("error-setting-core-request-handler")
+	}
+	//	Register this adapter to the Core - retry indefinitely
+	if err = a.registerWithCore(-1); err != nil {
+		log.Fatal("error-registering-with-core")
+	}
+func (rw *adapter) stop() {
+	// Stop leadership tracking
+	rw.halted = true
+	// send exit signal
+	rw.exitChannel <- 0
+	// Cleanup - applies only if we had a kvClient
+	if rw.kvClient != nil {
+		// Release all reservations
+		if err := rw.kvClient.ReleaseAllReservations(); err != nil {
+			log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
+		}
+		// Close the DB connection
+		rw.kvClient.Close()
+	}
+	// TODO:  More cleanup
+func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
+	log.Infow("kv-store-type", log.Fields{"store": storeType})
+	switch storeType {
+	case "consul":
+		return kvstore.NewConsulClient(address, timeout)
+	case "etcd":
+		return kvstore.NewEtcdClient(address, timeout)
+	}
+	return nil, errors.New("unsupported-kv-store")
+func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {
+	log.Infow("common-client-type", log.Fields{"client": clientType})
+	switch clientType {
+	case "sarama":
+		return kafka.NewSaramaClient(
+			kafka.Host(host),
+			kafka.Port(port),
+			kafka.ProducerReturnOnErrors(true),
+			kafka.ProducerReturnOnSuccess(true),
+			kafka.ProducerMaxRetries(6),
+			kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
+	}
+	return nil, errors.New("unsupported-client-type")
+func (a *adapter) setKVClient() error {
+	addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
+	client, err := newKVClient(a.config.KVStoreType, addr, a.config.KVStoreTimeout)
+	if err != nil {
+		a.kvClient = nil
+		log.Error(err)
+		return err
+	}
+	a.kvClient = client
+	return nil
+func toString(value interface{}) (string, error) {
+	switch t := value.(type) {
+	case []byte:
+		return string(value.([]byte)), nil
+	case string:
+		return value.(string), nil
+	default:
+		return "", fmt.Errorf("unexpected-type-%T", t)
+	}
+func (a *adapter) startInterContainerProxy(retries int) (*kafka.InterContainerProxy, error) {
+	log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
+		"port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
+	var err error
+	var kip *kafka.InterContainerProxy
+	if kip, err = kafka.NewInterContainerProxy(
+		kafka.InterContainerHost(a.config.KafkaAdapterHost),
+		kafka.InterContainerPort(a.config.KafkaAdapterPort),
+		kafka.MsgClient(a.kafkaClient),
+		kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic})); err != nil {
+		log.Errorw("fail-to-create-common-proxy", log.Fields{"error": err})
+		return nil, err
+	}
+	count := 0
+	for {
+		if err = kip.Start(); err != nil {
+			log.Warnw("error-starting-messaging-proxy", log.Fields{"error": err})
+			if retries == count {
+				return nil, err
+			}
+			count = +1
+			//	Take a nap before retrying
+			time.Sleep(2 * time.Second)
+		} else {
+			break
+		}
+	}
+	log.Info("common-messaging-proxy-created")
+	return kip, nil
+func (a *adapter) startSimulatedONU(ctx context.Context, kip *kafka.InterContainerProxy, cp *com.CoreProxy) (*ac.SimulatedONU, error) {
+	log.Info("starting-simulated-onu")
+	var err error
+	sOLT := ac.NewSimulatedONU(ctx, a.kip, cp)
+	if err = sOLT.Start(ctx); err != nil {
+		log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
+		return nil, err
+	}
+	log.Info("simulated-olt-started")
+	return sOLT, nil
+func (a *adapter) setupRequestHandler(coreInstanceId string, iadapter adapters.IAdapter) error {
+	log.Info("setting-request-handler")
+	requestProxy := com.NewRequestHandlerProxy(coreInstanceId, iadapter)
+	if err := a.kip.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
+		log.Errorw("adaptercore-request-handler-setup-failed", log.Fields{"error": err})
+		return err
+	}
+	log.Info("request-handler-setup-done")
+	return nil
+func (a *adapter) registerWithCore(retries int) error {
+	log.Info("registering-with-core")
+	adapterDescription := &voltha.Adapter{Id: "simulated_onu", Vendor: "simulation Enterprise Inc"}
+	types := []*voltha.DeviceType{{Id: "simulated_onu", Adapter: "simulated_onu"}}
+	deviceTypes := &voltha.DeviceTypes{Items: types}
+	count := 0
+	for {
+		if err := a.coreProxy.RegisterAdapter(nil, adapterDescription, deviceTypes); err != nil {
+			log.Warnw("registering-with-core-failed", log.Fields{"error": err})
+			if retries == count {
+				return err
+			}
+			count += 1
+			//	Take a nap before retrying
+			time.Sleep(2 * time.Second)
+		} else {
+			break
+		}
+	}
+	log.Info("registered-with-core")
+	return nil
+func waitForExit() int {
+	signalChannel := make(chan os.Signal, 1)
+	signal.Notify(signalChannel,
+		syscall.SIGHUP,
+		syscall.SIGINT,
+		syscall.SIGTERM,
+		syscall.SIGQUIT)
+	exitChannel := make(chan int)
+	go func() {
+		s := <-signalChannel
+		switch s {
+		case syscall.SIGHUP,
+			syscall.SIGINT,
+			syscall.SIGTERM,
+			syscall.SIGQUIT:
+			log.Infow("closing-signal-received", log.Fields{"signal": s})
+			exitChannel <- 0
+		default:
+			log.Infow("unexpected-signal-received", log.Fields{"signal": s})
+			exitChannel <- 1
+		}
+	}()
+	code := <-exitChannel
+	return code
+func printBanner() {
+	fmt.Println("	 _                 _       _           _								")
+	fmt.Println(" ___(_)_ __ ___  _   _| | __ _| |_ ___  __| |    ___  _ __  _   _			")
+	fmt.Println("/ __| | '_ ` _ \\| | | | |/ _` | __/ _ \\/ _` |   / _ \\| '_ \\| | | |		")
+	fmt.Println("\\__ \\ | | | | | | |_| | | (_| | ||  __/ (_| |  | (_) | | | | |_| |		")
+	fmt.Println("|___/_|_| |_| |_|\\__,_|_|\\__,_|\\__\\___|\\__,_|___\\___/|_| |_|\\__,_|	")
+	fmt.Println("                                           |_____|							")
+	fmt.Println("																			")
+func main() {
+	start := time.Now()
+	cf := config.NewAdapterFlags()
+	cf.ParseCommandArguments()
+	//// Setup logging
+	//Setup default logger - applies for packages that do not have specific logger set
+	if _, err := log.SetDefaultLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+	}
+	// Update all loggers (provisionned via init) with a common field
+	if err := log.UpdateAllLoggers(log.Fields{"instanceId": cf.InstanceID}); err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+	}
+	log.SetPackageLogLevel("github.com/opencord/voltha-go/adapters/common", log.DebugLevel)
+	defer log.CleanUp()
+	// Print banner if specified
+	if cf.Banner {
+		printBanner()
+	}
+	log.Infow("config", log.Fields{"config": *cf})
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	ad := newAdapter(cf)
+	go ad.start(ctx)
+	code := waitForExit()
+	log.Infow("received-a-closing-signal", log.Fields{"code": code})
+	// Cleanup before leaving
+	ad.stop()
+	elapsed := time.Since(start)
+	log.Infow("runtime", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})
+# Copyright 2018 the original author or authors.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#      http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+version: '2'
+  adapter_simulated_olt:
+    image: "${REGISTRY}${REPOSITORY}voltha-adapter-simulated-olt${TAG}"
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "10m"
+        max-file: "3"
+    command: [
+      "/app/simulated_olt",
+      "--kafka_adapter_host=${DOCKER_HOST_IP}",
+      "--kafka_adapter_port=9092",
+      "--kafka_cluster_host=${DOCKER_HOST_IP}",
+      "--kafka_cluster_port=9092",
+      "--core_topic=rwcore",
+      "--simulator_topic=simulated_olt",
+      "--onu_number=1"
+    ]
+    networks:
+    - default
+  adapter_simulated_onu:
+    image: "${REGISTRY}${REPOSITORY}voltha-adapter-simulated-onu${TAG}"
+    logging:
+      driver: "json-file"
+      options:
+        max-size: "10m"
+        max-file: "3"
+    command: [
+      "/app/simulated_onu",
+      "--kafka_adapter_host=${DOCKER_HOST_IP}",
+      "--kafka_adapter_port=9092",
+      "--kafka_cluster_host=${DOCKER_HOST_IP}",
+      "--kafka_cluster_port=9092",
+      "--core_topic=rwcore",
+      "--simulator_topic=simulated_onu",
+    ]
+    networks:
+    - default
+  default:
+    driver: bridge
+# -------------
+# Build stage
+FROM golang:1.9.2-alpine AS build-env
+# Install required packages
+RUN apk add --no-cache wget git make build-base protobuf protobuf-dev
+# Prepare directory structure
+RUN ["mkdir", "-p", "/src", "src/protos"]
+RUN ["mkdir", "-p", "$GOPATH/src", "$GOPATH/pkg", "$GOPATH/bin"]
+RUN ["mkdir", "-p", "$GOPATH/src/github.com/opencord/voltha/protos/go"]
+# Copy files
+ADD adapters/simulated_olt $GOPATH/src/github.com/opencord/voltha-go/adapters/simulated_olt
+ADD adapters/common $GOPATH/src/github.com/opencord/voltha-go/adapters/common
+ADD adapters/*.go $GOPATH/src/github.com/opencord/voltha-go/adapters/
+ADD common $GOPATH/src/github.com/opencord/voltha-go/common
+ADD db $GOPATH/src/github.com/opencord/voltha-go/db
+ADD kafka $GOPATH/src/github.com/opencord/voltha-go/kafka
+# Copy required proto files
+# ... VOLTHA proos
+ADD protos/*.proto /src/protos/
+ADD protos/scripts/* /src/protos/
+# Install golang protobuf
+RUN go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
+RUN go get -u github.com/golang/protobuf/protoc-gen-go
+# Compile protobuf files
+RUN sh /src/protos/build_protos.sh /src/protos
+# Build simulated_olt
+RUN cd $GOPATH/src/github.com/opencord/voltha-go/adapters/simulated_olt && go get -d ./... && rm -rf $GOPATH/src/go.etcd.io/etcd/vendor/golang.org/x/net/trace && go build -o /src/simulated_olt
+# -------------
+# Image creation stage
+FROM alpine:3.6
+# Set the working directory
+# Copy required files
+COPY --from=build-env /src/simulated_olt /app/
+# -------------
+# Build stage
+FROM golang:1.9.2-alpine AS build-env
+# Install required packages
+RUN apk add --no-cache wget git make build-base protobuf protobuf-dev
+# Prepare directory structure
+RUN ["mkdir", "-p", "/src", "src/protos"]
+RUN ["mkdir", "-p", "$GOPATH/src", "$GOPATH/pkg", "$GOPATH/bin"]
+RUN ["mkdir", "-p", "$GOPATH/src/github.com/opencord/voltha/protos/go"]
+# Copy files
+ADD adapters/simulated_onu $GOPATH/src/github.com/opencord/voltha-go/adapters/simulated_onu
+ADD adapters/common $GOPATH/src/github.com/opencord/voltha-go/adapters/common
+ADD adapters/*.go $GOPATH/src/github.com/opencord/voltha-go/adapters/
+ADD common $GOPATH/src/github.com/opencord/voltha-go/common
+ADD db $GOPATH/src/github.com/opencord/voltha-go/db
+ADD kafka $GOPATH/src/github.com/opencord/voltha-go/kafka
+# Copy required proto files
+# ... VOLTHA proos
+ADD protos/*.proto /src/protos/
+ADD protos/scripts/* /src/protos/
+# Install golang protobuf
+RUN go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
+RUN go get -u github.com/golang/protobuf/protoc-gen-go
+# Compile protobuf files
+RUN sh /src/protos/build_protos.sh /src/protos
+# Build simulated_onu
+RUN cd $GOPATH/src/github.com/opencord/voltha-go/adapters/simulated_onu && go get -d ./... && rm -rf $GOPATH/src/go.etcd.io/etcd/vendor/golang.org/x/net/trace && go build -o /src/simulated_onu
+# -------------
+# Image creation stage
+FROM alpine:3.6
+# Set the working directory
+# Copy required files
+COPY --from=build-env /src/simulated_onu /app/
 		Id:         deviceId,
 		DeviceType: deviceType,
 		ParentId:   parentId,
-		Publisher:publisher,
+		Publisher:  publisher,
 	var marshalledData *any.Any
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
 	doneCh                        chan int
 	topicToConsumerChannelMap     map[string]*consumerChannels
 	lockTopicToConsumerChannelMap sync.RWMutex
+	topicLockMap                  map[string]*sync.RWMutex
+	lockOfTopicLockMap            sync.RWMutex
 type SaramaClientOption func(*SaramaClient)
@@ -187,7 +189,8 @@
 	client.lockTopicToConsumerChannelMap = sync.RWMutex{}
+	client.topicLockMap = make(map[string]*sync.RWMutex)
+	client.lockOfTopicLockMap = sync.RWMutex{}
 	return client
@@ -261,6 +264,9 @@
 //CreateTopic creates a topic on the Kafka Broker.
 func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
+	sc.lockTopic(topic)
+	defer sc.unLockTopic(topic)
 	// Set the topic details
 	topicDetail := &sarama.TopicDetail{}
 	topicDetail.NumPartitions = int32(numPartition)
@@ -286,6 +292,9 @@
 //DeleteTopic removes a topic from the kafka Broker
 func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
+	sc.lockTopic(topic)
+	defer sc.unLockTopic(topic)
 	// Remove the topic from the broker
 	if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
 		if err == sarama.ErrUnknownTopicOrPartition {
@@ -308,6 +317,9 @@
 // Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
 // messages from that topic
 func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ic.InterContainerMessage, error) {
+	sc.lockTopic(topic)
+	defer sc.unLockTopic(topic)
 	log.Debugw("subscribe", log.Fields{"topic": topic.Name})
 	// If a consumers already exist for that topic then resuse it
@@ -352,6 +364,9 @@
 //UnSubscribe unsubscribe a consumer from a given topic
 func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
+	sc.lockTopic(topic)
+	defer sc.unLockTopic(topic)
 	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
 	err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
 	return err
@@ -417,6 +432,26 @@
 	return nil
+func (sc *SaramaClient) lockTopic(topic *Topic) {
+	sc.lockOfTopicLockMap.Lock()
+	if _, exist := sc.topicLockMap[topic.Name]; exist {
+		sc.lockOfTopicLockMap.Unlock()
+		sc.topicLockMap[topic.Name].Lock()
+	} else {
+		sc.topicLockMap[topic.Name] = &sync.RWMutex{}
+		sc.lockOfTopicLockMap.Unlock()
+		sc.topicLockMap[topic.Name].Lock()
+	}
+func (sc *SaramaClient) unLockTopic(topic *Topic) {
+	sc.lockOfTopicLockMap.Lock()
+	defer sc.lockOfTopicLockMap.Unlock()
+	if _, exist := sc.topicLockMap[topic.Name]; exist {
+		sc.topicLockMap[topic.Name].Unlock()
+	}
 func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
@@ -38,7 +38,7 @@
 	kafkaICProxy        *kafka.InterContainerProxy
 	stateTransitions    *TransitionMap
 	clusterDataProxy    *model.Proxy
-	coreInstanceId string
+	coreInstanceId      string
 	exitChannel         chan int
 	lockDeviceAgentsMap sync.RWMutex
@@ -37,7 +37,7 @@
 type APIHandler struct {
 	deviceMgr        *DeviceManager
 	logicalDeviceMgr *LogicalDeviceManager
-	packetInQueue     *queue.Queue
+	packetInQueue    *queue.Queue
@@ -46,7 +46,7 @@
 		deviceMgr:        deviceMgr,
 		logicalDeviceMgr: lDeviceMgr,
 		// TODO: Figure out what the 'hint' parameter to queue.New does
-		packetInQueue:    queue.New(10),
+		packetInQueue: queue.New(10),
 	return handler
@@ -219,7 +219,7 @@
 	log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
-	log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
+	//log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
 	defer log.CleanUp()