[VOL-1505] This update enables the core to add a key when
publishing an event onto kafka. The corresponding update is
done in the adapter GO components. Similar changes remain to
be done in pyvoltha.
Change-Id: I0bb1e3cb8c2fa9e0214f96d863819755d34a0bb9
diff --git a/adapters/common/adapter_proxy.go b/adapters/common/adapter_proxy.go
index 4e63442..5e04484 100644
--- a/adapters/common/adapter_proxy.go
+++ b/adapters/common/adapter_proxy.go
@@ -90,7 +90,7 @@
replyToTopic := kafka.Topic{Name: toAdapter}
rpc := "Process_inter_adapter_message"
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
log.Debugw("inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
return unPackResponse(rpc, "", success, result)
}
diff --git a/adapters/common/core_proxy.go b/adapters/common/core_proxy.go
index a503c97..3198111 100644
--- a/adapters/common/core_proxy.go
+++ b/adapters/common/core_proxy.go
@@ -108,7 +108,7 @@
Value: deviceTypes,
}
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
log.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
return unPackResponse(rpc, "", success, result)
}
@@ -124,7 +124,7 @@
}
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
@@ -148,7 +148,7 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
@@ -179,13 +179,13 @@
}
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
- childDeviceType string, channelId int) error {
+ childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64 ) error {
log.Debugw("ChildDeviceDetected", log.Fields{"pPeviceId": parentDeviceId, "channelId": channelId})
rpc := "ChildDeviceDetected"
// Use a device specific topic to send the request. The adapter handling the device creates a device
@@ -193,7 +193,7 @@
toTopic := ap.getCoreTopic(parentDeviceId)
replyToTopic := ap.getAdapterTopic()
- args := make([]*kafka.KVArg, 4)
+ args := make([]*kafka.KVArg, 7)
id := &voltha.ID{Id: parentDeviceId}
args[0] = &kafka.KVArg{
Key: "parent_device_id",
@@ -214,8 +214,23 @@
Key: "channel_id",
Value: channel,
}
+ vId := &ic.StrType{Val: vendorId}
+ args[4] = &kafka.KVArg{
+ Key: "vendor_id",
+ Value: vId,
+ }
+ sNo := &ic.StrType{Val: serialNumber}
+ args[5] = &kafka.KVArg{
+ Key: "serial_number",
+ Value: sNo,
+ }
+ oId := &ic.IntType{Val: int64(onuId)}
+ args[6] = &kafka.KVArg{
+ Key: "onu_id",
+ Value: oId,
+ }
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
return unPackResponse(rpc, parentDeviceId, success, result)
}
diff --git a/db/kvstore/client.go b/db/kvstore/client.go
index a8e6311..34ab711 100644
--- a/db/kvstore/client.go
+++ b/db/kvstore/client.go
@@ -84,6 +84,8 @@
ReleaseAllReservations() error
RenewReservation(key string) error
Watch(key string) chan *Event
+ AcquireLock(lockName string, timeout int) error
+ ReleaseLock(lockName string) error
CloseWatch(key string, ch chan *Event)
Close()
}
diff --git a/db/kvstore/consulclient.go b/db/kvstore/consulclient.go
index a5c71ac..738ca92 100644
--- a/db/kvstore/consulclient.go
+++ b/db/kvstore/consulclient.go
@@ -497,3 +497,12 @@
log.Errorw("error-closing-client", log.Fields{"error": err})
}
}
+
+
+func (c *ConsulClient) AcquireLock(lockName string, timeout int) error {
+ return nil
+}
+
+func (c *ConsulClient) ReleaseLock(lockName string) error {
+ return nil
+}
\ No newline at end of file
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 9ecddca..2caa990 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -33,6 +33,10 @@
keyReservations map[string]*v3Client.LeaseID
watchedChannels map[string][]map[chan *Event]v3Client.Watcher
writeLock sync.Mutex
+ lockToMutexMap map[string]*v3Concurrency.Mutex
+ lockToSessionMap map[string]*v3Concurrency.Session
+ lockToMutexLock sync.Mutex
+
}
// NewEtcdClient returns a new client for the Etcd KV store
@@ -49,8 +53,10 @@
}
wc := make(map[string][]map[chan *Event]v3Client.Watcher)
reservations := make(map[string]*v3Client.LeaseID)
+ lockMutexMap := make(map[string]*v3Concurrency.Mutex)
+ lockSessionMap := make(map[string]*v3Concurrency.Session)
- return &EtcdClient{ectdAPI: c, watchedChannels: wc, keyReservations: reservations}, nil
+ return &EtcdClient{ectdAPI: c, watchedChannels: wc, keyReservations:reservations, lockToMutexMap:lockMutexMap, lockToSessionMap:lockSessionMap}, nil
}
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
@@ -431,3 +437,63 @@
log.Errorw("error-closing-client", log.Fields{"error": err})
}
}
+
+func (c *EtcdClient) addLockName(lockName string, lock *v3Concurrency.Mutex, session *v3Concurrency.Session) {
+ c.lockToMutexLock.Lock()
+ defer c.lockToMutexLock.Unlock()
+ c.lockToMutexMap[lockName] = lock
+ c.lockToSessionMap[lockName] = session
+}
+
+func (c *EtcdClient) deleteLockName(lockName string) {
+ c.lockToMutexLock.Lock()
+ defer c.lockToMutexLock.Unlock()
+ delete(c.lockToMutexMap, lockName)
+ delete(c.lockToSessionMap, lockName)
+}
+
+func (c *EtcdClient) getLock(lockName string) (*v3Concurrency.Mutex, *v3Concurrency.Session) {
+ c.lockToMutexLock.Lock()
+ defer c.lockToMutexLock.Unlock()
+ var lock *v3Concurrency.Mutex
+ var session *v3Concurrency.Session
+ if l, exist := c.lockToMutexMap[lockName]; exist {
+ lock = l
+ }
+ if s, exist := c.lockToSessionMap[lockName]; exist {
+ session = s
+ }
+ return lock, session
+}
+
+
+func (c *EtcdClient) AcquireLock(lockName string, timeout int) error {
+ duration := GetDuration(timeout)
+ ctx, cancel := context.WithTimeout(context.Background(), duration)
+ session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
+ mu := v3Concurrency.NewMutex(session, "/devicelock_" + lockName)
+ if err := mu.Lock(context.Background()); err != nil {
+ return err
+ }
+ c.addLockName(lockName, mu, session)
+ cancel()
+ return nil
+}
+
+func (c *EtcdClient) ReleaseLock(lockName string) error {
+ lock, session := c.getLock(lockName)
+ var err error
+ if lock != nil {
+ if e := lock.Unlock(context.Background()); e != nil {
+ err = e
+ }
+ }
+ if session != nil {
+ if e := session.Close(); e != nil {
+ err = e
+ }
+ }
+ c.deleteLockName(lockName)
+
+ return err
+}
\ No newline at end of file
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index c5e0772..1229e7a 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -227,7 +227,7 @@
// InvokeRPC is used to send a request to a given topic
func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
- waitForResponse bool, kvArgs ...*KVArg) (bool, *any.Any) {
+ waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
// If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
// typically the device ID.
@@ -237,7 +237,7 @@
}
// Encode the request
- protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, kvArgs...)
+ protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
if err != nil {
log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
return false, nil
@@ -255,7 +255,7 @@
// Send request - if the topic is formatted with a device Id then we will send the request using a
// specific key, hence ensuring a single partition is used to publish the request. This ensures that the
// subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
- key := GetDeviceIdFromTopic(*toTopic)
+ //key := GetDeviceIdFromTopic(*toTopic)
log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key})
kp.kafkaClient.Send(protoRequest, toTopic, key)
@@ -549,6 +549,7 @@
Type: ic.MessageType_RESPONSE,
FromTopic: request.Header.ToTopic,
ToTopic: request.Header.FromTopic,
+ KeyTopic: request.Header.KeyTopic,
Timestamp: time.Now().Unix(),
}
@@ -703,7 +704,7 @@
// present then the key will be empty, hence all messages for a given topic will be sent to all
// partitions.
replyTopic := &Topic{Name: msg.Header.FromTopic}
- key := GetDeviceIdFromTopic(*replyTopic)
+ key := msg.Header.KeyTopic
log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
// TODO: handle error response.
kp.kafkaClient.Send(icm, replyTopic, key)
@@ -757,12 +758,13 @@
//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
//or an error on failure
-func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
+func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
requestHeader := &ic.Header{
Id: uuid.New().String(),
Type: ic.MessageType_REQUEST,
FromTopic: replyTopic.Name,
ToTopic: toTopic.Name,
+ KeyTopic: key,
Timestamp: time.Now().Unix(),
}
requestBody := &ic.InterContainerRequestBody{
diff --git a/protos/inter_container.proto b/protos/inter_container.proto
index ac2af4f..4d4ffe8 100644
--- a/protos/inter_container.proto
+++ b/protos/inter_container.proto
@@ -48,7 +48,8 @@
MessageType type = 2;
string from_topic = 3;
string to_topic = 4;
- int64 timestamp = 5;
+ string key_topic = 5;
+ int64 timestamp = 6;
}
message Argument {
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index b3ba10d..5d21838 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -93,7 +93,7 @@
// }
//}
ap.deviceTopicRegistered = true
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
@@ -114,7 +114,7 @@
// Use a device specific topic as we are the only core handling requests for this device
//replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
@@ -130,7 +130,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
@@ -146,7 +146,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
@@ -162,7 +162,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
// We no longer need to have this device topic as we won't receive any unsolicited messages on it
@@ -184,7 +184,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
if success {
unpackResult := &ic.SwitchCapability{}
@@ -220,7 +220,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
if success {
unpackResult := &ic.PortCapability{}
@@ -288,7 +288,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -309,7 +309,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
if success {
@@ -346,7 +346,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -367,7 +367,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -388,7 +388,7 @@
}
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
@@ -422,7 +422,7 @@
// TODO: Do we need to wait for an ACK on a packet Out?
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
@@ -447,7 +447,7 @@
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}
@@ -472,7 +472,7 @@
// Use a device specific topic as we are the only core handling requests for this device
replyToTopic := ap.getCoreTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
return unPackResponse(rpc, device.Id, success, result)
}