blob: a833d589723a53cf1dbc48ce1db6b1a6b7a01526 [file] [log] [blame]
Stephane Barbarie4a2564d2018-07-26 11:02:58 -04001package model
2
3import (
4 "encoding/json"
5 "fmt"
6 "github.com/opencord/voltha/protos/go/voltha"
7)
8
9type EventBus struct {
10 client *EventBusClient
11 topic string
12}
13
14var (
15 IGNORED_CALLBACKS = map[CallbackType]struct{}{
16 PRE_ADD: {},
17 GET: {},
18 POST_LISTCHANGE: {},
19 PRE_REMOVE: {},
20 PRE_UPDATE: {},
21 }
22)
23
24func NewEventBus() *EventBus {
25 bus := &EventBus{
26 client: NewEventBusClient(),
27 topic: "model-change-events",
28 }
29 return bus
30}
31
32func (bus *EventBus) Advertise(eventType CallbackType, data interface{}, hash string) {
33 if _, ok := IGNORED_CALLBACKS[eventType]; ok {
34 fmt.Printf("ignoring event - type:%s, data:%+v\n", eventType, data)
35 }
36 var kind voltha.ConfigEventType_ConfigEventType
37 switch eventType {
38 case POST_ADD:
39 kind = voltha.ConfigEventType_add
40 case POST_REMOVE:
41 kind = voltha.ConfigEventType_remove
42 default:
43 kind = voltha.ConfigEventType_update
44 }
45
46 var msg []byte
47 var err error
48 if IsProtoMessage(data) {
49 if msg, err = json.Marshal(data); err != nil {
50 fmt.Errorf("problem marshalling data: %+v, err:%s\n", data, err.Error())
51 }
52 } else {
53 msg = data.([]byte)
54 }
55
56 event := voltha.ConfigEvent{
57 Type: kind,
58 Hash: hash,
59 Data: string(msg),
60 }
61
62 bus.client.Publish(bus.topic, event)
63}