blob: 92966a78c9b6f7727bf9ba60b41dcb33753cdee2 [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Matteo Scandolod525ae32020-04-02 17:27:29 -070016package kafka
khenaidooab1f7bd2019-11-14 14:00:27 -050017
18import (
Rohan Agrawal31f21802020-06-12 05:38:46 +000019 "context"
khenaidooab1f7bd2019-11-14 14:00:27 -050020 "fmt"
Himani Chawla606a4f02021-03-23 19:45:58 +053021 "github.com/golang/protobuf/ptypes"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080022 "sync"
Scott Baker504b4802020-04-17 10:12:20 -070023 "time"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080024
Himani Chawla606a4f02021-03-23 19:45:58 +053025 "github.com/golang/protobuf/proto"
yasin sapli5458a1c2021-06-14 22:24:38 +000026 "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
27 "github.com/opencord/voltha-lib-go/v5/pkg/log"
Maninderdfadc982020-10-28 14:04:33 +053028 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
Himani Chawla606a4f02021-03-23 19:45:58 +053029 "github.com/opencord/voltha-protos/v4/go/voltha"
khenaidooab1f7bd2019-11-14 14:00:27 -050030 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/status"
khenaidooab1f7bd2019-11-14 14:00:27 -050032)
33
npujar467fe752020-01-16 20:17:45 +053034// static check to ensure KafkaClient implements kafka.Client
35var _ kafka.Client = &KafkaClient{}
36
khenaidooab1f7bd2019-11-14 14:00:27 -050037type KafkaClient struct {
38 topicsChannelMap map[string][]chan *ic.InterContainerMessage
39 lock sync.RWMutex
40}
41
42func NewKafkaClient() *KafkaClient {
43 return &KafkaClient{
44 topicsChannelMap: make(map[string][]chan *ic.InterContainerMessage),
45 lock: sync.RWMutex{},
46 }
47}
48
Rohan Agrawal31f21802020-06-12 05:38:46 +000049func (kc *KafkaClient) Start(ctx context.Context) error {
50 logger.Debug(ctx, "kafka-client-started")
khenaidooab1f7bd2019-11-14 14:00:27 -050051 return nil
52}
53
Rohan Agrawal31f21802020-06-12 05:38:46 +000054func (kc *KafkaClient) Stop(ctx context.Context) {
khenaidooab1f7bd2019-11-14 14:00:27 -050055 kc.lock.Lock()
56 defer kc.lock.Unlock()
57 for topic, chnls := range kc.topicsChannelMap {
58 for _, c := range chnls {
59 close(c)
60 }
61 delete(kc.topicsChannelMap, topic)
62 }
Rohan Agrawal31f21802020-06-12 05:38:46 +000063 logger.Debug(ctx, "kafka-client-stopped")
khenaidooab1f7bd2019-11-14 14:00:27 -050064}
65
Rohan Agrawal31f21802020-06-12 05:38:46 +000066func (kc *KafkaClient) CreateTopic(ctx context.Context, topic *kafka.Topic, numPartition int, repFactor int) error {
67 logger.Debugw(ctx, "CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
khenaidooab1f7bd2019-11-14 14:00:27 -050068 kc.lock.Lock()
69 defer kc.lock.Unlock()
70 if _, ok := kc.topicsChannelMap[topic.Name]; ok {
71 return fmt.Errorf("Topic %s already exist", topic.Name)
72 }
73 ch := make(chan *ic.InterContainerMessage)
74 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
75 return nil
76}
77
Rohan Agrawal31f21802020-06-12 05:38:46 +000078func (kc *KafkaClient) DeleteTopic(ctx context.Context, topic *kafka.Topic) error {
79 logger.Debugw(ctx, "DeleteTopic", log.Fields{"topic": topic.Name})
khenaidooab1f7bd2019-11-14 14:00:27 -050080 kc.lock.Lock()
81 defer kc.lock.Unlock()
82 delete(kc.topicsChannelMap, topic.Name)
83 return nil
84}
85
Rohan Agrawal31f21802020-06-12 05:38:46 +000086func (kc *KafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) {
87 logger.Debugw(ctx, "Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
khenaidooab1f7bd2019-11-14 14:00:27 -050088 kc.lock.Lock()
89 defer kc.lock.Unlock()
90 ch := make(chan *ic.InterContainerMessage)
91 kc.topicsChannelMap[topic.Name] = append(kc.topicsChannelMap[topic.Name], ch)
92 return ch, nil
93}
94
95func removeChannel(s []chan *ic.InterContainerMessage, i int) []chan *ic.InterContainerMessage {
96 s[i] = s[len(s)-1]
97 return s[:len(s)-1]
98}
99
Rohan Agrawal31f21802020-06-12 05:38:46 +0000100func (kc *KafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
101 logger.Debugw(ctx, "UnSubscribe", log.Fields{"topic": topic.Name})
khenaidooab1f7bd2019-11-14 14:00:27 -0500102 kc.lock.Lock()
103 defer kc.lock.Unlock()
104 if chnls, ok := kc.topicsChannelMap[topic.Name]; ok {
105 idx := -1
106 for i, c := range chnls {
107 if c == ch {
108 close(c)
109 idx = i
110 }
111 }
112 if idx >= 0 {
113 kc.topicsChannelMap[topic.Name] = removeChannel(kc.topicsChannelMap[topic.Name], idx)
114 }
115 }
116 return nil
117}
118
Rohan Agrawal31f21802020-06-12 05:38:46 +0000119func (kc *KafkaClient) SubscribeForMetadata(ctx context.Context, _ func(fromTopic string, timestamp time.Time)) {
120 logger.Debug(ctx, "SubscribeForMetadata - unimplemented")
npujar467fe752020-01-16 20:17:45 +0530121}
122
Himani Chawla606a4f02021-03-23 19:45:58 +0530123func toIntercontainerMessage(event *voltha.Event) *ic.InterContainerMessage {
124 msg := &ic.InterContainerMessage{
125 Header: &ic.Header{
126 Id: event.Header.Id,
127 Type: ic.MessageType_REQUEST,
128 Timestamp: event.Header.RaisedTs,
129 },
130 }
131 // Marshal event
132 if eventBody, err := ptypes.MarshalAny(event); err == nil {
133 msg.Body = eventBody
134 }
135 return msg
136}
137
Rohan Agrawal31f21802020-06-12 05:38:46 +0000138func (kc *KafkaClient) Send(ctx context.Context, msg interface{}, topic *kafka.Topic, keys ...string) error {
Himani Chawla606a4f02021-03-23 19:45:58 +0530139 // Assert message is a proto message
140 // ascertain the value interface type is a proto.Message
141 if _, ok := msg.(proto.Message); !ok {
142 logger.Warnw(ctx, "message-not-a-proto-message", log.Fields{"msg": msg})
143 return status.Error(codes.InvalidArgument, "msg-not-a-proto-msg")
144 }
khenaidooab1f7bd2019-11-14 14:00:27 -0500145 req, ok := msg.(*ic.InterContainerMessage)
146 if !ok {
Himani Chawla606a4f02021-03-23 19:45:58 +0530147 event, ok := msg.(*voltha.Event) //This is required as event message will be of type voltha.Event
148 if !ok {
149 return status.Error(codes.InvalidArgument, "unexpected-message-type")
150 }
151 req = toIntercontainerMessage(event)
khenaidooab1f7bd2019-11-14 14:00:27 -0500152 }
153 if req == nil {
154 return status.Error(codes.InvalidArgument, "msg-nil")
155 }
156 kc.lock.RLock()
157 defer kc.lock.RUnlock()
158 for _, ch := range kc.topicsChannelMap[topic.Name] {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000159 logger.Debugw(ctx, "Publishing", log.Fields{"fromTopic": req.Header.FromTopic, "toTopic": topic.Name, "id": req.Header.Id})
khenaidooab1f7bd2019-11-14 14:00:27 -0500160 ch <- req
161 }
162 return nil
163}
164
Rohan Agrawal31f21802020-06-12 05:38:46 +0000165func (kc *KafkaClient) SendLiveness(ctx context.Context) error {
khenaidooab1f7bd2019-11-14 14:00:27 -0500166 return status.Error(codes.Unimplemented, "SendLiveness")
167}
168
Rohan Agrawal31f21802020-06-12 05:38:46 +0000169func (kc *KafkaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
170 logger.Debug(ctx, "EnableLivenessChannel - unimplemented")
khenaidooab1f7bd2019-11-14 14:00:27 -0500171 return nil
172}
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800173
Rohan Agrawal31f21802020-06-12 05:38:46 +0000174func (kc *KafkaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
175 logger.Debug(ctx, "EnableHealthinessChannel - unimplemented")
serkant.uluderya2ae470f2020-01-21 11:13:09 -0800176 return nil
177}