VOL-1512: Set device Active ownership per Core in a Core pair
- Changed NB & SB APIs to seize requests based on device ownership
- Added queue support for change-events
- TODO: make the prefix & timeout for the device ownership key configurable;
  they are currently hard-coded
- TODO: make the KV Transaction Monitor timeout configurable;
  it is currently hard-coded
- TODO: clean up the AdapterRequestHandlerProxy & LogicalDeviceManager
  constructors

Change-Id: Ieeb3df6d70baa529b87c8253cb9f0f5b2a94382a
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index f5295c4..3d9487f 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -40,12 +40,14 @@
 	defaultRequestTimeout     int64
 	longRunningRequestTimeout int64
 	coreInCompetingMode       bool
+	core                      *Core
 }
 
-func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
+func NewAdapterRequestHandlerProxy(core *Core, coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
 	aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy, incompetingMode bool, longRunningRequestTimeout int64,
 	defaultRequestTimeout int64) *AdapterRequestHandlerProxy {
 	var proxy AdapterRequestHandlerProxy
+	proxy.core = core
 	proxy.coreInstanceId = coreInstanceId
 	proxy.deviceMgr = dMgr
 	proxy.lDeviceMgr = ldMgr
@@ -58,7 +60,7 @@
 	return &proxy
 }
 
-func (rhp *AdapterRequestHandlerProxy) acquireTransaction(transactionId string, maxTimeout ...int64) (*KVTransaction, error) {
+func (rhp *AdapterRequestHandlerProxy) acquireRequest(transactionId string, maxTimeout ...int64) (*KVTransaction, error) {
 	timeout := rhp.defaultRequestTimeout
 	if len(maxTimeout) > 0 {
 		timeout = maxTimeout[0]
@@ -74,6 +76,33 @@
 	}
 }
 
+// This is a helper function that attempts to acquire the request by using the device ownership model
+func (rhp *AdapterRequestHandlerProxy) takeRequestOwnership(transactionId string, devId string, maxTimeout ...int64) (*KVTransaction, error) {
+	timeout := rhp.defaultRequestTimeout
+	if len(maxTimeout) > 0 {
+		timeout = maxTimeout[0]
+	}
+	log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
+	txn := NewKVTransaction(transactionId)
+	if txn == nil {
+		return nil, errors.New("fail-to-create-transaction")
+	}
+
+	if rhp.core.deviceOwnership.OwnedByMe(devId) {
+		if txn.Acquired(timeout) {
+			return txn, nil
+		} else {
+			return nil, errors.New("failed-to-seize-request")
+		}
+	} else {
+		if txn.Monitor(timeout) {
+			return txn, nil
+		} else {
+			return nil, errors.New("device-not-owned")
+		}
+	}
+}
+
 // competeForTransaction is a helper function to determine whether every request needs to compete with another
 // Core to execute the request
 func (rhp *AdapterRequestHandlerProxy) competeForTransaction() bool {
@@ -112,7 +141,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.acquireRequest(transactionID.Val); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// Update our adapters in memory
 			go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory()
@@ -156,7 +185,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -224,7 +253,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, device.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -294,7 +323,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -336,7 +365,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.acquireRequest(transactionID.Val); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -416,7 +445,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -500,7 +529,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -557,7 +586,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -616,7 +645,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -682,7 +711,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -733,7 +762,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -785,7 +814,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, pmConfigs.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
@@ -887,7 +916,7 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 13baf84..02f99fd 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -50,6 +50,7 @@
 	kafkaClient       kafka.Client
 	coreMembership *voltha.Membership
 	membershipLock *sync.RWMutex
+	deviceOwnership    *DeviceOwnership
 }
 
 func init() {
@@ -64,6 +65,10 @@
 	core.kvClient = kvClient
 	core.kafkaClient = kafkaClient
 
+	// Setup device ownership context
+	core.deviceOwnership = NewDeviceOwnership(id, kvClient,
+		"service/voltha/owns_device", 60)
+
 	// Setup the KV store
 	// Do not call NewBackend constructor; it creates its own KV client
 	// Commented the backend for now until the issue between the model and the KV store
@@ -90,8 +95,8 @@
 	}
 	log.Info("values", log.Fields{"kmp": core.kmp})
 	core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId)
-	core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy, core.adapterMgr, core.instanceId)
-	core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
+	core.deviceMgr = newDeviceManager(core)
+	core.logicalDeviceMgr = newLogicalDeviceManager(core, core.deviceMgr, core.kmp, core.clusterDataProxy)
 
 	if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
 		log.Fatal("Failure-registering-adapterRequestHandler")
@@ -179,7 +184,7 @@
 func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
 	ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
 ) error {
-	requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
+	requestProxy := NewAdapterRequestHandlerProxy(core, coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
 		core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
 
 	// Register the broadcast topic to handle any core-bound broadcast requests
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 09eca3b..06d3bd4 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -33,6 +33,7 @@
 
 type DeviceManager struct {
 	deviceAgents        map[string]*DeviceAgent
+	core                *Core
 	adapterProxy        *AdapterProxy
 	adapterMgr          *AdapterManager
 	logicalDeviceMgr    *LogicalDeviceManager
@@ -44,15 +45,16 @@
 	lockDeviceAgentsMap sync.RWMutex
 }
 
-func newDeviceManager(kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy, adapterMgr *AdapterManager, coreInstanceId string) *DeviceManager {
+func newDeviceManager(core *Core) *DeviceManager {
 	var deviceMgr DeviceManager
+	deviceMgr.core = core
 	deviceMgr.exitChannel = make(chan int, 1)
 	deviceMgr.deviceAgents = make(map[string]*DeviceAgent)
-	deviceMgr.adapterProxy = NewAdapterProxy(kafkaICProxy)
-	deviceMgr.kafkaICProxy = kafkaICProxy
-	deviceMgr.coreInstanceId = coreInstanceId
-	deviceMgr.clusterDataProxy = cdProxy
-	deviceMgr.adapterMgr = adapterMgr
+	deviceMgr.kafkaICProxy = core.kmp
+	deviceMgr.adapterProxy = NewAdapterProxy(core.kmp)
+	deviceMgr.coreInstanceId = core.instanceId
+	deviceMgr.clusterDataProxy = core.clusterDataProxy
+	deviceMgr.adapterMgr = core.adapterMgr
 	deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
 	return &deviceMgr
 }
@@ -184,6 +186,7 @@
 		if res == nil { //Success
 			agent.stop(ctx)
 			dMgr.deleteDeviceAgentToMap(agent)
+			dMgr.core.deviceOwnership.AbandonDevice(id.Id)
 		}
 		log.Debugw("deleteDevice-result", log.Fields{"result": res})
 	} else {
@@ -624,6 +627,9 @@
 	dMgr.addDeviceAgentToMap(agent)
 	agent.start(nil, false)
 
+	// Set device ownership
+	dMgr.core.deviceOwnership.OwnedByMe(agent.deviceId)
+
 	// Activate the child device
 	if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
 		go agent.enableDevice(nil)
diff --git a/rw_core/core/device_ownership.go b/rw_core/core/device_ownership.go
index b881351..f229383 100644
--- a/rw_core/core/device_ownership.go
+++ b/rw_core/core/device_ownership.go
@@ -81,17 +81,34 @@
 }
 
 func (da *DeviceOwnership) startOwnershipMonitoring(id string, chnl chan int) {
+	var op string
+
 startloop:
 	for {
-		if err := da.setOwnership(id, da.tryToReserveKey(id)); err != nil {
-			log.Errorw("unexpected-error", log.Fields{"error": err})
+		da.deviceMapLock.RLock()
+		val, exist := da.deviceMap[id]
+		da.deviceMapLock.RUnlock()
+		if exist && val.owned {
+			// Device owned; renew reservation
+			op = "renew"
+			kvKey := fmt.Sprintf("%s_%s", da.ownershipPrefix, id)
+			if err := da.kvClient.RenewReservation(kvKey); err != nil {
+				log.Errorw("reservation-renewal-error", log.Fields{"error": err})
+			}
+		} else {
+			// Device not owned; try to seize ownership
+			op = "retry"
+			if err := da.setOwnership(id, da.tryToReserveKey(id)); err != nil {
+				log.Errorw("unexpected-error", log.Fields{"error": err})
+			}
 		}
 		select {
 		case <-da.exitChannel:
 			log.Infow("closing-monitoring", log.Fields{"Id": id})
 			break startloop
 		case <-time.After(time.Duration(da.reservationTimeout) / 3 * time.Second):
-			log.Infow("renew-reservation", log.Fields{"Id": id})
+			msg := fmt.Sprintf("%s-reservation", op)
+			log.Infow(msg, log.Fields{"Id": id})
 		case <-chnl:
 			log.Infow("closing-device-monitoring", log.Fields{"Id": id})
 			break startloop
@@ -145,4 +162,4 @@
 		return nil
 	}
 	return status.Error(codes.NotFound, fmt.Sprintf("id-inexistent-%s", id))
-}
\ No newline at end of file
+}
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 7b486bd..d7834eb 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -53,6 +53,7 @@
 	logicalDeviceMgr *LogicalDeviceManager
 	adapterMgr *AdapterManager
 	packetInQueue    *queue.Queue
+	changeEventQueue *queue.Queue
 	coreInCompetingMode bool
 	longRunningRequestTimeout int64
 	defaultRequestTimeout int64
@@ -84,6 +85,7 @@
 		defaultRequestTimeout:core.config.DefaultRequestTimeout,
 		// TODO: Figure out what the 'hint' parameter to queue.New does
 		packetInQueue: queue.New(10),
+		changeEventQueue: queue.New(10),
 		core: core,
 	}
 	return handler
@@ -138,7 +140,8 @@
 	return handler.coreInCompetingMode
 }
 
-func (handler *APIHandler) acquireTransaction(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
+// This function handles the creation of new devices
+func (handler *APIHandler) acquireRequest(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
 	timeout := handler.defaultRequestTimeout
 	if len(maxTimeout) > 0 {
 		timeout = maxTimeout[0]
@@ -168,6 +171,41 @@
 	}
 }
 
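+// takeRequestOwnership handles requests that modify or delete existing devices: it acquires the transaction when this core owns the device, otherwise it monitors the transaction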
+func (handler *APIHandler) takeRequestOwnership(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
+	timeout := handler.defaultRequestTimeout
+	if len(maxTimeout) > 0 {
+		timeout = maxTimeout[0]
+	}
+	log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
+	txn, err := handler.createKvTransaction(ctx)
+	if txn == nil {
+		return nil,  err
+	}
+
+	owned := false
+	if id != nil {
+		if devId, ok := id.(*deviceID); ok {
+			owned = handler.core.deviceOwnership.OwnedByMe(devId.id)
+		} else if lDevId, ok := id.(*logicalDeviceID); ok {
+			owned = handler.core.deviceOwnership.OwnedByMe(lDevId.id)
+		}
+	}
+	if owned {
+		if txn.Acquired(timeout) {
+			return txn, nil
+		} else {
+			return nil, errors.New("failed-to-seize-request")
+		}
+	} else {
+		if txn.Monitor(timeout) {
+			return txn, nil
+		} else {
+			return nil, errors.New("device-not-owned")
+		}
+	}
+}
+
 // waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where an nil
 // response is expected in a successful scenario
 func waitForNilResponseOnSuccess(ctx context.Context, ch chan interface{}) (*empty.Empty, error) {
@@ -226,7 +264,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:id.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -247,7 +285,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:id.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -269,7 +307,7 @@
 
 	if handler.competeForTransaction() {
 		if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
-			if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
+			if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
 				return new(empty.Empty), err
 			} else {
 				defer txn.Close()
@@ -292,7 +330,7 @@
 
 	if handler.competeForTransaction() {
 		if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
-			if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
+			if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
 				return new(empty.Empty), err
 			} else {
 				defer txn.Close()
@@ -377,7 +415,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, nil); err != nil {
+		if txn, err := handler.acquireRequest(ctx, nil); err != nil {
 			return &voltha.Device{}, err
 		} else {
 			defer txn.Close()
@@ -394,6 +432,7 @@
 				return &voltha.Device{}, err
 			}
 			if d, ok := res.(*voltha.Device); ok {
+				handler.core.deviceOwnership.OwnedByMe(d.Id)
 				return d, nil
 			}
 		}
@@ -414,7 +453,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}, handler.longRunningRequestTimeout); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:id.Id}, handler.longRunningRequestTimeout); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -435,7 +474,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -456,7 +495,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -477,7 +516,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, nil); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, nil); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -499,7 +538,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:img.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:img.Id}); err != nil {
 			return &common.OperationResp{}, err
 		} else {
 			defer txn.Close()
@@ -590,7 +629,7 @@
 	failedresponse := &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:img.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:img.Id}); err != nil {
 			return failedresponse, err
 		} else {
 			defer txn.Close()
@@ -760,7 +799,10 @@
 	//}
 	event := openflow_13.ChangeEvent{Id: deviceId, Event: &openflow_13.ChangeEvent_PortStatus{PortStatus: portStatus}}
 	log.Debugw("sendChangeEvent", log.Fields{"event": event})
-	// TODO: put the packet in the queue
+	// Enqueue the change event
+	if err := handler.changeEventQueue.Put(event); err != nil {
+		log.Errorw("failed-to-enqueue-change-event", log.Fields{"error": err})
+	}
 }
 
 func (handler *APIHandler) ReceiveChangeEvents(
@@ -769,16 +811,18 @@
 ) error {
 	log.Debugw("ReceiveChangeEvents-request", log.Fields{"changeEvents": changeEvents})
 	for {
-		// TODO: need to retrieve packet from queue
-		event := &openflow_13.ChangeEvent{}
-		time.Sleep(time.Duration(5) * time.Second)
-		err := changeEvents.Send(event)
-		if err != nil {
-			log.Errorw("Failed to send change event", log.Fields{"error": err})
+		// Dequeue a change event
+		if events, err := handler.changeEventQueue.Get(1); err == nil {
+			log.Debugw("dequeued-change-event", log.Fields{"event": events[0]})
+			if event, ok := events[0].(openflow_13.ChangeEvent); ok {
+				log.Debugw("sending-change-event", log.Fields{"event": event})
+				if err := changeEvents.Send(&event); err != nil {
+					log.Errorw("failed-to-send-change-event", log.Fields{"error": err})
+				}
+			}
 		}
 	}
-	// TODO: put the packet in the queue
-	}
+}
 
 func (handler *APIHandler) Subscribe(
 	ctx context.Context,
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index a3a29f0..47249f6 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -31,6 +31,7 @@
 
 type LogicalDeviceManager struct {
 	logicalDeviceAgents        map[string]*LogicalDeviceAgent
+	core                       *Core
 	deviceMgr                  *DeviceManager
 	grpcNbiHdlr                *APIHandler
 	adapterProxy               *AdapterProxy
@@ -40,8 +41,9 @@
 	lockLogicalDeviceAgentsMap sync.RWMutex
 }
 
-func newLogicalDeviceManager(deviceMgr *DeviceManager, kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy) *LogicalDeviceManager {
+func newLogicalDeviceManager(core *Core, deviceMgr *DeviceManager, kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy) *LogicalDeviceManager {
 	var logicalDeviceMgr LogicalDeviceManager
+	logicalDeviceMgr.core = core
 	logicalDeviceMgr.exitChannel = make(chan int, 1)
 	logicalDeviceMgr.logicalDeviceAgents = make(map[string]*LogicalDeviceAgent)
 	logicalDeviceMgr.deviceMgr = deviceMgr
@@ -182,6 +184,9 @@
 	ldMgr.addLogicalDeviceAgentToMap(agent)
 	go agent.start(ctx, false)
 
+	// Set device ownership
+	ldMgr.core.deviceOwnership.OwnedByMe(id)
+
 	log.Debug("creating-logical-device-ends")
 	return &id, nil
 }
@@ -219,6 +224,7 @@
 		agent.stop(ctx)
 		//Remove the logical device agent from the Map
 		ldMgr.deleteLogicalDeviceAgent(logDeviceId)
+		ldMgr.core.deviceOwnership.AbandonDevice(logDeviceId)
 	}
 
 	log.Debug("deleting-logical-device-ends")
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index e7125d3..ec7e4ca 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -56,6 +56,7 @@
 type TransactionContext struct {
 	kvClient                  kvstore.Client
 	kvOperationTimeout        int
+	monitorLoopTime           int64
 	owner                     string
 	timeToDeleteCompletedKeys int
 	txnPrefix                 string
@@ -71,7 +72,7 @@
 	"STOPPED-WAITING-FOR-OTHER"}
 
 func init() {
-	log.AddPackage(log.JSON, log.WarnLevel, nil)
+	log.AddPackage(log.JSON, log.DebugLevel, nil)
 }
 
 func NewTransactionContext(
@@ -79,13 +80,15 @@
 	txnPrefix string,
 	kvClient kvstore.Client,
 	kvOpTimeout int,
-	keyDeleteTime int) *TransactionContext {
+	keyDeleteTime int,
+	monLoopTime int64) *TransactionContext {
 
 	return &TransactionContext{
 		owner:                     owner,
 		txnPrefix:                 txnPrefix,
 		kvClient:                  kvClient,
 		kvOperationTimeout:        kvOpTimeout,
+		monitorLoopTime:           monLoopTime,
 		timeToDeleteCompletedKeys: keyDeleteTime}
 }
 
@@ -99,18 +102,21 @@
  *                   will be created (e.g. "service/voltha/transactions")
  * :param kvClient: The client API used for all interactions with the KV store. Currently
  *                  only the etcd client is supported.
- * :param: kvOpTimeout: The maximum time to be taken by any KV operation used by this
- *                      package
- * :param keyDeleteTime: The time to wait, in the background, before deleting a
- *                       TRANSACTION_COMPLETE key
+ * :param kvOpTimeout: The maximum time, in seconds, to be taken by any KV operation
+ *                      used by this package
+ * :param keyDeleteTime: The time (seconds) to wait, in the background, before deleting
+ *                       a TRANSACTION_COMPLETE key
+ * :param monLoopTime: The time in milliseconds that the monitor sleeps between
+ *                     checks for the existence of the transaction key
  */
 func SetTransactionContext(owner string,
 	txnPrefix string,
 	kvClient kvstore.Client,
 	kvOpTimeout int,
-	keyDeleteTime int) error {
+	keyDeleteTime int,
+	monLoopTime int64) error {
 
-	ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime)
+	ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime, monLoopTime)
 	return nil
 }
 
@@ -170,42 +176,10 @@
 	} else {
 		// Another core instance has reserved the request
 		// Watch for reservation expiry or successful request completion
-		events := ctx.kvClient.Watch(c.txnKey)
 		log.Debugw("watch-other-server",
 			log.Fields{"owner": currOwner, "timeout": duration})
 
-		select {
-		// Add a timeout here in case we miss an event from the KV
-		case <-time.After(time.Duration(duration) * time.Millisecond):
-			// In case of missing events, let's check the transaction key
-			kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
-			if err == nil && kvp == nil {
-				log.Debug("missed-deleted-event")
-				res = ABANDONED_BY_OTHER
-			} else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
-				log.Debugw("missed-put-event",
-					log.Fields{"key": c.txnKey, "value": val})
-				res = COMPLETED_BY_OTHER
-			} else {
-				res = STOPPED_WAITING_FOR_OTHER
-			}
-
-		case event := <-events:
-			log.Debugw("received-event", log.Fields{"type": event.EventType})
-			if event.EventType == kvstore.DELETE {
-				// The other core failed to process the request; step up
-				res = ABANDONED_BY_OTHER
-			} else if event.EventType == kvstore.PUT {
-				key, e1 := kvstore.ToString(event.Key)
-				val, e2 := kvstore.ToString(event.Value)
-				if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
-					res = COMPLETED_BY_OTHER
-					// Successful request completion has been detected
-					// Remove the transaction key
-					c.Delete()
-				}
-			}
-		}
+		res = c.Watch(duration)
 	}
 	// Clean-up: delete the transaction key after a long delay
 	go c.deleteTransactionKey()
@@ -224,6 +198,103 @@
 	return acquired
 }
 
+/*
+ * This function monitors the progress of a request that's been reserved by another
+ * Voltha core.
+ *
+ * :param duration: The duration of the reservation in milliseconds
+ * :return: true - reservation abandoned by the other core, or this core stopped waiting for it; process the request
+ *          false - reservation not owned, request being processed by another core
+ */
+func (c *KVTransaction) Monitor(duration int64) bool {
+	var acquired bool
+	var res int
+	var timeElapsed int64
+
+	// Convert milliseconds to seconds, rounding up
+	// The reservation TTL is specified in seconds
+	durationInSecs := duration / 1000
+	if remainder := duration % 1000; remainder > 0 {
+		durationInSecs++
+	}
+	// Check if transaction key has been set
+	keyExists := false
+	for timeElapsed = 0; timeElapsed < duration; timeElapsed = timeElapsed + ctx.monitorLoopTime {
+		kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
+		if err == nil && kvp == nil {
+			// This core has received the request before the core that actually
+			// owns the device. The owning core has yet to seize the transaction.
+			time.Sleep(time.Duration(ctx.monitorLoopTime) * time.Millisecond)
+		} else {
+			keyExists = true
+			log.Debug("waited-for-other-to-reserve-transaction")
+			break
+		}
+	}
+	if keyExists {
+		// Watch for reservation expiry or successful request completion
+		log.Debugw("watch-other-server", log.Fields{"timeout": duration})
+		res = c.Watch(duration)
+	} else {
+		res = STOPPED_WAITING_FOR_OTHER
+	}
+	// Clean-up: delete the transaction key after a long delay
+	go c.deleteTransactionKey()
+
+	log.Debugw("own-transaction", log.Fields{"result": txnState[res]})
+	switch res {
+	case ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
+		acquired = true
+	default:
+		acquired = false
+	}
+	// Ensure the request watcher does not reply before the request server
+	if !acquired {
+		time.Sleep(1 * time.Second)
+	}
+	return acquired
+}
+
+// Watch waits up to duration milliseconds for a KV event on the transaction key and returns the resulting transaction state
+func (c *KVTransaction) Watch(duration int64) int {
+	var res int
+
+	events := ctx.kvClient.Watch(c.txnKey)
+	select {
+	// Add a timeout here in case we miss an event from the KV
+	case <-time.After(time.Duration(duration) * time.Millisecond):
+		// In case of missing events, let's check the transaction key
+		kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
+		if err == nil && kvp == nil {
+			log.Debug("missed-deleted-event")
+			res = ABANDONED_BY_OTHER
+		} else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
+			log.Debugw("missed-put-event",
+				log.Fields{"key": c.txnKey, "value": val})
+			res = COMPLETED_BY_OTHER
+		} else {
+			res = STOPPED_WAITING_FOR_OTHER
+		}
+
+	case event := <-events:
+		log.Debugw("received-event", log.Fields{"type": event.EventType})
+		if event.EventType == kvstore.DELETE {
+			// The other core failed to process the request
+			res = ABANDONED_BY_OTHER
+		} else if event.EventType == kvstore.PUT {
+			key, e1 := kvstore.ToString(event.Key)
+			val, e2 := kvstore.ToString(event.Value)
+			if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
+				res = COMPLETED_BY_OTHER
+				// Successful request completion has been detected
+				// Remove the transaction key
+				c.Delete()
+			}
+		}
+	}
+	return res
+}
+
 func (c *KVTransaction) deleteTransactionKey() {
 	log.Debugw("schedule-key-deletion", log.Fields{"key": c.txnKey})
 	time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)