CORD-3077 Document EventStep and Kafka integration
Change-Id: Ie48bf0242e7a1d782698a497e8a35226aabfc56a
diff --git a/docs/dev/synchronizers.md b/docs/dev/synchronizers.md
index 662babd..e8bf768 100644
--- a/docs/dev/synchronizers.md
+++ b/docs/dev/synchronizers.md
@@ -203,3 +203,80 @@
o.no_sync = True # this is required to prevent the synchronizer to be invoked and start a loop
o.save()
```
+
+### Event Steps
+
+Event Steps are similar to pull steps in that they are often used to implement a flow of information from the environment into the data model. However, rather than using polling, event steps rely on externally generated events delivered via an event bus, such as Kafka.
+
+> NOTE: You'll need to add this folder in your synchronizer configuration file
+> as:
+>
+> ```yaml
+> event_steps_dir: "/opt/xos/synchronizers/<synchronizer_name>/event_steps"
+> ```
+>
+> You'll also need to make sure the event bus endpoint is specified in the
+> synchronizer config file. For example:
+>
+> ```yaml
+> event_bus:
+> endpoint: cord-kafka-kafka
+> kind: kafka
+>```
+
+An event step inherits from the `EventStep` class:
+
+```python
+
+import json
+from synchronizers.new_base.eventstep import EventStep
+from synchronizers.new_base.modelaccessor import MyModel
+from xosconfig import Config
+from multistructlog import create_logger
+
+log = create_logger(Config().get('logging'))
+
+class MyModelEventStep(EventStep):
+ technology = "kafka"
+ topics = ["MyTopic"]
+
+ def __init__(self, *args, **kwargs):
+ super(MyEventStep, self).__init__(*args, **kwargs)
+```
+
+Two important class members that are defined in each event step are `technology` and `topics`. `technology` tells what type of event bus to use. There's currently only one bus interface implemented by the synchronizer framework, and that is `kafka`. The `topics` member is a list of topics that will be listened on for events. The precise meaning of `topics` is left to the particular event bus technology that is in use.
+
+Service-specific logic is implemented by overriding the `process_event()` method:
+
+```python
+
+ def process_event(self, event):
+ value = json.loads(event.value)
+ first_name = value["first_name"]
+ last_name = value["last_name"]
+
+ # See if the object already exists
+ objs = MyModel.filter(first_name=first_name, last_name=last_name)
+ if objs:
+ return
+
+ # Create a new object
+ obj = MyModel()
+ obj.first_name = first_name
+ obj.last_name = last_name
+ obj.save(always_update_timestamp = True)
+```
+
+In this example we've made the assumption that the value of an event is a json-encoded dictionary containing the keys `first_name` and `last_name`. The event step in this case checks to see if an object with those fields already exists, and if not then it creates the object.
+
+In this example, we've differed from the Pull Step example in that we omitted `no_sync=True` and we added `always_update_timestamp = True` to the `save()` call. This has the effect of causing the synchronizer to excute any sync steps that might exist for `MyModel`. Whether or not you want sync_steps to run is an implementation decision and depends upon the design of your synchronizer.
+
+Sending an event to Kafka can be done using a variety of Kafka clients for various languages, Kafka command-line tools, etc. A python example is as follows:
+
+```python
+import json
+from kafka import KafkaProducer
+producer = KafkaProducer(bootstrap_servers="cord-kafka-kafka")
+producer.send("MyTopic", json.dumps({"first_name": "John", "last_name": "Doe"}))
+producer.flush()
+```
\ No newline at end of file