Implement probe prototype
Change-Id: If30bd80701129d9fa892df0b1d37214a7b6c7a33
diff --git a/xos/synchronizer/__init__.py b/xos/synchronizer/__init__.py
new file mode 100644
index 0000000..19d1424
--- /dev/null
+++ b/xos/synchronizer/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/xos/synchronizer/config.yaml b/xos/synchronizer/config.yaml
new file mode 100644
index 0000000..4fd1440
--- /dev/null
+++ b/xos/synchronizer/config.yaml
@@ -0,0 +1,40 @@
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+name: cord-workflow-probe
+core_version: ">=2.2.1"
+required_models:
+ - RCORDSubscriber
+ - ONUDevice
+model_policies_dir: "/opt/xos/synchronizers/cord-workflow-probe/model_policies"
+models_dir: "/opt/xos/synchronizers/cord-workflow-probe/models"
+event_steps_dir: "/opt/xos/synchronizers/cord-workflow-probe/event_steps"
+logging:
+ version: 1
+ handlers:
+ console:
+ class: logging.StreamHandler
+ file:
+ class: logging.handlers.RotatingFileHandler
+ filename: /var/log/xos.log
+ maxBytes: 10485760
+ backupCount: 5
+ loggers:
+ 'multistructlog':
+ handlers:
+ - console
+ - file
+ level: DEBUG
\ No newline at end of file
diff --git a/xos/synchronizer/cord-workflow-probe-synchronizer.py b/xos/synchronizer/cord-workflow-probe-synchronizer.py
new file mode 100644
index 0000000..0b90e04
--- /dev/null
+++ b/xos/synchronizer/cord-workflow-probe-synchronizer.py
@@ -0,0 +1,32 @@
+#!/usr/bin/env python
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This imports and runs ../../xos-observer.py
+
+import os
+from xossynchronizer import Synchronizer
+from xosconfig import Config
+
+
+base_config_file = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/config.yaml')
+mounted_config_file = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/mounted_config.yaml')
+
+if os.path.isfile(mounted_config_file):
+ Config.init(base_config_file, 'synchronizer-config-schema.yaml', mounted_config_file)
+else:
+ Config.init(base_config_file, 'synchronizer-config-schema.yaml')
+
+Synchronizer().run()
diff --git a/xos/synchronizer/event_steps/__init__.py b/xos/synchronizer/event_steps/__init__.py
new file mode 100644
index 0000000..19d1424
--- /dev/null
+++ b/xos/synchronizer/event_steps/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/xos/synchronizer/event_steps/cord_workflow_event_probe.py b/xos/synchronizer/event_steps/cord_workflow_event_probe.py
new file mode 100644
index 0000000..7eecf24
--- /dev/null
+++ b/xos/synchronizer/event_steps/cord_workflow_event_probe.py
@@ -0,0 +1,72 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from xossynchronizer.event_steps.eventstep import EventStep
+from cord_workflow_controller_client.probe import Probe
+
+
+class CORDWorkflowEventProbe(EventStep):
+ # topics = ["onu.events", "dhcp.events", "authentication.events"]
+ topics = ['*.events']
+ technology = 'kafka'
+ controller_url = 'http://controller:3030'
+ retry_conn_max = 3
+
+ def __init__(self, *args, **kwargs):
+ super(CORDWorkflowEventProbe, self).__init__(*args, **kwargs)
+
+ self.connected = False
+ self.retry = 0
+ self.connect()
+
+ def connect(self):
+ if not self.connected:
+ if self.retry > self.retry_conn_max:
+ self.log.info(
+ 'Could not connect to Workflow Controller (%s)...' %
+ self.controller_url
+ )
+ self.probe = None
+ self.connected = False
+ else:
+ try:
+ self.log.info(
+ 'Connecting to Workflow Controller (%s)...' %
+ self.controller_url
+ )
+
+ self.probe = Probe(logger=self.log)
+ self.probe.connect(self.controller_url)
+ self.connected = True
+ self.retry = 0
+ except Exception:
+ self.probe = None
+ self.connected = False
+ self.retry += 1
+
+ def process_event(self, event):
+ if not self.connected:
+ self.connect()
+
+ if self.connected:
+ topic = event.topic
+ # event is in json format
+ message = json.loads(event.value)
+
+ self.log.info('Emitting an event (%s - %s)...' % (topic, message))
+ self.probe.emit_event(topic, message)
+ self.log.info('Emitted an event (%s - %s)...' % (topic, message))
+ else:
+ self.log.info('Skip emitting an event (%s - %s)...' % (topic, message))
diff --git a/xos/synchronizer/migrations/__init__.py b/xos/synchronizer/migrations/__init__.py
new file mode 100644
index 0000000..19d1424
--- /dev/null
+++ b/xos/synchronizer/migrations/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/xos/synchronizer/model_policies/__init__.py b/xos/synchronizer/model_policies/__init__.py
new file mode 100644
index 0000000..19d1424
--- /dev/null
+++ b/xos/synchronizer/model_policies/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/xos/synchronizer/model_policies/cord_workflow_model_event_probe.py b/xos/synchronizer/model_policies/cord_workflow_model_event_probe.py
new file mode 100644
index 0000000..06665de
--- /dev/null
+++ b/xos/synchronizer/model_policies/cord_workflow_model_event_probe.py
@@ -0,0 +1,96 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from xossynchronizer.model_policies.policy import Policy
+from cord_workflow_controller_client.probe import Probe
+import os
+import sys
+
+sync_path = os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
+sys.path.append(sync_path)
+
+controller_url = 'http://controller:3030'
+probe = None
+retry_conn_max = 3
+connected = False
+retry = 0
+
+
+def connect():
+ if not connected:
+ global probe, connected, retry
+ if retry > retry_conn_max:
+ probe = None
+ connected = False
+ else:
+ try:
+ probe = Probe()
+ probe.connect(controller_url)
+ connected = True
+ retry = 0
+ except Exception:
+ probe = None
+ connected = False
+ retry += 1
+
+
+def emit_helper(instance, event_type):
+ topic = 'datamodel.%s' % instance.model_name
+ message = {
+ 'event_type': event_type
+ }
+
+ if not connected:
+ connect()
+
+ if connected:
+ if 'log' in instance:
+ instance.log.info('Emitting an event (%s - %s)...' % (topic, message))
+
+ if probe:
+ probe.emit_event(topic, message)
+
+ if 'log' in instance:
+ instance.log.info('Emitted an event (%s - %s)...' % (topic, message))
+ else:
+ if 'log' in instance:
+ instance.log.info('Skip emitting an event (%s - %s)...' % (topic, message))
+
+
+class CORDWorkflowModelEventProbe_ATTSI(Policy):
+ # TODO: NEED TO ALLOW MULTI-SUBSCRIPTION OF MODEL UPDATE EVENTS AT A LOWER LEVEL
+ # TO ELIMINATE CREATION OF A CLASS PER MODEL
+ model_name = "AttWorkflowDriverServiceInstance"
+
+ def handle_create(self, instance):
+ emit_helper(self, 'create')
+
+ def handle_update(self, instance):
+ emit_helper(self, 'update')
+
+ def handle_delete(self, instance):
+ emit_helper(self, 'delete')
+
+
+class CORDWorkflowModelEventProbe_ATTWL(Policy):
+ model_name = "AttWorkflowDriverWhiteListEntry"
+
+ def handle_create(self, instance):
+ emit_helper(self, 'create')
+
+ def handle_update(self, instance):
+ emit_helper(self, 'update')
+
+ def handle_delete(self, instance):
+ emit_helper(self, 'delete')
diff --git a/xos/synchronizer/test_config.yaml b/xos/synchronizer/test_config.yaml
new file mode 100644
index 0000000..c6263fe
--- /dev/null
+++ b/xos/synchronizer/test_config.yaml
@@ -0,0 +1,30 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+name: test-cord-workflow-probe
+accessor:
+ username: xosadmin@opencord.org
+ password: "sample"
+ kind: "testframework"
+logging:
+ version: 1
+ handlers:
+ console:
+ class: logging.StreamHandler
+ loggers:
+ 'multistructlog':
+ handlers:
+ - console
+# level: DEBUG
\ No newline at end of file
diff --git a/xos/unittest.cfg b/xos/unittest.cfg
new file mode 100644
index 0000000..4c43fef
--- /dev/null
+++ b/xos/unittest.cfg
@@ -0,0 +1,12 @@
+[unittest]
+plugins=nose2.plugins.junitxml
+code-directories=synchronizer
+ model_policies
+ steps
+ pull_steps
+ event_steps
+
+[coverage]
+always-on = True
+coverage = synchronizer
+coverage-report = term, html, xml
\ No newline at end of file