blob: 7eecf24c29b35226da69098d4f6fdd89ed5c4bb1 [file] [log] [blame]
Illyoung Choi8b097c92019-08-01 12:38:06 -07001# Copyright 2019-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import json
16from xossynchronizer.event_steps.eventstep import EventStep
17from cord_workflow_controller_client.probe import Probe
18
19
20class CORDWorkflowEventProbe(EventStep):
21 # topics = ["onu.events", "dhcp.events", "authentication.events"]
22 topics = ['*.events']
23 technology = 'kafka'
24 controller_url = 'http://controller:3030'
25 retry_conn_max = 3
26
27 def __init__(self, *args, **kwargs):
28 super(CORDWorkflowEventProbe, self).__init__(*args, **kwargs)
29
30 self.connected = False
31 self.retry = 0
32 self.connect()
33
34 def connect(self):
35 if not self.connected:
36 if self.retry > self.retry_conn_max:
37 self.log.info(
38 'Could not connect to Workflow Controller (%s)...' %
39 self.controller_url
40 )
41 self.probe = None
42 self.connected = False
43 else:
44 try:
45 self.log.info(
46 'Connecting to Workflow Controller (%s)...' %
47 self.controller_url
48 )
49
50 self.probe = Probe(logger=self.log)
51 self.probe.connect(self.controller_url)
52 self.connected = True
53 self.retry = 0
54 except Exception:
55 self.probe = None
56 self.connected = False
57 self.retry += 1
58
59 def process_event(self, event):
60 if not self.connected:
61 self.connect()
62
63 if self.connected:
64 topic = event.topic
65 # event is in json format
66 message = json.loads(event.value)
67
68 self.log.info('Emitting an event (%s - %s)...' % (topic, message))
69 self.probe.emit_event(topic, message)
70 self.log.info('Emitted an event (%s - %s)...' % (topic, message))
71 else:
72 self.log.info('Skip emitting an event (%s - %s)...' % (topic, message))