[VOL-1035] Initial submission of flow decomposition code.
Additional test cases covering the core of the flow
decomposition functionality will follow.
Change-Id: Ie685714ce5ab54ac89501a67f9489613de195c15
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 6d78aa4..82c6dec 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -81,8 +81,6 @@
return unPackResponse(rpc, device.Id, success, result)
}
-
-
func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
rpc := "reenable_device"
@@ -279,4 +277,4 @@
func (ap *AdapterProxy) UnSuppressAlarm(filter voltha.AlarmFilter) error {
log.Debug("UnSuppressAlarm")
return nil
-}
\ No newline at end of file
+}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index d75a44f..8449ccb 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -56,7 +56,7 @@
// Setup the KV store
// Do not call NewBackend constructor; it creates its own KV client
- backend := model.Backend {
+ backend := model.Backend{
Client: kvClient,
StoreType: cf.KVStoreType,
Host: cf.KVStoreHost,
diff --git a/rw_core/core/grpc_nbi_api_handler_client_test.go b/rw_core/core/grpc_nbi_api_handler_client_test.go
index 21300cc..58dcf13 100644
--- a/rw_core/core/grpc_nbi_api_handler_client_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_client_test.go
@@ -42,7 +42,7 @@
func setup() {
var err error
- if err = log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "testing"}); err != nil {
+ if _, err = log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "testing"}); err != nil {
log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
}
conn, err = grpc.Dial("localhost:50057", grpc.WithInsecure())
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index dce2db7..519a0a1 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -21,8 +21,11 @@
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
ca "github.com/opencord/voltha-go/protos/core_adapter"
- "github.com/opencord/voltha-go/protos/openflow_13"
+ ofp "github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
+ fd "github.com/opencord/voltha-go/rw_core/flow_decomposition"
+ "github.com/opencord/voltha-go/rw_core/graph"
+ fu "github.com/opencord/voltha-go/rw_core/utils"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"sync"
@@ -36,6 +39,8 @@
ldeviceMgr *LogicalDeviceManager
clusterDataProxy *model.Proxy
exitChannel chan int
+ deviceGraph *graph.DeviceGraph
+ DefaultFlowRules *fu.DeviceRules
lockLogicalDevice sync.RWMutex
}
@@ -48,6 +53,7 @@
agent.deviceMgr = deviceMgr
agent.clusterDataProxy = cdProxy
agent.ldeviceMgr = ldeviceMgr
+ //agent.deviceGraph =
agent.lockLogicalDevice = sync.RWMutex{}
return &agent
}
@@ -63,8 +69,8 @@
return err
}
ld := &voltha.LogicalDevice{Id: agent.logicalDeviceId, RootDeviceId: agent.rootDeviceId}
- ld.Desc = (proto.Clone(switchCap.Desc)).(*openflow_13.OfpDesc)
- ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*openflow_13.OfpSwitchFeatures)
+ ld.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
+ ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
//Add logical ports to the logical device based on the number of NNI ports discovered
//First get the default port capability - TODO: each NNI port may have different capabilities,
@@ -199,4 +205,225 @@
return nil
}
+// isNNIPort reports whether portNo is one of the port numbers listed in
+// nniPortsNo. An empty or nil list never matches.
+func isNNIPort(portNo uint32, nniPortsNo []uint32) bool {
+	for idx := 0; idx < len(nniPortsNo); idx++ {
+		if nniPortsNo[idx] != portNo {
+			continue
+		}
+		return true
+	}
+	return false
+}
+// getPreCalculatedRoute scans the routes held by the agent's device graph and
+// returns the one whose link matches both the ingress and the egress port
+// number. When no entry matches, a "no-route" warning is logged and nil is
+// returned.
+func (agent *LogicalDeviceAgent) getPreCalculatedRoute(ingress, egress uint32) []graph.RouteHop {
+	for link, hops := range agent.deviceGraph.Routes {
+		if link.Ingress != ingress || link.Egress != egress {
+			continue
+		}
+		return hops
+	}
+	fields := log.Fields{"logicalDeviceId": agent.logicalDeviceId, "ingress": ingress, "egress": egress}
+	log.Warnw("no-route", fields)
+	return nil
+}
+
+// GetRoute returns the hops between the given logical ingress and egress
+// ports, or nil when no route can be determined. Either port may be nil to
+// express a wildcard:
+//
+//   - egress is OFPP_CONTROLLER and ingress is an NNI port: a "half" route is
+//     returned whose first hop is empty and whose second hop is copied from a
+//     route that egresses on an NNI port;
+//   - egress is OFPP_CONTROLLER otherwise: the egress is treated as the first
+//     NNI port of the logical device and processing continues;
+//   - ingress is nil and egress is an NNI port: the same upstream half route;
+//   - egress is nil: a half route with only the first hop filled, taken from
+//     a route that starts at the ingress port;
+//   - both ports set: the pre-calculated route for that pair.
+//
+// Combinations that cannot be resolved (for example both ports nil, or a nil
+// ingress with a non-NNI egress) yield nil rather than a nil-pointer panic.
+// The logical device lock is held for the duration of the call.
+func (agent *LogicalDeviceAgent) GetRoute(ingressPortNo *uint32, egressPortNo *uint32) []graph.RouteHop {
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+	log.Debugw("getting-route", log.Fields{"ingress-port": ingressPortNo, "egress-port": egressPortNo})
+
+	// The device graph is only created on demand elsewhere in the agent, so it
+	// may not exist yet when a route is requested.
+	if agent.deviceGraph == nil {
+		log.Warnw("no-device-graph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		return nil
+	}
+
+	// Get the updated logical device
+	ld, err := agent.getLogicalDeviceWithoutLock()
+	if err != nil {
+		return nil
+	}
+
+	// Collect the port numbers of the logical ports flagged as root (NNI) ports.
+	nniLogicalPortsNo := make([]uint32, 0)
+	for _, logicalPort := range ld.Ports {
+		if logicalPort.RootPort {
+			nniLogicalPortsNo = append(nniLogicalPortsNo, logicalPort.OfpPort.PortNo)
+		}
+	}
+	if len(nniLogicalPortsNo) == 0 {
+		log.Errorw("no-nni-ports", log.Fields{"LogicalDeviceId": ld.Id})
+		return nil
+	}
+
+	// Consider different possibilities
+	if egressPortNo != nil && ((*egressPortNo & 0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER)) {
+		log.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+		// The ingress port may be wildcarded (nil); only dereference it when set.
+		if ingressPortNo != nil && isNNIPort(*ingressPortNo, nniLogicalPortsNo) {
+			log.Debug("returning-half-route")
+			//This is a trap on the NNI Port
+			//Return a 'half' route to make the flow decomposer logic happy
+			if routes := agent.upstreamHalfRoute(nniLogicalPortsNo); routes != nil {
+				return routes
+			}
+			log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+			return nil
+		}
+		//treat it as if the output port is the first NNI of the OLT
+		egressPortNo = &nniLogicalPortsNo[0]
+	}
+
+	//If ingress port is not specified (nil), it may be a wildcarded
+	//route if egress port is OFPP_CONTROLLER or a nni logical port,
+	//in which case we need to create a half-route where only the egress
+	//hop is filled, the first hop is nil
+	if ingressPortNo == nil {
+		if egressPortNo == nil || !isNNIPort(*egressPortNo, nniLogicalPortsNo) {
+			// Nothing below can be resolved without an ingress port.
+			log.Warnw("no-ingress-port", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+			return nil
+		}
+		// We can use the 2nd hop of any upstream route, so just find the first upstream:
+		if routes := agent.upstreamHalfRoute(nniLogicalPortsNo); routes != nil {
+			return routes
+		}
+		log.Warnw("no-upstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+		return nil
+	}
+
+	//If egress port is not specified (nil), we can also return a "half" route
+	if egressPortNo == nil {
+		for routeLink, route := range agent.deviceGraph.Routes {
+			// Ignore empty routes instead of indexing out of range.
+			if routeLink.Ingress == *ingressPortNo && len(route) > 0 {
+				return []graph.RouteHop{route[0], graph.RouteHop{}}
+			}
+		}
+		log.Warnw("no-downstream-route", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "nniPortsNo": nniLogicalPortsNo})
+		return nil
+	}
+
+	// Return the pre-calculated route
+	return agent.getPreCalculatedRoute(*ingressPortNo, *egressPortNo)
+}
+
+// upstreamHalfRoute builds a half route from any device-graph route that
+// egresses on one of the given NNI ports: the first hop is left empty and the
+// second hop is copied from that route. It returns nil when no such route with
+// at least two hops exists. The caller must ensure agent.deviceGraph is set.
+func (agent *LogicalDeviceAgent) upstreamHalfRoute(nniPortsNo []uint32) []graph.RouteHop {
+	for routeLink, route := range agent.deviceGraph.Routes {
+		// Skip routes too short to have a second hop.
+		if len(route) > 1 && isNNIPort(routeLink.Egress, nniPortsNo) {
+			return []graph.RouteHop{graph.RouteHop{}, route[1]}
+		}
+	}
+	return nil
+}
+
+// updateRoutes recomputes the device routes whenever a device or port change
+// relevant to this logical device occurs. It does nothing when the device
+// graph has not been created yet or when the logical device cannot be
+// retrieved (the latter is logged).
+// TODO: Add more heuristics to this process to update only the routes where a
+// change has occurred instead of rebuilding the entire set of routes
+func (agent *LogicalDeviceAgent) updateRoutes() {
+	// The graph is created on demand elsewhere in the agent; there is nothing
+	// to refresh before that has happened.
+	if agent.deviceGraph == nil {
+		log.Debugw("no-device-graph", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		return
+	}
+	ld, err := agent.getLogicalDevice()
+	if err != nil {
+		log.Warnw("update-routes-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
+		return
+	}
+	agent.deviceGraph.ComputeRoutes(ld.Ports)
+}
+
+// rootDeviceDefaultRules returns the default rules for the root device, which
+// is a freshly created FlowsAndGroups with nothing added to it.
+func (agent *LogicalDeviceAgent) rootDeviceDefaultRules() *fu.FlowsAndGroups {
+	defaults := fu.NewFlowsAndGroups()
+	return defaults
+}
+
+// leafDeviceDefaultRules builds the default flows for the leaf device
+// identified by deviceId.
+//
+// Ports of type PON_ONU or VENET_ONU are treated as upstream ports and ports
+// of type ETHERNET_UNI as downstream ports. Using the first port of each kind,
+// three priority-500 flows are produced:
+//
+//  1. downstream port, VLAN id 0 present -> set the device VLAN, output upstream;
+//  2. downstream port, VLAN vid 0        -> push 0x8100, set the device VLAN,
+//     output upstream;
+//  3. upstream port, device VLAN         -> set VLAN id 0, output downstream.
+//
+// An empty FlowsAndGroups is returned when the device cannot be retrieved or
+// when it does not yet have both an upstream and a downstream port.
+func (agent *LogicalDeviceAgent) leafDeviceDefaultRules(deviceId string) *fu.FlowsAndGroups {
+	fg := fu.NewFlowsAndGroups()
+	device, err := agent.deviceMgr.getDevice(deviceId)
+	if err != nil {
+		log.Warnw("no-device", log.Fields{"deviceId": deviceId, "error": err})
+		return fg
+	}
+	//set the upstream and downstream ports
+	upstreamPorts := make([]*voltha.Port, 0)
+	downstreamPorts := make([]*voltha.Port, 0)
+	for _, port := range device.Ports {
+		switch port.Type {
+		case voltha.Port_PON_ONU, voltha.Port_VENET_ONU:
+			upstreamPorts = append(upstreamPorts, port)
+		case voltha.Port_ETHERNET_UNI:
+			downstreamPorts = append(downstreamPorts, port)
+		}
+	}
+	//it is possible that the ports are not created yet, but the flow_decomposition has already
+	//kicked in. In such scenarios, cut short the processing and return. Both lists are
+	//checked because the flows below index the first entry of each.
+	if len(downstreamPorts) == 0 || len(upstreamPorts) == 0 {
+		return fg
+	}
+	upstreamPortNo := upstreamPorts[0].PortNo
+	downstreamPortNo := downstreamPorts[0].PortNo
+
+	// set up the default flows
+	var fa *fu.FlowArgs
+	fa = &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 500},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fd.InPort(downstreamPortNo),
+			fd.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+		},
+		Actions: []*ofp.OfpAction{
+			fd.SetField(fd.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | device.Vlan)),
+			// Forward upstream like the untagged flow below; without an output
+			// action the re-tagged packet would not be sent anywhere.
+			fd.Output(upstreamPortNo),
+		},
+	}
+	fg.AddFlow(fd.MkFlowStat(fa))
+
+	fa = &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 500},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fd.InPort(downstreamPortNo),
+			fd.VlanVid(0),
+		},
+		Actions: []*ofp.OfpAction{
+			fd.PushVlan(0x8100),
+			fd.SetField(fd.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | device.Vlan)),
+			fd.Output(upstreamPortNo),
+		},
+	}
+	fg.AddFlow(fd.MkFlowStat(fa))
+
+	fa = &fu.FlowArgs{
+		KV: fu.OfpFlowModArgs{"priority": 500},
+		MatchFields: []*ofp.OfpOxmOfbField{
+			fd.InPort(upstreamPortNo),
+			fd.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | device.Vlan),
+		},
+		Actions: []*ofp.OfpAction{
+			fd.SetField(fd.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0)),
+			fd.Output(downstreamPortNo),
+		},
+	}
+	fg.AddFlow(fd.MkFlowStat(fa))
+
+	return fg
+}
+
+// generateDefaultRules assembles the default rules for every device node id
+// reported by the device graph: the root device of the logical device gets the
+// root defaults, every other device gets the leaf defaults. If the logical
+// device cannot be retrieved, a warning is logged and an empty rule set is
+// returned.
+func (agent *LogicalDeviceAgent) generateDefaultRules() *fu.DeviceRules {
+	rules := fu.NewDeviceRules()
+	ld, err := agent.getLogicalDevice()
+	if err != nil {
+		log.Warnw("no-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+		return rules
+	}
+
+	for deviceId := range agent.deviceGraph.GetDeviceNodeIds() {
+		var defaults *fu.FlowsAndGroups
+		if deviceId != ld.RootDeviceId {
+			defaults = agent.leafDeviceDefaultRules(deviceId)
+		} else {
+			defaults = agent.rootDeviceDefaultRules()
+		}
+		rules.AddFlowsAndGroup(deviceId, defaults)
+	}
+	return rules
+}
+
+// GetAllDefaultRules returns the default flow rules of the logical device.
+// On the first call (DefaultFlowRules still nil) it creates the device graph,
+// computes the routes from the logical device's ports and generates the rules;
+// later calls return the stored rules. An empty rule set is returned when the
+// logical device cannot be retrieved.
+// NOTE(review): the lazy initialisation takes no lock in this function -
+// confirm callers serialise access.
+func (agent *LogicalDeviceAgent) GetAllDefaultRules() *fu.DeviceRules {
+	// Get latest
+	lDevice, err := agent.getLogicalDevice()
+	if err != nil {
+		return fu.NewDeviceRules()
+	}
+	if agent.DefaultFlowRules != nil {
+		return agent.DefaultFlowRules
+	}
+	// Nothing setup yet
+	agent.deviceGraph = graph.NewDeviceGraph(agent.deviceMgr.getDevice)
+	agent.deviceGraph.ComputeRoutes(lDevice.Ports)
+	agent.DefaultFlowRules = agent.generateDefaultRules()
+	return agent.DefaultFlowRules
+}
+
+// GetWildcardInputPorts returns the port numbers of all ports of the logical
+// device. When exactly one excludePort value is supplied, that port number is
+// left out of the result; with no value (or more than one) nothing is
+// excluded. An empty, non-nil slice is returned when the logical device cannot
+// be retrieved.
+func (agent *LogicalDeviceAgent) GetWildcardInputPorts(excludePort ...uint32) []uint32 {
+	lPorts := make([]uint32, 0)
+	// The error is deliberately ignored: a failed lookup simply yields no ports.
+	lDevice, _ := agent.getLogicalDevice()
+	if lDevice == nil {
+		return lPorts
+	}
+	// Track whether an exclusion was requested instead of relying on the zero
+	// value, which would silently drop port number 0.
+	hasExclusion := len(excludePort) == 1
+	for _, port := range lDevice.Ports {
+		if hasExclusion && port.OfpPort.PortNo == excludePort[0] {
+			continue
+		}
+		lPorts = append(lPorts, port.OfpPort.PortNo)
+	}
+	return lPorts
+}
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index c2a3634..a8bcfd6 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -35,57 +35,58 @@
package core
import (
- "time"
- "github.com/opencord/voltha-go/db/kvstore"
- log "github.com/opencord/voltha-go/common/log"
+ log "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/kvstore"
+ "time"
)
// Transaction acquisition results
const (
- UNKNOWN = iota
- SEIZED_BY_SELF
- COMPLETED_BY_OTHER
- ABANDONED_BY_OTHER
- STOPPED_WAITING_FOR_OTHER
+ UNKNOWN = iota
+ SEIZED_BY_SELF
+ COMPLETED_BY_OTHER
+ ABANDONED_BY_OTHER
+ STOPPED_WAITING_FOR_OTHER
)
const (
- TRANSACTION_COMPLETE = "TRANSACTION-COMPLETE"
+ TRANSACTION_COMPLETE = "TRANSACTION-COMPLETE"
)
type TransactionContext struct {
- kvClient kvstore.Client
- kvOperationTimeout int
- owner string
- timeToDeleteCompletedKeys int
- txnPrefix string
+ kvClient kvstore.Client
+ kvOperationTimeout int
+ owner string
+ timeToDeleteCompletedKeys int
+ txnPrefix string
}
+
var ctx *TransactionContext
-var txnState = []string {
- "UNKNOWN",
- "SEIZED-BY-SELF",
- "COMPLETED-BY-OTHER",
- "ABANDONED-BY-OTHER",
- "STOPPED-WAITING-FOR-OTHER"}
+var txnState = []string{
+ "UNKNOWN",
+ "SEIZED-BY-SELF",
+ "COMPLETED-BY-OTHER",
+ "ABANDONED-BY-OTHER",
+ "STOPPED-WAITING-FOR-OTHER"}
func init() {
- log.AddPackage(log.JSON, log.WarnLevel, nil)
+ log.AddPackage(log.JSON, log.WarnLevel, nil)
}
func NewTransactionContext(
- owner string,
- txnPrefix string,
- kvClient kvstore.Client,
- kvOpTimeout int,
- keyDeleteTime int) *TransactionContext {
+ owner string,
+ txnPrefix string,
+ kvClient kvstore.Client,
+ kvOpTimeout int,
+ keyDeleteTime int) *TransactionContext {
- return &TransactionContext{
- owner: owner,
- txnPrefix: txnPrefix,
- kvClient: kvClient,
- kvOperationTimeout: kvOpTimeout,
- timeToDeleteCompletedKeys: keyDeleteTime}
+ return &TransactionContext{
+ owner: owner,
+ txnPrefix: txnPrefix,
+ kvClient: kvClient,
+ kvOperationTimeout: kvOpTimeout,
+ timeToDeleteCompletedKeys: keyDeleteTime}
}
/*
@@ -104,19 +105,19 @@
* TRANSACTION_COMPLETE key
*/
func SetTransactionContext(owner string,
- txnPrefix string,
- kvClient kvstore.Client,
- kvOpTimeout int,
- keyDeleteTime int) error {
+ txnPrefix string,
+ kvClient kvstore.Client,
+ kvOpTimeout int,
+ keyDeleteTime int) error {
- ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime)
- return nil
+ ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime)
+ return nil
}
type KVTransaction struct {
- ch chan int
- txnId string
- txnKey string
+ ch chan int
+ txnId string
+ txnKey string
}
/*
@@ -126,9 +127,9 @@
* :return: A KVTransaction instance
*/
func NewKVTransaction(txnId string) *KVTransaction {
- return &KVTransaction{
- txnId: txnId,
- txnKey: ctx.txnPrefix + txnId}
+ return &KVTransaction{
+ txnId: txnId,
+ txnKey: ctx.txnPrefix + txnId}
}
/*
@@ -143,98 +144,96 @@
* false - reservation not acquired, request being processed by another core
*/
func (c *KVTransaction) Acquired(duration int64) bool {
- var acquired bool
- var currOwner string = ""
- var res int
+ var acquired bool
+ var currOwner string = ""
+ var res int
- // Convert milliseconds to seconds, rounding up
- // The reservation TTL is specified in seconds
- durationInSecs := duration / 1000
- if remainder := duration % 1000; remainder > 0 {
- durationInSecs++
- }
- value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
+ // Convert milliseconds to seconds, rounding up
+ // The reservation TTL is specified in seconds
+ durationInSecs := duration / 1000
+ if remainder := duration % 1000; remainder > 0 {
+ durationInSecs++
+ }
+ value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
- // If the reservation failed, do we simply abort or drop into watch mode anyway?
- // Setting value to nil leads to watch mode
- if value != nil {
- if currOwner, err = kvstore.ToString(value); err != nil {
- log.Error("unexpected-owner-type")
- value = nil
- }
- }
- if err == nil && value != nil && currOwner == ctx.owner {
- // Process the request immediately
- res = SEIZED_BY_SELF
- } else {
- // Another core instance has reserved the request
- // Watch for reservation expiry or successful request completion
- events := ctx.kvClient.Watch(c.txnKey)
- log.Debugw("watch-other-server",
- log.Fields{"owner": currOwner, "timeout": duration})
+ // If the reservation failed, do we simply abort or drop into watch mode anyway?
+ // Setting value to nil leads to watch mode
+ if value != nil {
+ if currOwner, err = kvstore.ToString(value); err != nil {
+ log.Error("unexpected-owner-type")
+ value = nil
+ }
+ }
+ if err == nil && value != nil && currOwner == ctx.owner {
+ // Process the request immediately
+ res = SEIZED_BY_SELF
+ } else {
+ // Another core instance has reserved the request
+ // Watch for reservation expiry or successful request completion
+ events := ctx.kvClient.Watch(c.txnKey)
+ log.Debugw("watch-other-server",
+ log.Fields{"owner": currOwner, "timeout": duration})
- select {
- // Add a timeout here in case we miss an event from the KV
- case <-time.After(time.Duration(duration) * time.Millisecond):
- // In case of missing events, let's check the transaction key
- kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout)
- if err == nil && kvp == nil {
- log.Debug("missed-deleted-event")
- res = ABANDONED_BY_OTHER
- } else if val, err := kvstore.ToString(kvp.Value);
- err == nil && val == TRANSACTION_COMPLETE {
- log.Debugw("missed-put-event",
- log.Fields{"key": c.txnKey, "value": val})
- res = COMPLETED_BY_OTHER
- } else {
- res = STOPPED_WAITING_FOR_OTHER
- }
+ select {
+ // Add a timeout here in case we miss an event from the KV
+ case <-time.After(time.Duration(duration) * time.Millisecond):
+ // In case of missing events, let's check the transaction key
+ kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout)
+ if err == nil && kvp == nil {
+ log.Debug("missed-deleted-event")
+ res = ABANDONED_BY_OTHER
+ } else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
+ log.Debugw("missed-put-event",
+ log.Fields{"key": c.txnKey, "value": val})
+ res = COMPLETED_BY_OTHER
+ } else {
+ res = STOPPED_WAITING_FOR_OTHER
+ }
- case event := <-events:
- log.Debugw("received-event", log.Fields{"type": event.EventType})
- if event.EventType == kvstore.DELETE {
- // The other core failed to process the request; step up
- res = ABANDONED_BY_OTHER
- } else if event.EventType == kvstore.PUT {
- key, e1 := kvstore.ToString(event.Key)
- val, e2 := kvstore.ToString(event.Value)
- if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
- res = COMPLETED_BY_OTHER
- // Successful request completion has been detected
- // Remove the transaction key
- c.Delete()
- }
- }
- }
- }
- // Clean-up: delete the transaction key after a long delay
- go c.deleteTransactionKey()
+ case event := <-events:
+ log.Debugw("received-event", log.Fields{"type": event.EventType})
+ if event.EventType == kvstore.DELETE {
+ // The other core failed to process the request; step up
+ res = ABANDONED_BY_OTHER
+ } else if event.EventType == kvstore.PUT {
+ key, e1 := kvstore.ToString(event.Key)
+ val, e2 := kvstore.ToString(event.Value)
+ if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
+ res = COMPLETED_BY_OTHER
+ // Successful request completion has been detected
+ // Remove the transaction key
+ c.Delete()
+ }
+ }
+ }
+ }
+ // Clean-up: delete the transaction key after a long delay
+ go c.deleteTransactionKey()
- log.Debugw("acquire-transaction", log.Fields{"result": txnState[res]})
- switch res {
- case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
- acquired = true
- default:
- acquired = false
- }
- return acquired
+ log.Debugw("acquire-transaction", log.Fields{"result": txnState[res]})
+ switch res {
+ case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
+ acquired = true
+ default:
+ acquired = false
+ }
+ return acquired
}
func (c *KVTransaction) deleteTransactionKey() {
- log.Debugw("schedule-key-deletion", log.Fields{"key": c.txnKey})
- time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
- log.Debugw("background-key-deletion", log.Fields{"key": c.txnKey})
- ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
+ log.Debugw("schedule-key-deletion", log.Fields{"key": c.txnKey})
+ time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
+ log.Debugw("background-key-deletion", log.Fields{"key": c.txnKey})
+ ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
}
func (c *KVTransaction) Close() error {
- log.Debugw("close", log.Fields{"key": c.txnKey})
- return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout)
+ log.Debugw("close", log.Fields{"key": c.txnKey})
+ return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout)
}
func (c *KVTransaction) Delete() error {
- log.Debugw("delete", log.Fields{"key": c.txnKey})
- err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
- return err
+ log.Debugw("delete", log.Fields{"key": c.txnKey})
+ err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
+ return err
}
-