VOL-1537 : Create the Alarm Framework in golang openolt adapter
Event manager is added to process indications comming from the
OLT and publish them as generic events on to the KAFKA bus which
could be device alarms or KPIs.
It depends on the updated events.proto which contains the defination
for the generic event gRPC message and the event proxy from the VOLTHA
core.
So the changes in voltha-proto needs to be merged first followed by the
changes in voltha-go and then voltha-openolt-adapter.
Change-Id: Ie38b2ea01bd738737522c018e65e685ee41589d5
diff --git a/Gopkg.lock b/Gopkg.lock
index fa5e5a3..3303ed1 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -235,7 +235,7 @@
[[projects]]
branch = "master"
- digest = "1:6b91ad0847f5e2ff260d4867b6fa723f6b80f797f87f49ce185d083c4f801289"
+ digest = "1:8813723b4cd2a23cddb4a2ed14f1949dc11017562ebf2a41903310956a8cb1a0"
name = "github.com/opencord/voltha-go"
packages = [
"adapters",
@@ -250,7 +250,7 @@
"rw_core/utils",
]
pruneopts = "UT"
- revision = "6deaa24a2a5bee6d9fd285ccb39b12f7255ee0ab"
+ revision = "44e134a1c07a5238985f816a3a127853cfd9efd2"
[[projects]]
branch = "master"
@@ -288,7 +288,7 @@
[[projects]]
branch = "master"
- digest = "1:d0ff4ba70cda3f93b1307bf1fe280241aa7584c448a5e2a27325620073fc6508"
+ digest = "1:ab16e5cb98bf2b6efcfd927e9cb737b7243014600ebcfbe8786f2e4ad2227ac1"
name = "go.etcd.io/etcd"
packages = [
"auth/authpb",
@@ -304,12 +304,13 @@
"pkg/systemd",
"pkg/types",
"raft",
+ "raft/confchange",
"raft/quorum",
"raft/raftpb",
"raft/tracker",
]
pruneopts = "UT"
- revision = "5a734e79f501565e8bb1ae7a7a9abcac7ae8c36d"
+ revision = "f498392ca712f854566361e1107d7211adc392b9"
[[projects]]
digest = "1:a5158647b553c61877aa9ae74f4015000294e47981e6b8b07525edcbb0747c81"
@@ -413,7 +414,7 @@
"googleapis/rpc/status",
]
pruneopts = "UT"
- revision = "3bdd9d9f5532d75d09efb230bd767d265245cfe5"
+ revision = "c506a9f9061087022822e8da603a52fc387115a8"
[[projects]]
digest = "1:931727c62baa7a9958acac5d712ae9cc0ea3ff86a664b560febf4f4b0677ee28"
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index fef55bf..20701ab 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -51,6 +51,7 @@
device *voltha.Device
coreProxy *com.CoreProxy
AdapterProxy *com.AdapterProxy
+ EventProxy *com.EventProxy
openOLT *OpenOLT
exitChannel chan int
lockDevice sync.RWMutex
@@ -58,6 +59,7 @@
transitionMap *TransitionMap
clientCon *grpc.ClientConn
flowMgr *OpenOltFlowMgr
+ eventMgr *OpenOltEventMgr
resourceMgr *rsrcMgr.OpenOltResourceMgr
discOnus map[string]bool
onus map[string]*OnuDevice
@@ -87,10 +89,11 @@
}
//NewDeviceHandler creates a new device handler
-func NewDeviceHandler(cp *com.CoreProxy, ap *com.AdapterProxy, device *voltha.Device, adapter *OpenOLT) *DeviceHandler {
+func NewDeviceHandler(cp *com.CoreProxy, ap *com.AdapterProxy, ep *com.EventProxy, device *voltha.Device, adapter *OpenOLT) *DeviceHandler {
var dh DeviceHandler
dh.coreProxy = cp
dh.AdapterProxy = ap
+ dh.EventProxy = ep
cloned := (proto.Clone(device)).(*voltha.Device)
dh.deviceID = cloned.Id
dh.deviceType = cloned.Type
@@ -254,6 +257,7 @@
}
func (dh *DeviceHandler) handleIndication(indication *oop.Indication) {
+ raisedTs := time.Now().UnixNano()
switch indication.Data.(type) {
case *oop.Indication_OltInd:
dh.handleOltIndication(indication.GetOltInd())
@@ -298,6 +302,8 @@
case *oop.Indication_AlarmInd:
alarmInd := indication.GetAlarmInd()
log.Infow("Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
+ dh.eventMgr.ProcessEvents(alarmInd, dh.deviceID, raisedTs)
+
}
}
@@ -463,6 +469,8 @@
return errors.New("instantiating flow manager failed")
}
/* TODO: Instantiate Alarm , stats , BW managers */
+ /* Instantiating Event Manager to handle Alarms and KPIs */
+ dh.eventMgr = NewEventMgr(dh.EventProxy)
// Start reading indications
go dh.readIndications()
diff --git a/adaptercore/openolt.go b/adaptercore/openolt.go
index ce501ef..abbc4e3 100644
--- a/adaptercore/openolt.go
+++ b/adaptercore/openolt.go
@@ -36,6 +36,7 @@
deviceHandlers map[string]*DeviceHandler
coreProxy *com.CoreProxy
adapterProxy *com.AdapterProxy
+ eventProxy *com.EventProxy
kafkaICProxy *kafka.InterContainerProxy
numOnus int
KVStoreHost string
@@ -46,7 +47,7 @@
}
//NewOpenOLT returns a new instance of OpenOLT
-func NewOpenOLT(ctx context.Context, kafkaICProxy *kafka.InterContainerProxy, coreProxy *com.CoreProxy, adapterProxy *com.AdapterProxy, onuNumber int, kvStoreHost string, kvStorePort int, KVStoreType string) *OpenOLT {
+func NewOpenOLT(ctx context.Context, kafkaICProxy *kafka.InterContainerProxy, coreProxy *com.CoreProxy, adapterProxy *com.AdapterProxy, eventProxy *com.EventProxy, onuNumber int, kvStoreHost string, kvStorePort int, KVStoreType string) *OpenOLT {
var openOLT OpenOLT
openOLT.exitChannel = make(chan int, 1)
openOLT.deviceHandlers = make(map[string]*DeviceHandler)
@@ -54,6 +55,7 @@
openOLT.numOnus = onuNumber
openOLT.coreProxy = coreProxy
openOLT.adapterProxy = adapterProxy
+ openOLT.eventProxy = eventProxy
openOLT.KVStoreHost = kvStoreHost
openOLT.KVStorePort = kvStorePort
openOLT.KVStoreType = KVStoreType
@@ -132,7 +134,7 @@
log.Infow("adopt-device", log.Fields{"deviceId": device.Id})
var handler *DeviceHandler
if handler = oo.getDeviceHandler(device.Id); handler == nil {
- handler := NewDeviceHandler(oo.coreProxy, oo.adapterProxy, device, oo)
+ handler := NewDeviceHandler(oo.coreProxy, oo.adapterProxy, oo.eventProxy, device, oo)
oo.addDeviceHandlerToMap(handler)
go handler.AdoptDevice(device)
// Launch the creation of the device topic
diff --git a/adaptercore/openolt_eventmgr.go b/adaptercore/openolt_eventmgr.go
new file mode 100644
index 0000000..b993aea
--- /dev/null
+++ b/adaptercore/openolt_eventmgr.go
@@ -0,0 +1,315 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package adaptercore provides APIs for the openOLT adapter
+package adaptercore
+
+import (
+ "fmt"
+ com "github.com/opencord/voltha-go/adapters/common"
+ "github.com/opencord/voltha-go/common/log"
+ oop "github.com/opencord/voltha-protos/go/openolt"
+ "github.com/opencord/voltha-protos/go/voltha"
+)
+
+const (
+ onuDiscoveryEvent = "ONU_DISCOVERY"
+ onuLosEvent = "ONU_LOSS_OF_SIGNAL"
+ onuLobEvent = "ONU_LOSS_OF_BURST"
+ onuLopcMissEvent = "ONU_LOPC_MISS"
+ onuLopcMicErrorEvent = "ONU_LOPC_MIC_ERROR"
+ oltLosEvent = "OLT_LOSS_OF_SIGNAL"
+ onuDyingGaspEvent = "ONU_DYING_GASP"
+ onuSignalsFailEvent = "ONU_SIGNALS_FAIL"
+ onuStartupFailEvent = "ONU_STARTUP_FAIL"
+ onuSignalDegradeEvent = "ONU_SIGNAL_DEGRADE"
+ onuDriftOfWindowEvent = "ONU_DRIFT_OF_WINDOW"
+ onuActivationFailEvent = "ONU_ACTIVATION_FAIL"
+ onuProcessingErrorEvent = "ONU_PROCESSING_ERROR"
+ onuTiwiEvent = "ONU_TRANSMISSION_WARNING"
+ onuLossOmciEvent = "ONU_LOSS_OF_OMCI_CHANNEL"
+)
+
+const (
+ pon = voltha.EventSubCategory_PON
+ olt = voltha.EventSubCategory_OLT
+ ont = voltha.EventSubCategory_ONT
+ onu = voltha.EventSubCategory_ONU
+ nni = voltha.EventSubCategory_NNI
+ service = voltha.EventCategory_SERVICE
+ security = voltha.EventCategory_SECURITY
+ equipment = voltha.EventCategory_EQUIPMENT
+ processing = voltha.EventCategory_PROCESSING
+ environment = voltha.EventCategory_ENVIRONMENT
+ communication = voltha.EventCategory_COMMUNICATION
+)
+
+// OpenOltEventMgr struct contains
+type OpenOltEventMgr struct {
+ eventProxy *com.EventProxy
+}
+
+// NewEventMgr is a Function to get a new event manager struct for the OpenOLT to process and publish OpenOLT event
+func NewEventMgr(eventProxy *com.EventProxy) *OpenOltEventMgr {
+ var em OpenOltEventMgr
+ em.eventProxy = eventProxy
+ return &em
+}
+
+// ProcessEvents is function to process and publish OpenOLT event
+func (em *OpenOltEventMgr) ProcessEvents(alarmInd *oop.AlarmIndication, deviceID string, raisedTs int64) {
+
+ switch alarmInd.Data.(type) {
+ case *oop.AlarmIndication_LosInd:
+ log.Infow("Received LOS indication", log.Fields{"alarm_ind": alarmInd})
+ em.oltLosIndication(alarmInd.GetLosInd(), deviceID, raisedTs)
+
+ case *oop.AlarmIndication_OnuAlarmInd:
+ log.Infow("Received onu alarm indication ", log.Fields{"alarm_ind": alarmInd})
+ em.onuAlarmIndication(alarmInd.GetOnuAlarmInd(), deviceID, raisedTs)
+
+ case *oop.AlarmIndication_DyingGaspInd:
+ log.Infow("Received dying gasp indication", log.Fields{"alarm_ind": alarmInd})
+ em.onuDyingGaspIndication(alarmInd.GetDyingGaspInd(), deviceID, raisedTs)
+
+ case *oop.AlarmIndication_OnuActivationFailInd:
+ log.Infow("Received onu activation fail indication ", log.Fields{"alarm_ind": alarmInd})
+ em.onuActivationFailIndication(alarmInd.GetOnuActivationFailInd(), deviceID, raisedTs)
+
+ case *oop.AlarmIndication_OnuLossOmciInd:
+ log.Infow("Received onu loss omci indication ", log.Fields{"alarm_ind": alarmInd})
+ em.onuLossOmciIndication(alarmInd.GetOnuLossOmciInd(), deviceID, raisedTs)
+
+ case *oop.AlarmIndication_OnuDriftOfWindowInd:
+ log.Infow("Received onu drift of window indication ", log.Fields{"alarm_ind": alarmInd})
+ em.onuDriftOfWindowIndication(alarmInd.GetOnuDriftOfWindowInd(), deviceID, raisedTs)
+
+ case *oop.AlarmIndication_OnuSignalDegradeInd:
+ log.Infow("Received onu signal degrade indication ", log.Fields{"alarm_ind": alarmInd})
+ em.onuSignalDegradeIndication(alarmInd.GetOnuSignalDegradeInd(), deviceID, raisedTs)
+
+ case *oop.AlarmIndication_OnuSignalsFailInd:
+ log.Infow("Received onu signal fail indication ", log.Fields{"alarm_ind": alarmInd})
+ em.onuSignalsFailIndication(alarmInd.GetOnuSignalsFailInd(), deviceID, raisedTs)
+
+ case *oop.AlarmIndication_OnuProcessingErrorInd:
+ log.Infow("Received onu startup fail indication ", log.Fields{"alarm_ind": alarmInd})
+ log.Infow("Not implemented yet", log.Fields{"alarm_ind": alarmInd})
+ case *oop.AlarmIndication_OnuTiwiInd:
+ log.Infow("Received onu transmission warning indication ", log.Fields{"alarm_ind": alarmInd})
+ log.Infow("Not implemented yet", log.Fields{"alarm_ind": "Onu-Transmission-indication"})
+ default:
+ log.Errorw("Received unknown indication type", log.Fields{"alarm_ind": alarmInd})
+
+ }
+}
+
+// OnuDiscoveryIndication is an exported method to handle ONU discovery event
+func (em *OpenOltEventMgr) OnuDiscoveryIndication(onuDisc *oop.OnuDiscIndication, deviceID string, OnuID uint32, serialNumber string, raisedTs int64) {
+ var de voltha.DeviceEvent
+ context := make(map[string]string)
+ /* Populating event context */
+ context["onu-id"] = string(OnuID)
+ context["intf-id"] = string(onuDisc.IntfId)
+ context["serial-number"] = serialNumber
+ /* Populating device event body */
+ de.Context = context
+ de.ResourceId = deviceID
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuDiscoveryEvent, "RAISE_EVENT")
+ /* Send event to KAFKA */
+ if err := em.eventProxy.SendDeviceEvent(&de, equipment, pon, raisedTs); err != nil {
+ log.Errorw("Failed to send ONU discovery event", log.Fields{"serial-number": serialNumber, "intf-id": onuDisc.IntfId})
+ }
+ log.Infow("ONU discovery event sent to KAFKA", log.Fields{"serial-number": serialNumber, "intf-id": onuDisc.IntfId})
+}
+
+func (em *OpenOltEventMgr) oltLosIndication(oltLos *oop.LosIndication, deviceID string, raisedTs int64) {
+ var de voltha.DeviceEvent
+ context := make(map[string]string)
+ /* Populating event context */
+ context["intf-id"] = string(oltLos.IntfId)
+ /* Populating device event body */
+ de.Context = context
+ de.ResourceId = deviceID
+ if oltLos.Status == "on" {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", oltLosEvent, "RAISE_EVENT")
+ } else {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", oltLosEvent, "CLEAR_EVENT")
+ }
+ /* Send event to KAFKA */
+ if err := em.eventProxy.SendDeviceEvent(&de, communication, olt, raisedTs); err != nil {
+ log.Errorw("Failed to send OLT loss of signal event", log.Fields{"intf-id": oltLos.IntfId})
+ }
+ log.Infow("OLT LOS event sent to KAFKA", log.Fields{"intf-id": oltLos.IntfId})
+}
+
+func (em *OpenOltEventMgr) onuDyingGaspIndication(dgi *oop.DyingGaspIndication, deviceID string, raisedTs int64) {
+ var de voltha.DeviceEvent
+ context := make(map[string]string)
+ /* Populating event context */
+ context["intf-id"] = string(dgi.IntfId)
+ context["onu-id"] = string(dgi.OnuId)
+ /* Populating device event body */
+ de.Context = context
+ de.ResourceId = deviceID
+ if dgi.Status == "on" {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuDyingGaspEvent, "RAISE_EVENT")
+ } else {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuDyingGaspEvent, "CLEAR_EVENT")
+ }
+ /* Send event to KAFKA */
+ if err := em.eventProxy.SendDeviceEvent(&de, communication, pon, raisedTs); err != nil {
+ log.Errorw("Failed to send ONU Dying gasp event", log.Fields{"intf-id": dgi.IntfId, "onu-id": dgi.OnuId})
+ }
+ log.Infow("ONU dying gasp event sent to KAFKA", log.Fields{"intf-id": dgi.IntfId})
+}
+
+func (em *OpenOltEventMgr) onuAlarmIndication(onuAlarm *oop.OnuAlarmIndication, deviceID string, raisedTs int64) {
+ var de voltha.DeviceEvent
+ context := make(map[string]string)
+ /* Populating event context */
+ context["intf-id"] = string(onuAlarm.IntfId)
+ context["onu-id"] = string(onuAlarm.OnuId)
+ /* Populating device event body */
+ de.Context = context
+ de.ResourceId = deviceID
+ if onuAlarm.LosStatus == "on" {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuLosEvent, "RAISE_EVENT")
+ } else if onuAlarm.LosStatus == "off" {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuLosEvent, "CLEAR_EVENT")
+ } else if onuAlarm.LobStatus == "on" {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuLobEvent, "RAISE_EVENT")
+ } else if onuAlarm.LobStatus == "off" {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuLobEvent, "CLEAR_EVENT")
+ } else if onuAlarm.LopcMissStatus == "on" {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuLopcMissEvent, "RAISE_EVENT")
+ } else if onuAlarm.LopcMissStatus == "off" {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuLopcMissEvent, "CLEAR_EVENT")
+ } else if onuAlarm.LopcMicErrorStatus == "on" {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuLopcMicErrorEvent, "RAISE_EVENT")
+ } else if onuAlarm.LopcMicErrorStatus == "off" {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuLopcMicErrorEvent, "CLEAR_EVENT")
+ }
+ /* Send event to KAFKA */
+ if err := em.eventProxy.SendDeviceEvent(&de, communication, onu, raisedTs); err != nil {
+ log.Errorw("Failed to send ONU Los event", log.Fields{"onu-id": onuAlarm.OnuId, "intf-id": onuAlarm.IntfId})
+ }
+ log.Infow("ONU LOS event sent to KAFKA", log.Fields{"onu-id": onuAlarm.OnuId, "intf-id": onuAlarm.IntfId})
+}
+
+func (em *OpenOltEventMgr) onuActivationFailIndication(oaf *oop.OnuActivationFailureIndication, deviceID string, raisedTs int64) {
+ var de voltha.DeviceEvent
+ context := make(map[string]string)
+ /* Populating event context */
+ context["intf-id"] = string(oaf.IntfId)
+ context["onu-id"] = string(oaf.OnuId)
+ /* Populating device event body */
+ de.Context = context
+ de.ResourceId = deviceID
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuActivationFailEvent, "RAISE_EVENT")
+ /* Send event to KAFKA */
+ if err := em.eventProxy.SendDeviceEvent(&de, equipment, pon, raisedTs); err != nil {
+ log.Errorw("Failed to send ONU activation failure event", log.Fields{"onu-id": oaf.OnuId, "intf-id": oaf.IntfId})
+ }
+ log.Infow("ONU activation failure event sent to KAFKA", log.Fields{"onu-id": oaf.OnuId, "intf-id": oaf.IntfId})
+}
+
+func (em *OpenOltEventMgr) onuLossOmciIndication(onuLossOmci *oop.OnuLossOfOmciChannelIndication, deviceID string, raisedTs int64) {
+ var de voltha.DeviceEvent
+ context := make(map[string]string)
+ /* Populating event context */
+ context["intf-id"] = string(onuLossOmci.IntfId)
+ context["onu-id"] = string(onuLossOmci.OnuId)
+ /* Populating device event body */
+ de.Context = context
+ de.ResourceId = deviceID
+ if onuLossOmci.Status == "on" {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuLossOmciEvent, "RAISE_EVENT")
+ } else {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuLossOmciEvent, "CLEAR_EVENT")
+ }
+ /* Send event to KAFKA */
+ if err := em.eventProxy.SendDeviceEvent(&de, communication, pon, raisedTs); err != nil {
+ log.Errorw("Failed to send ONU loss of OMCI channel event", log.Fields{"onu-id": onuLossOmci.OnuId, "intf-id": onuLossOmci.IntfId})
+ }
+ log.Infow("ONU loss of OMCI channel event sent to KAFKA", log.Fields{"onu-id": onuLossOmci.OnuId, "intf-id": onuLossOmci.IntfId})
+}
+
+func (em *OpenOltEventMgr) onuDriftOfWindowIndication(onuDriftWindow *oop.OnuDriftOfWindowIndication, deviceID string, raisedTs int64) {
+ var de voltha.DeviceEvent
+ context := make(map[string]string)
+ /* Populating event context */
+ context["intf-id"] = string(onuDriftWindow.IntfId)
+ context["onu-id"] = string(onuDriftWindow.OnuId)
+ context["drift"] = string(onuDriftWindow.OnuId)
+ context["new-eqd"] = string(onuDriftWindow.OnuId)
+ /* Populating device event body */
+ de.Context = context
+ de.ResourceId = deviceID
+ if onuDriftWindow.Status == "on" {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuDriftOfWindowEvent, "RAISE_EVENT")
+ } else {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuDriftOfWindowEvent, "CLEAR_EVENT")
+ }
+ /* Send event to KAFKA */
+ if err := em.eventProxy.SendDeviceEvent(&de, communication, pon, raisedTs); err != nil {
+ log.Errorw("Failed to send ONU drift of window event", log.Fields{"onu-id": onuDriftWindow.OnuId, "intf-id": onuDriftWindow.IntfId})
+ }
+ log.Infow("ONU drift of window event sent to KAFKA", log.Fields{"onu-id": onuDriftWindow.OnuId, "intf-id": onuDriftWindow.IntfId})
+}
+
+func (em *OpenOltEventMgr) onuSignalDegradeIndication(onuSignalDegrade *oop.OnuSignalDegradeIndication, deviceID string, raisedTs int64) {
+ var de voltha.DeviceEvent
+ context := make(map[string]string)
+ /* Populating event context */
+ context["intf-id"] = string(onuSignalDegrade.IntfId)
+ context["onu-id"] = string(onuSignalDegrade.OnuId)
+ /* Populating device event body */
+ de.Context = context
+ de.ResourceId = deviceID
+ if onuSignalDegrade.Status == "on" {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuSignalDegradeEvent, "RAISE_EVENT")
+ } else {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuSignalDegradeEvent, "CLEAR_EVENT")
+ }
+ /* Send event to KAFKA */
+ if err := em.eventProxy.SendDeviceEvent(&de, communication, pon, raisedTs); err != nil {
+ log.Errorw("Failed to send ONU signals degrade event", log.Fields{"onu-id": onuSignalDegrade.OnuId, "intf-id": onuSignalDegrade.IntfId})
+ }
+ log.Infow("ONU signal degrade event sent to KAFKA", log.Fields{"onu-id": onuSignalDegrade.OnuId, "intf-id": onuSignalDegrade.IntfId})
+}
+
+func (em *OpenOltEventMgr) onuSignalsFailIndication(onuSignalsFail *oop.OnuSignalsFailureIndication, deviceID string, raisedTs int64) {
+ var de voltha.DeviceEvent
+ context := make(map[string]string)
+ /* Populating event context */
+ context["onu-id"] = string(onuSignalsFail.OnuId)
+ context["intf-id"] = string(onuSignalsFail.IntfId)
+ context["inverse-bit-error-rate"] = string(onuSignalsFail.InverseBitErrorRate)
+ /* Populating device event body */
+ de.Context = context
+ de.ResourceId = deviceID
+ if onuSignalsFail.Status == "on" {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuSignalsFailEvent, "RAISE_EVENT")
+ } else {
+ de.DeviceEventName = fmt.Sprintf("%s_%s", onuSignalsFailEvent, "CLEAR_EVENT")
+ }
+ /* Send event to KAFKA */
+ if err := em.eventProxy.SendDeviceEvent(&de, communication, pon, raisedTs); err != nil {
+ log.Errorw("Failed to send ONU signals fail event", log.Fields{"onu-id": onuSignalsFail.OnuId, "intf-id": onuSignalsFail.IntfId})
+ }
+ log.Infow("ONU signals fail event sent to KAFKA", log.Fields{"onu-id": onuSignalsFail.OnuId, "intf-id": onuSignalsFail.IntfId})
+}
diff --git a/config/config.go b/config/config.go
index 17d270b..5850a03 100644
--- a/config/config.go
+++ b/config/config.go
@@ -41,6 +41,7 @@
defaultDisplayVersionOnly = false
defaultTopic = "openolt"
defaultCoretopic = "rwcore"
+ defaultEventtopic = "voltha.events"
defaultOnunumber = 1
)
@@ -58,6 +59,7 @@
KVStorePort int
Topic string
CoreTopic string
+ EventTopic string
LogLevel int
OnuNumber int
Banner bool
@@ -82,6 +84,7 @@
KVStorePort: defaultKvstoreport,
Topic: defaultTopic,
CoreTopic: defaultCoretopic,
+ EventTopic: defaultEventtopic,
LogLevel: defaultLoglevel,
OnuNumber: defaultOnunumber,
Banner: defaultBanner,
@@ -111,6 +114,9 @@
help = fmt.Sprintf("Core topic")
flag.StringVar(&(so.CoreTopic), "core_topic", defaultCoretopic, help)
+ help = fmt.Sprintf("Event topic")
+ flag.StringVar(&(so.EventTopic), "event_topic", defaultEventtopic, help)
+
help = fmt.Sprintf("KV store type")
flag.StringVar(&(so.KVStoreType), "kv_store_type", defaultKvstoretype, help)
diff --git a/main.go b/main.go
index dbc2756..3a0e966 100644
--- a/main.go
+++ b/main.go
@@ -47,6 +47,7 @@
kip *kafka.InterContainerProxy
coreProxy *com.CoreProxy
adapterProxy *com.AdapterProxy
+ eventProxy *com.EventProxy
halted bool
exitChannel chan int
receiverChannels []<-chan *ic.InterContainerMessage
@@ -92,8 +93,11 @@
// Create the adaptor proxy to handle request between olt and onu
a.adapterProxy = com.NewAdapterProxy(a.kip, "brcm_openomci_onu", a.config.CoreTopic)
+ // Create the event proxy to post events to KAFKA
+ a.eventProxy = com.NewEventProxy(com.MsgClient(a.kafkaClient), com.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
+
// Create the open OLT adapter
- if a.iAdapter, err = a.startOpenOLT(ctx, a.kip, a.coreProxy, a.adapterProxy, a.config.OnuNumber,
+ if a.iAdapter, err = a.startOpenOLT(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy, a.config.OnuNumber,
a.config.KVStoreHost, a.config.KVStorePort, a.config.KVStoreType); err != nil {
log.Fatal("error-starting-inter-container-proxy")
}
@@ -203,10 +207,10 @@
return kip, nil
}
-func (a *adapter) startOpenOLT(ctx context.Context, kip *kafka.InterContainerProxy, cp *com.CoreProxy, ap *com.AdapterProxy, onuNumber int, kvStoreHost string, kvStorePort int, KVStoreType string) (*ac.OpenOLT, error) {
+func (a *adapter) startOpenOLT(ctx context.Context, kip *kafka.InterContainerProxy, cp *com.CoreProxy, ap *com.AdapterProxy, ep *com.EventProxy, onuNumber int, kvStoreHost string, kvStorePort int, KVStoreType string) (*ac.OpenOLT, error) {
log.Info("starting-open-olt")
var err error
- sOLT := ac.NewOpenOLT(ctx, a.kip, cp, ap, onuNumber, kvStoreHost, kvStorePort, KVStoreType)
+ sOLT := ac.NewOpenOLT(ctx, a.kip, cp, ap, ep, onuNumber, kvStoreHost, kvStorePort, KVStoreType)
if err = sOLT.Start(ctx); err != nil {
log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
diff --git a/vendor/github.com/opencord/voltha-go/adapters/common/events_proxy.go b/vendor/github.com/opencord/voltha-go/adapters/common/events_proxy.go
new file mode 100644
index 0000000..1f14b3a
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-go/adapters/common/events_proxy.go
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+ "errors"
+ "fmt"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/kafka"
+ "github.com/opencord/voltha-protos/go/voltha"
+ "strconv"
+ "strings"
+ "time"
+)
+
+const (
+ EventTypeVersion = "0.1"
+)
+
+type (
+ EventType = voltha.EventType_EventType
+ EventCategory = voltha.EventCategory_EventCategory
+ EventSubCategory = voltha.EventSubCategory_EventSubCategory
+)
+
+type EventProxy struct {
+ kafkaClient kafka.Client
+ eventTopic kafka.Topic
+}
+
+func NewEventProxy(opts ...EventProxyOption) *EventProxy {
+ var proxy EventProxy
+ for _, option := range opts {
+ option(&proxy)
+ }
+ return &proxy
+}
+
+type EventProxyOption func(*EventProxy)
+
+func MsgClient(client kafka.Client) EventProxyOption {
+ return func(args *EventProxy) {
+ args.kafkaClient = client
+ }
+}
+
+func MsgTopic(topic kafka.Topic) EventProxyOption {
+ return func(args *EventProxy) {
+ args.eventTopic = topic
+ }
+}
+
+func (ep *EventProxy) formatId(eventName string) string {
+ return fmt.Sprintf("Voltha.openolt.%s.%s", eventName, strconv.FormatInt(time.Now().UnixNano(), 10))
+}
+
+func (ep *EventProxy) getEventHeader(eventName string, category EventCategory, subCategory EventSubCategory, eventType EventType, raisedTs int64) *voltha.EventHeader {
+ var header voltha.EventHeader
+ if strings.Contains(eventName, "_") {
+ eventName = strings.Join(strings.Split(eventName, "_")[:len(strings.Split(eventName, "_"))-2], "_")
+ } else {
+ eventName = "UNKNOWN_EVENT"
+ }
+ /* Populating event header */
+ header.Id = ep.formatId(eventName)
+ header.Category = category
+ header.SubCategory = subCategory
+ header.Type = eventType
+ header.TypeVersion = EventTypeVersion
+ header.RaisedTs = float32(raisedTs)
+ header.ReportedTs = float32(time.Now().UnixNano())
+ return &header
+}
+
+/* Send out device events*/
+func (ep *EventProxy) SendDeviceEvent(deviceEvent *voltha.DeviceEvent, category EventCategory, subCategory EventSubCategory, raisedTs int64) error {
+ if deviceEvent == nil {
+ log.Error("Recieved empty device event")
+ return errors.New("Device event nil")
+ }
+ var event voltha.Event
+ var de voltha.Event_DeviceEvent
+ de.DeviceEvent = deviceEvent
+ event.Header = ep.getEventHeader(deviceEvent.DeviceEventName, category, subCategory, voltha.EventType_DEVICE_EVENT, raisedTs)
+ event.EventType = &de
+ if err := ep.sendEvent(&event); err != nil {
+ log.Errorw("Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
+ return err
+ }
+ log.Infow("Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
+ "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
+ "ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
+ "DeviceEventName": deviceEvent.DeviceEventName})
+
+ return nil
+
+}
+
+/* TODO: Send out KPI events*/
+
+func (ep *EventProxy) sendEvent(event *voltha.Event) error {
+ if err := ep.kafkaClient.Send(event, &ep.eventTopic); err != nil {
+ return err
+ }
+ log.Debugw("Sent event to kafka", log.Fields{"event": event})
+
+ return nil
+}
diff --git a/vendor/go.etcd.io/etcd/raft/confchange/confchange.go b/vendor/go.etcd.io/etcd/raft/confchange/confchange.go
new file mode 100644
index 0000000..fd75aed
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/confchange/confchange.go
@@ -0,0 +1,420 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package confchange
+
+import (
+ "errors"
+ "fmt"
+ "strings"
+
+ "go.etcd.io/etcd/raft/quorum"
+ pb "go.etcd.io/etcd/raft/raftpb"
+ "go.etcd.io/etcd/raft/tracker"
+)
+
+// Changer facilitates configuration changes. It exposes methods to handle
+// simple and joint consensus while performing the proper validation that allows
+// refusing invalid configuration changes before they affect the active
+// configuration.
+type Changer struct {
+ Tracker tracker.ProgressTracker
+ LastIndex uint64
+}
+
+// EnterJoint verifies that the outgoing (=right) majority config of the joint
+// config is empty and initializes it with a copy of the incoming (=left)
+// majority config. That is, it transitions from
+//
+// (1 2 3)&&()
+// to
+// (1 2 3)&&(1 2 3).
+//
+// The supplied ConfChanges are then applied to the incoming majority config,
+// resulting in a joint configuration that in terms of the Raft thesis[1]
+// (Section 4.3) corresponds to `C_{new,old}`.
+//
+// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
+func (c Changer) EnterJoint(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressMap, error) {
+ cfg, prs, err := c.checkAndCopy()
+ if err != nil {
+ return c.err(err)
+ }
+ if joint(cfg) {
+ err := errors.New("config is already joint")
+ return c.err(err)
+ }
+ if len(incoming(cfg.Voters)) == 0 {
+ // We allow adding nodes to an empty config for convenience (testing and
+ // bootstrap), but you can't enter a joint state.
+ err := errors.New("can't make a zero-voter config joint")
+ return c.err(err)
+ }
+ // Clear the outgoing config.
+ {
+ *outgoingPtr(&cfg.Voters) = quorum.MajorityConfig{}
+
+ }
+ // Copy incoming to outgoing.
+ for id := range incoming(cfg.Voters) {
+ outgoing(cfg.Voters)[id] = struct{}{}
+ }
+
+ if err := c.apply(&cfg, prs, ccs...); err != nil {
+ return c.err(err)
+ }
+
+ return checkAndReturn(cfg, prs)
+}
+
+// LeaveJoint transitions out of a joint configuration. It is an error to call
+// this method if the configuration is not joint, i.e. if the outgoing majority
+// config Voters[1] is empty.
+//
+// The outgoing majority config of the joint configuration will be removed,
+// that is, the incoming config is promoted as the sole decision maker. In the
+// notation of the Raft thesis[1] (Section 4.3), this method transitions from
+// `C_{new,old}` into `C_new`.
+//
+// At the same time, any staged learners (LearnersNext) the addition of which
+// was held back by an overlapping voter in the former outgoing config will be
+// inserted into Learners.
+//
+// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
+func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) {
+ cfg, prs, err := c.checkAndCopy()
+ if err != nil {
+ return c.err(err)
+ }
+ if !joint(cfg) {
+ err := errors.New("can't leave a non-joint config")
+ return c.err(err)
+ }
+ if len(outgoing(cfg.Voters)) == 0 {
+ err := fmt.Errorf("configuration is not joint: %v", cfg)
+ return c.err(err)
+ }
+ for id := range cfg.LearnersNext {
+ nilAwareAdd(&cfg.Learners, id)
+ prs[id].IsLearner = true
+ }
+ cfg.LearnersNext = nil
+
+ for id := range outgoing(cfg.Voters) {
+ _, isVoter := incoming(cfg.Voters)[id]
+ _, isLearner := cfg.Learners[id]
+
+ if !isVoter && !isLearner {
+ delete(prs, id)
+ }
+ }
+ *outgoingPtr(&cfg.Voters) = nil
+
+ return checkAndReturn(cfg, prs)
+}
+
+// Simple carries out a series of configuration changes that (in aggregate)
+// mutates the incoming majority config Voters[0] by at most one. This method
+// will return an error if that is not the case, if the resulting quorum is
+// zero, or if the configuration is in a joint state (i.e. if there is an
+// outgoing configuration).
+func (c Changer) Simple(ccs ...pb.ConfChange) (tracker.Config, tracker.ProgressMap, error) {
+ cfg, prs, err := c.checkAndCopy()
+ if err != nil {
+ return c.err(err)
+ }
+ if joint(cfg) {
+ err := errors.New("can't apply simple config change in joint config")
+ return c.err(err)
+ }
+ if err := c.apply(&cfg, prs, ccs...); err != nil {
+ return c.err(err)
+ }
+ if n := symdiff(incoming(c.Tracker.Voters), incoming(cfg.Voters)); n > 1 {
+ return tracker.Config{}, nil, errors.New("more than voter changed without entering joint config")
+ }
+ if err := checkInvariants(cfg, prs); err != nil {
+ return tracker.Config{}, tracker.ProgressMap{}, nil
+ }
+
+ return checkAndReturn(cfg, prs)
+}
+
+// apply a ConfChange to the configuration. By convention, changes to voters are
+// always made to the incoming majority config Voters[0]. Voters[1] is either
+// empty or preserves the outgoing majority configuration while in a joint state.
+func (c Changer) apply(cfg *tracker.Config, prs tracker.ProgressMap, ccs ...pb.ConfChange) error {
+ for _, cc := range ccs {
+ if cc.NodeID == 0 {
+ // etcd replaces the NodeID with zero if it decides (downstream of
+ // raft) to not apply a ConfChange, so we have to have explicit code
+ // here to ignore these.
+ continue
+ }
+ switch cc.Type {
+ case pb.ConfChangeAddNode:
+ c.makeVoter(cfg, prs, cc.NodeID)
+ case pb.ConfChangeAddLearnerNode:
+ c.makeLearner(cfg, prs, cc.NodeID)
+ case pb.ConfChangeRemoveNode:
+ c.remove(cfg, prs, cc.NodeID)
+ case pb.ConfChangeUpdateNode:
+ default:
+ return fmt.Errorf("unexpected conf type %d", cc.Type)
+ }
+ }
+ if len(incoming(cfg.Voters)) == 0 {
+ return errors.New("removed all voters")
+ }
+ return nil
+}
+
+// makeVoter adds or promotes the given ID to be a voter in the incoming
+// majority config.
+func (c Changer) makeVoter(cfg *tracker.Config, prs tracker.ProgressMap, id uint64) {
+ pr := prs[id]
+ if pr == nil {
+ c.initProgress(cfg, prs, id, false /* isLearner */)
+ return
+ }
+
+ pr.IsLearner = false
+ nilAwareDelete(&cfg.Learners, id)
+ nilAwareDelete(&cfg.LearnersNext, id)
+ incoming(cfg.Voters)[id] = struct{}{}
+ return
+}
+
+// makeLearner makes the given ID a learner or stages it to be a learner once
+// an active joint configuration is exited.
+//
+// The former happens when the peer is not a part of the outgoing config, in
+// which case we either add a new learner or demote a voter in the incoming
+// config.
+//
+// The latter case occurs when the configuration is joint and the peer is a
+// voter in the outgoing config. In that case, we do not want to add the peer
+// as a learner because then we'd have to track a peer as a voter and learner
+// simultaneously. Instead, we add the learner to LearnersNext, so that it will
+// be added to Learners the moment the outgoing config is removed by
+// LeaveJoint().
+func (c Changer) makeLearner(cfg *tracker.Config, prs tracker.ProgressMap, id uint64) {
+ pr := prs[id]
+ if pr == nil {
+ c.initProgress(cfg, prs, id, true /* isLearner */)
+ return
+ }
+ if pr.IsLearner {
+ return
+ }
+ // Remove any existing voter in the incoming config...
+ c.remove(cfg, prs, id)
+ // ... but save the Progress.
+ prs[id] = pr
+ // Use LearnersNext if we can't add the learner to Learners directly, i.e.
+ // if the peer is still tracked as a voter in the outgoing config. It will
+ // be turned into a learner in LeaveJoint().
+ //
+ // Otherwise, add a regular learner right away.
+ if _, onRight := outgoing(cfg.Voters)[id]; onRight {
+ nilAwareAdd(&cfg.LearnersNext, id)
+ } else {
+ pr.IsLearner = true
+ nilAwareAdd(&cfg.Learners, id)
+ }
+}
+
+// remove this peer as a voter or learner from the incoming config.
+func (c Changer) remove(cfg *tracker.Config, prs tracker.ProgressMap, id uint64) {
+ if _, ok := prs[id]; !ok {
+ return
+ }
+
+ delete(incoming(cfg.Voters), id)
+ nilAwareDelete(&cfg.Learners, id)
+ nilAwareDelete(&cfg.LearnersNext, id)
+
+ // If the peer is still a voter in the outgoing config, keep the Progress.
+ if _, onRight := outgoing(cfg.Voters)[id]; !onRight {
+ delete(prs, id)
+ }
+}
+
+// initProgress initializes a new progress for the given node or learner.
+func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id uint64, isLearner bool) {
+ if !isLearner {
+ incoming(cfg.Voters)[id] = struct{}{}
+ } else {
+ nilAwareAdd(&cfg.Learners, id)
+ }
+ prs[id] = &tracker.Progress{
+ // We initialize Progress.Next with lastIndex+1 so that the peer will be
+ // probed without an index first.
+ //
+ // TODO(tbg): verify that, this is just my best guess.
+ Next: c.LastIndex + 1,
+ Match: 0,
+ Inflights: tracker.NewInflights(c.Tracker.MaxInflight),
+ IsLearner: isLearner,
+ // When a node is first added, we should mark it as recently active.
+ // Otherwise, CheckQuorum may cause us to step down if it is invoked
+ // before the added node has had a chance to communicate with us.
+ RecentActive: true,
+ }
+}
+
+// checkInvariants makes sure that the config and progress are compatible with
+// each other. This is used to check both what the Changer is initialized with,
+// as well as what it returns.
+func checkInvariants(cfg tracker.Config, prs tracker.ProgressMap) error {
+ // NB: intentionally allow the empty config. In production we'll never see a
+ // non-empty config (we prevent it from being created) but we will need to
+ // be able to *create* an initial config, for example during bootstrap (or
+ // during tests). Instead of having to hand-code this, we allow
+ // transitioning from an empty config into any other legal and non-empty
+ // config.
+ for _, ids := range []map[uint64]struct{}{
+ cfg.Voters.IDs(),
+ cfg.Learners,
+ cfg.LearnersNext,
+ } {
+ for id := range ids {
+ if _, ok := prs[id]; !ok {
+ return fmt.Errorf("no progress for %d", id)
+ }
+ }
+ }
+
+ // Any staged learner was staged because it could not be directly added due
+ // to a conflicting voter in the outgoing config.
+ for id := range cfg.LearnersNext {
+ if _, ok := outgoing(cfg.Voters)[id]; !ok {
+ return fmt.Errorf("%d is in LearnersNext, but not Voters[1]", id)
+ }
+ if prs[id].IsLearner {
+ return fmt.Errorf("%d is in LearnersNext, but is already marked as learner", id)
+ }
+ }
+ // Conversely Learners and Voters doesn't intersect at all.
+ for id := range cfg.Learners {
+ if _, ok := outgoing(cfg.Voters)[id]; ok {
+ return fmt.Errorf("%d is in Learners and Voters[1]", id)
+ }
+ if _, ok := incoming(cfg.Voters)[id]; ok {
+ return fmt.Errorf("%d is in Learners and Voters[0]", id)
+ }
+ if !prs[id].IsLearner {
+ return fmt.Errorf("%d is in Learners, but is not marked as learner", id)
+ }
+ }
+
+ if !joint(cfg) {
+ // We enforce that empty maps are nil instead of zero.
+ if outgoing(cfg.Voters) != nil {
+ return fmt.Errorf("Voters[1] must be nil when not joint")
+ }
+ if cfg.LearnersNext != nil {
+ return fmt.Errorf("LearnersNext must be nil when not joint")
+ }
+ }
+
+ return nil
+}
+
+// checkAndCopy copies the tracker's config and progress map (deeply enough for
+// the purposes of the Changer) and returns those copies. It returns an error
+// if checkInvariants does.
+func (c Changer) checkAndCopy() (tracker.Config, tracker.ProgressMap, error) {
+ cfg := c.Tracker.Config.Clone()
+ prs := tracker.ProgressMap{}
+
+ for id, pr := range c.Tracker.Progress {
+ // A shallow copy is enough because we only mutate the Learner field.
+ ppr := *pr
+ prs[id] = &ppr
+ }
+ return checkAndReturn(cfg, prs)
+}
+
+// checkAndReturn calls checkInvariants on the input and returns either the
+// resulting error or the input.
+func checkAndReturn(cfg tracker.Config, prs tracker.ProgressMap) (tracker.Config, tracker.ProgressMap, error) {
+ if err := checkInvariants(cfg, prs); err != nil {
+ return tracker.Config{}, tracker.ProgressMap{}, err
+ }
+ return cfg, prs, nil
+}
+
+// err returns zero values and an error.
+func (c Changer) err(err error) (tracker.Config, tracker.ProgressMap, error) {
+ return tracker.Config{}, nil, err
+}
+
+// nilAwareAdd populates a map entry, creating the map if necessary.
+func nilAwareAdd(m *map[uint64]struct{}, id uint64) {
+ if *m == nil {
+ *m = map[uint64]struct{}{}
+ }
+ (*m)[id] = struct{}{}
+}
+
+// nilAwareDelete deletes from a map, nil'ing the map itself if it is empty after.
+func nilAwareDelete(m *map[uint64]struct{}, id uint64) {
+ if *m == nil {
+ return
+ }
+ delete(*m, id)
+ if len(*m) == 0 {
+ *m = nil
+ }
+}
+
+// symdiff returns the count of the symmetric difference between the sets of
+// uint64s, i.e. len( (l - r) \union (r - l)).
+func symdiff(l, r map[uint64]struct{}) int {
+ var n int
+ pairs := [][2]quorum.MajorityConfig{
+ {l, r}, // count elems in l but not in r
+ {r, l}, // count elems in r but not in l
+ }
+ for _, p := range pairs {
+ for id := range p[0] {
+ if _, ok := p[1][id]; !ok {
+ n++
+ }
+ }
+ }
+ return n
+}
+
+func joint(cfg tracker.Config) bool {
+ return len(outgoing(cfg.Voters)) > 0
+}
+
+func incoming(voters quorum.JointConfig) quorum.MajorityConfig { return voters[0] }
+func outgoing(voters quorum.JointConfig) quorum.MajorityConfig { return voters[1] }
+func outgoingPtr(voters *quorum.JointConfig) *quorum.MajorityConfig { return &voters[1] }
+
+// Describe prints the type and NodeID of the configuration changes as a
+// space-delimited string.
+func Describe(ccs ...pb.ConfChange) string {
+ var buf strings.Builder
+ for _, cc := range ccs {
+ if buf.Len() > 0 {
+ buf.WriteByte(' ')
+ }
+ fmt.Fprintf(&buf, "%s(%d)", cc.Type, cc.NodeID)
+ }
+ return buf.String()
+}
diff --git a/vendor/go.etcd.io/etcd/raft/raft.go b/vendor/go.etcd.io/etcd/raft/raft.go
index 846ff49..01e23ec 100644
--- a/vendor/go.etcd.io/etcd/raft/raft.go
+++ b/vendor/go.etcd.io/etcd/raft/raft.go
@@ -24,6 +24,7 @@
"sync"
"time"
+ "go.etcd.io/etcd/raft/confchange"
"go.etcd.io/etcd/raft/quorum"
pb "go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
@@ -356,15 +357,11 @@
}
for _, p := range peers {
// Add node to active config.
- r.prs.InitProgress(p, 0 /* match */, 1 /* next */, false /* isLearner */)
+ r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: p})
}
for _, p := range learners {
// Add learner to active config.
- r.prs.InitProgress(p, 0 /* match */, 1 /* next */, true /* isLearner */)
-
- if r.id == p {
- r.isLearner = true
- }
+ r.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddLearnerNode, NodeID: p})
}
if !isHardStateEqual(hs, emptyState) {
@@ -1401,55 +1398,15 @@
}
func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState {
- addNodeOrLearnerNode := func(id uint64, isLearner bool) {
- // NB: this method is intentionally hidden from view. All mutations of
- // the conf state must call applyConfChange directly.
- pr := r.prs.Progress[id]
- if pr == nil {
- r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
- } else {
- if isLearner && !pr.IsLearner {
- // Can only change Learner to Voter.
- //
- // TODO(tbg): why?
- r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id)
- return
- }
-
- if isLearner == pr.IsLearner {
- // Ignore any redundant addNode calls (which can happen because the
- // initial bootstrapping entries are applied twice).
- return
- }
-
- // Change Learner to Voter, use origin Learner progress.
- r.prs.RemoveAny(id)
- r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */)
- pr.IsLearner = false
- *r.prs.Progress[id] = *pr
- }
-
- // When a node is first added, we should mark it as recently active.
- // Otherwise, CheckQuorum may cause us to step down if it is invoked
- // before the added node has had a chance to communicate with us.
- r.prs.Progress[id].RecentActive = true
+ cfg, prs, err := confchange.Changer{
+ Tracker: r.prs,
+ LastIndex: r.raftLog.lastIndex(),
+ }.Simple(cc)
+ if err != nil {
+ panic(err)
}
-
- var removed int
- if cc.NodeID != None {
- switch cc.Type {
- case pb.ConfChangeAddNode:
- addNodeOrLearnerNode(cc.NodeID, false /* isLearner */)
- case pb.ConfChangeAddLearnerNode:
- addNodeOrLearnerNode(cc.NodeID, true /* isLearner */)
- case pb.ConfChangeRemoveNode:
- removed++
- r.prs.RemoveAny(cc.NodeID)
- case pb.ConfChangeUpdateNode:
- default:
- panic("unexpected conf type")
- }
- }
+ r.prs.Config = cfg
+ r.prs.Progress = prs
r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
// Now that the configuration is updated, handle any side effects.
@@ -1479,12 +1436,10 @@
if r.state != StateLeader || len(cs.Nodes) == 0 {
return cs
}
- if removed > 0 {
+ if r.maybeCommit() {
// The quorum size may have been reduced (but not to zero), so see if
// any pending entries can be committed.
- if r.maybeCommit() {
- r.bcastAppend()
- }
+ r.bcastAppend()
}
// If the the leadTransferee was removed, abort the leadership transfer.
if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
diff --git a/vendor/go.etcd.io/etcd/raft/tracker/progress.go b/vendor/go.etcd.io/etcd/raft/tracker/progress.go
index a7f1ab7..697277b 100644
--- a/vendor/go.etcd.io/etcd/raft/tracker/progress.go
+++ b/vendor/go.etcd.io/etcd/raft/tracker/progress.go
@@ -16,6 +16,7 @@
import (
"fmt"
+ "sort"
"strings"
)
@@ -235,3 +236,22 @@
}
return buf.String()
}
+
+// ProgressMap is a map of *Progress.
+type ProgressMap map[uint64]*Progress
+
+// String prints the ProgressMap in sorted key order, one Progress per line.
+func (m ProgressMap) String() string {
+ ids := make([]uint64, 0, len(m))
+ for k := range m {
+ ids = append(ids, k)
+ }
+ sort.Slice(ids, func(i, j int) bool {
+ return ids[i] < ids[j]
+ })
+ var buf strings.Builder
+ for _, id := range ids {
+ fmt.Fprintf(&buf, "%d: %s\n", id, m[id])
+ }
+ return buf.String()
+}
diff --git a/vendor/go.etcd.io/etcd/raft/tracker/tracker.go b/vendor/go.etcd.io/etcd/raft/tracker/tracker.go
index 4b3396f..a2638f5 100644
--- a/vendor/go.etcd.io/etcd/raft/tracker/tracker.go
+++ b/vendor/go.etcd.io/etcd/raft/tracker/tracker.go
@@ -17,6 +17,7 @@
import (
"fmt"
"sort"
+ "strings"
"go.etcd.io/etcd/raft/quorum"
)
@@ -33,12 +34,11 @@
// simplifies the implementation since it allows peers to have clarity about
// its current role without taking into account joint consensus.
Learners map[uint64]struct{}
- // TODO(tbg): when we actually carry out joint consensus changes and turn a
- // voter into a learner, we cannot add the learner when entering the joint
- // state. This is because this would violate the invariant that the inter-
- // section of voters and learners is empty. For example, assume a Voter is
- // removed and immediately re-added as a learner (or in other words, it is
- // demoted).
+ // When we turn a voter into a learner during a joint consensus transition,
+ // we cannot add the learner directly when entering the joint state. This is
+ // because this would violate the invariant that the intersection of
+ // voters and learners is empty. For example, assume a Voter is removed and
+ // immediately re-added as a learner (or in other words, it is demoted):
//
// Initially, the configuration will be
//
@@ -51,7 +51,7 @@
// learners: {3}
//
// but this violates the invariant (3 is both voter and learner). Instead,
- // we have
+ // we get
//
// voters: {1 2} & {1 2 3}
// learners: {}
@@ -66,20 +66,40 @@
//
// Note that next_learners is not used while adding a learner that is not
// also a voter in the joint config. In this case, the learner is added
- // to Learners right away when entering the joint configuration, so that it
- // is caught up as soon as possible.
- //
- // NextLearners map[uint64]struct{}
+ // right away when entering the joint configuration, so that it is caught up
+ // as soon as possible.
+ LearnersNext map[uint64]struct{}
}
-func (c *Config) String() string {
- if len(c.Learners) == 0 {
- return fmt.Sprintf("voters=%s", c.Voters)
+func (c Config) String() string {
+ var buf strings.Builder
+ fmt.Fprintf(&buf, "voters=%s", c.Voters)
+ if c.Learners != nil {
+ fmt.Fprintf(&buf, " learners=%s", quorum.MajorityConfig(c.Learners).String())
}
- return fmt.Sprintf(
- "voters=%s learners=%s",
- c.Voters, quorum.MajorityConfig(c.Learners).String(),
- )
+ if c.LearnersNext != nil {
+ fmt.Fprintf(&buf, " learners_next=%s", quorum.MajorityConfig(c.LearnersNext).String())
+ }
+ return buf.String()
+}
+
+// Clone returns a copy of the Config that shares no memory with the original.
+func (c *Config) Clone() Config {
+ clone := func(m map[uint64]struct{}) map[uint64]struct{} {
+ if m == nil {
+ return nil
+ }
+ mm := make(map[uint64]struct{}, len(m))
+ for k := range m {
+ mm[k] = struct{}{}
+ }
+ return mm
+ }
+ return Config{
+ Voters: quorum.JointConfig{clone(c.Voters[0]), clone(c.Voters[1])},
+ Learners: clone(c.Learners),
+ LearnersNext: clone(c.LearnersNext),
+ }
}
// ProgressTracker tracks the currently active configuration and the information
@@ -88,7 +108,7 @@
type ProgressTracker struct {
Config
- Progress map[uint64]*Progress
+ Progress ProgressMap
Votes map[uint64]bool
@@ -102,11 +122,10 @@
Config: Config{
Voters: quorum.JointConfig{
quorum.MajorityConfig{},
- // TODO(tbg): this will be mostly empty, so make it a nil pointer
- // in the common case.
- quorum.MajorityConfig{},
+ nil, // only populated when used
},
- Learners: map[uint64]struct{}{},
+ Learners: nil, // only populated when used
+ LearnersNext: nil, // only populated when used
},
Votes: map[uint64]bool{},
Progress: map[uint64]*Progress{},
@@ -139,44 +158,6 @@
return uint64(p.Voters.CommittedIndex(matchAckIndexer(p.Progress)))
}
-// RemoveAny removes this peer, which *must* be tracked as a voter or learner,
-// from the tracker.
-func (p *ProgressTracker) RemoveAny(id uint64) {
- _, okPR := p.Progress[id]
- _, okV1 := p.Voters[0][id]
- _, okV2 := p.Voters[1][id]
- _, okL := p.Learners[id]
-
- okV := okV1 || okV2
-
- if !okPR {
- panic("attempting to remove unknown peer %x")
- } else if !okV && !okL {
- panic("attempting to remove unknown peer %x")
- } else if okV && okL {
- panic(fmt.Sprintf("peer %x is both voter and learner", id))
- }
-
- delete(p.Voters[0], id)
- delete(p.Voters[1], id)
- delete(p.Learners, id)
- delete(p.Progress, id)
-}
-
-// InitProgress initializes a new progress for the given node or learner. The
-// node may not exist yet in either form or a panic will ensue.
-func (p *ProgressTracker) InitProgress(id, match, next uint64, isLearner bool) {
- if pr := p.Progress[id]; pr != nil {
- panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr))
- }
- if !isLearner {
- p.Voters[0][id] = struct{}{}
- } else {
- p.Learners[id] = struct{}{}
- }
- p.Progress[id] = &Progress{Next: next, Match: match, Inflights: NewInflights(p.MaxInflight), IsLearner: isLearner}
-}
-
// Visit invokes the supplied closure for all tracked progresses.
func (p *ProgressTracker) Visit(f func(id uint64, pr *Progress)) {
for id, pr := range p.Progress {