[VOL-4646] Extend hardening test by check kafka event

Change-Id: I71917a751a5d305cde7bbd099ce28906cbe6e65f
diff --git a/tests/openonu-go-adapter/Voltha_ONUNegativeStateTests.robot b/tests/openonu-go-adapter/Voltha_ONUNegativeStateTests.robot
index 1ffe71e..99d5a62 100755
--- a/tests/openonu-go-adapter/Voltha_ONUNegativeStateTests.robot
+++ b/tests/openonu-go-adapter/Voltha_ONUNegativeStateTests.robot
@@ -33,14 +33,17 @@
 Resource          ../../libraries/bbsim.robot
 Resource          ../../variables/variables.robot
 
+Library           kafka_robot.KafkaClient    log_level=DEBUG    WITH NAME    kafka
+Library           grpc_robot.VolthaTools     WITH NAME    volthatools
+
 *** Variables ***
-${NAMESPACE}      voltha
-${INFRA_NAMESPACE}      default
-${timeout}        300s
-${of_id}          0
-${logical_id}     0
-${has_dataplane}    True
-${external_libs}    True
+${NAMESPACE}          voltha
+${INFRA_NAMESPACE}    default
+${timeout}            300s
+${of_id}              0
+${logical_id}         0
+${has_dataplane}      True
+${external_libs}      True
 ${teardown_device}    True
 ${scripts}        ../../scripts
 # Per-test logging on failure is turned off by default; set this variable to enable
@@ -51,6 +54,18 @@
 # used tech profile, can be passed via the command line too, valid values: default (=1T1GEM), 1T4GEM, 1T8GEM
 # example: -v techprofile:1T4GEM
 ${techprofile}    default
+# when voltha is running in k8s port forwarding is needed
+# example: -v PORT_FORWARDING:False
+${PORT_FORWARDING}    True
+# kafka ip e.g. ip of master host where k8s is running
+# example: -v KAFKA_IP:10.0.2.15
+${KAFKA_IP}    127.0.0.1
+# kafka port: port of kafka nodeport
+# example: -v KAFKA_PORT:30201
+${KAFKA_PORT}    30201
+# kafka service port: service port of kafka nodeport
+# example: -v KAFKA_SVC_PORT:9094
+${KAFKA_SVC_PORT}    9094
 # flag debugmode is used, if true timeout calculation various, can be passed via the command line too
 # example: -v debugmode:True
 ${debugmode}    False
@@ -79,6 +94,7 @@
     ...                Timeout has to set at least to 300s (or more)
     [Tags]    NegativeStateTestOnuGo
     [Setup]    Run Keywords    Start Logging    ONUNegativeStateTest
+    ...    AND    kafka.Records Clear
     ...    AND    Setup
     # Suite Variable will be overwritten by Teardown of Current State Test All Onus
     Set Suite Variable    ${StateTestAllONUs}    True
@@ -90,6 +106,10 @@
         Current State Test All Onus    starting-openomci    timeout=1x
         Exit For Loop If    not ${StateTestAllONUs}
     END
+    ${list_onu_device_id}    Create List
+    Build ONU Device Id List    ${list_onu_device_id}
+    Run Keyword If    ${print2console}    Log    Check for device events that indicate a failed OMCI communication.   console=yes
+    Wait Until Keyword Succeeds    ${timeout}    5s    Validate Failed OMCI Communication All ONUs    ${list_onu_device_id}
     [Teardown]    Run Keywords   Printout ONU Serial Number and Device Id    print2console=${print2console}
     ...    AND    Run Keyword If    ${logging}    Collect Logs
     ...    AND    Stop Logging    ONUStateTest
@@ -116,6 +136,19 @@
     Delete MIB Template Data    ${INFRA_NAMESPACE}
     # delete etcd onu data
     Delete ONU Go Adapter ETCD Data    namespace=${INFRA_NAMESPACE}    validate=True
+    # set ${kafka} depending on environment in case of port-forward is needed
+    ${rc}    ${kafka}=    Run Keyword If    ${PORT_FORWARDING}    Run and Return Rc and Output
+    ...    kubectl get svc -n ${INFRA_NAMESPACE} | grep kafka-0-external | awk '{print $1}'
+    Run Keyword If    ${PORT_FORWARDING}    Should Not Be Empty    ${kafka}    Service kafka-0-external not found
+    # start port forwarding if needed (when voltha runs in k8s)
+    ${portFwdHandle} =    Run Keyword If    ${PORT_FORWARDING}    Start Process
+    ...    kubectl port-forward --address 0.0.0.0 --namespace default svc/${kafka} ${KAFKA_PORT}:${KAFKA_SVC_PORT} &
+    ...    shell=true
+    Set Suite Variable   ${portFwdHandle}
+    Sleep    5s
+    # open connection to read kafka bus
+    Wait Until Keyword Succeeds     3x    5s
+    ...    kafka.Connection Open    ${KAFKA_IP}    ${KAFKA_PORT}    voltha.events    timestamp_from=0
     Run Keyword If    ${logging}    Collect Logs
     Stop Logging Setup or Teardown    Setup-${SUITE NAME}
 
@@ -128,6 +161,10 @@
     Run Keyword If    ${pausebeforecleanup}    Pause Execution    Press OK to continue with clean up!
     Run Keyword If    ${pausebeforecleanup}    Log    Teardown will be continued...    console=yes
     Run Keyword If    ${teardown_device}    Delete All Devices and Verify
+    # close connection to kafka
+    kafka.Connection Close
+    # stop port forwarding if started
+    Run Keyword If    ${PORT_FORWARDING}    Terminate Process    ${portFwdHandle}    kill=true
     Wait Until Keyword Succeeds    ${timeout}    1s    Validate Onu Data In Etcd    ${INFRA_NAMESPACE}    0    ${kvstoreprefix}
     ...    without_pm_data=False
     Wait for Ports in ONOS for all OLTs      ${ONOS_SSH_IP}    ${ONOS_SSH_PORT}  0   BBSM    ${timeout}
@@ -135,3 +172,20 @@
     Stop Logging Setup or Teardown   Teardown-${SUITE NAME}
     Close All ONOS SSH Connections
     Remove Tech Profile    ${INFRA_NAMESPACE}
+
+Validate Failed OMCI Communication All ONUs
+    [Documentation]    Validates Failed OMCI communication Events per ONU
+    [Arguments]    ${list_onu_device_id}
+    ${Kafka_Records}=    kafka.Records Get    voltha.events
+    ${RecordsLength}=    Get Length    ${Kafka_Records}
+    FOR    ${Index}    IN RANGE    0    ${RecordsLength}
+        ${metric}=    Set Variable    ${Kafka_Records[${Index}]}
+        ${message}=   Get From Dictionary  ${metric}  message
+        ${event}=     volthatools.Events Decode Event   ${message}    return_default=true
+        Continue For Loop If    not 'device_event' in ${event}
+        ${event_name}=    Get From Dictionary  ${event['device_event']}    device_event_name
+        Continue For Loop If    "${event_name}" != "ONU_OMCI_COMMUNICATION_FAILURE_CONFIG"
+        ${resource_id}=    Get From Dictionary  ${event['device_event']}    resource_id
+        Remove Values From List    ${list_onu_device_id}    ${resource_id}
+    END
+    Should Be Empty    ${list_onu_device_id}    Missing MIB Audits for ONUs ${list_onu_device_id}!