blob: 10c692aaf6ec88cb8b37ec2f40492919f7feb900 [file] [log] [blame]
khenaidoo43c82122018-11-22 18:38:28 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "errors"
20 "fmt"
khenaidoo43c82122018-11-22 18:38:28 -050021 scc "github.com/bsm/sarama-cluster"
22 "github.com/golang/protobuf/proto"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050023 "github.com/google/uuid"
khenaidoo43c82122018-11-22 18:38:28 -050024 "github.com/opencord/voltha-go/common/log"
khenaidoo79232702018-12-04 11:00:41 -050025 ic "github.com/opencord/voltha-go/protos/inter_container"
khenaidoo4c1a5bf2018-11-29 15:53:42 -050026 "gopkg.in/Shopify/sarama.v1"
27 "strings"
khenaidoo43c82122018-11-22 18:38:28 -050028 "sync"
29 "time"
30)
31
khenaidoo4c1a5bf2018-11-29 15:53:42 -050032func init() {
khenaidooca301322019-01-09 23:06:32 -050033 log.AddPackage(log.JSON, log.DebugLevel, nil)
khenaidoo4c1a5bf2018-11-29 15:53:42 -050034}
35
36type returnErrorFunction func() error
37
38// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
39// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
40//consumer or a group consumer
khenaidoo43c82122018-11-22 18:38:28 -050041type consumerChannels struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050042 consumers []interface{}
khenaidoo79232702018-12-04 11:00:41 -050043 channels []chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -050044}
45
46// SaramaClient represents the messaging proxy
47type SaramaClient struct {
khenaidoo4c1a5bf2018-11-29 15:53:42 -050048 cAdmin sarama.ClusterAdmin
khenaidoo43c82122018-11-22 18:38:28 -050049 client sarama.Client
50 KafkaHost string
51 KafkaPort int
52 producer sarama.AsyncProducer
53 consumer sarama.Consumer
khenaidooca301322019-01-09 23:06:32 -050054 groupConsumers map[string]*scc.Consumer
55 consumerGroupPrefix string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050056 consumerType int
khenaidooca301322019-01-09 23:06:32 -050057 consumerGroupName string
khenaidoo4c1a5bf2018-11-29 15:53:42 -050058 producerFlushFrequency int
59 producerFlushMessages int
60 producerFlushMaxmessages int
61 producerRetryMax int
62 producerRetryBackOff time.Duration
63 producerReturnSuccess bool
64 producerReturnErrors bool
65 consumerMaxwait int
66 maxProcessingTime int
67 numPartitions int
68 numReplicas int
69 autoCreateTopic bool
khenaidoo43c82122018-11-22 18:38:28 -050070 doneCh chan int
71 topicToConsumerChannelMap map[string]*consumerChannels
72 lockTopicToConsumerChannelMap sync.RWMutex
khenaidood2b6df92018-12-13 16:37:20 -050073 topicLockMap map[string]*sync.RWMutex
74 lockOfTopicLockMap sync.RWMutex
khenaidoo43c82122018-11-22 18:38:28 -050075}
76
77type SaramaClientOption func(*SaramaClient)
78
79func Host(host string) SaramaClientOption {
80 return func(args *SaramaClient) {
81 args.KafkaHost = host
82 }
83}
84
85func Port(port int) SaramaClientOption {
86 return func(args *SaramaClient) {
87 args.KafkaPort = port
88 }
89}
90
khenaidooca301322019-01-09 23:06:32 -050091func ConsumerGroupPrefix(prefix string) SaramaClientOption {
92 return func(args *SaramaClient) {
93 args.consumerGroupPrefix = prefix
94 }
95}
96
97func ConsumerGroupName(name string) SaramaClientOption {
98 return func(args *SaramaClient) {
99 args.consumerGroupName = name
100 }
101}
102
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500103func ConsumerType(consumer int) SaramaClientOption {
104 return func(args *SaramaClient) {
105 args.consumerType = consumer
106 }
107}
108
109func ProducerFlushFrequency(frequency int) SaramaClientOption {
110 return func(args *SaramaClient) {
111 args.producerFlushFrequency = frequency
112 }
113}
114
115func ProducerFlushMessages(num int) SaramaClientOption {
116 return func(args *SaramaClient) {
117 args.producerFlushMessages = num
118 }
119}
120
121func ProducerFlushMaxMessages(num int) SaramaClientOption {
122 return func(args *SaramaClient) {
123 args.producerFlushMaxmessages = num
124 }
125}
126
khenaidoo90847922018-12-03 14:47:51 -0500127func ProducerMaxRetries(num int) SaramaClientOption {
128 return func(args *SaramaClient) {
129 args.producerRetryMax = num
130 }
131}
132
133func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
134 return func(args *SaramaClient) {
135 args.producerRetryBackOff = duration
136 }
137}
138
139func ProducerReturnOnErrors(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500140 return func(args *SaramaClient) {
141 args.producerReturnErrors = opt
142 }
143}
144
khenaidoo90847922018-12-03 14:47:51 -0500145func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500146 return func(args *SaramaClient) {
147 args.producerReturnSuccess = opt
148 }
149}
150
151func ConsumerMaxWait(wait int) SaramaClientOption {
152 return func(args *SaramaClient) {
153 args.consumerMaxwait = wait
154 }
155}
156
157func MaxProcessingTime(pTime int) SaramaClientOption {
158 return func(args *SaramaClient) {
159 args.maxProcessingTime = pTime
160 }
161}
162
163func NumPartitions(number int) SaramaClientOption {
164 return func(args *SaramaClient) {
165 args.numPartitions = number
166 }
167}
168
169func NumReplicas(number int) SaramaClientOption {
170 return func(args *SaramaClient) {
171 args.numReplicas = number
172 }
173}
174
175func AutoCreateTopic(opt bool) SaramaClientOption {
176 return func(args *SaramaClient) {
177 args.autoCreateTopic = opt
178 }
179}
180
khenaidoo43c82122018-11-22 18:38:28 -0500181func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
182 client := &SaramaClient{
183 KafkaHost: DefaultKafkaHost,
184 KafkaPort: DefaultKafkaPort,
185 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500186 client.consumerType = DefaultConsumerType
187 client.producerFlushFrequency = DefaultProducerFlushFrequency
188 client.producerFlushMessages = DefaultProducerFlushMessages
189 client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
190 client.producerReturnErrors = DefaultProducerReturnErrors
191 client.producerReturnSuccess = DefaultProducerReturnSuccess
192 client.producerRetryMax = DefaultProducerRetryMax
193 client.producerRetryBackOff = DefaultProducerRetryBackoff
194 client.consumerMaxwait = DefaultConsumerMaxwait
195 client.maxProcessingTime = DefaultMaxProcessingTime
196 client.numPartitions = DefaultNumberPartitions
197 client.numReplicas = DefaultNumberReplicas
198 client.autoCreateTopic = DefaultAutoCreateTopic
khenaidoo43c82122018-11-22 18:38:28 -0500199
200 for _, option := range opts {
201 option(client)
202 }
203
khenaidooca301322019-01-09 23:06:32 -0500204 client.groupConsumers = make(map[string]*scc.Consumer)
205
khenaidoo43c82122018-11-22 18:38:28 -0500206 client.lockTopicToConsumerChannelMap = sync.RWMutex{}
khenaidood2b6df92018-12-13 16:37:20 -0500207 client.topicLockMap = make(map[string]*sync.RWMutex)
208 client.lockOfTopicLockMap = sync.RWMutex{}
khenaidoo43c82122018-11-22 18:38:28 -0500209 return client
210}
211
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500212func (sc *SaramaClient) Start() error {
213 log.Info("Starting-kafka-sarama-client")
khenaidoo43c82122018-11-22 18:38:28 -0500214
215 // Create the Done channel
216 sc.doneCh = make(chan int, 1)
217
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500218 var err error
219
220 // Create the Cluster Admin
221 if err = sc.createClusterAdmin(); err != nil {
222 log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
223 return err
224 }
225
khenaidoo43c82122018-11-22 18:38:28 -0500226 // Create the Publisher
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500227 if err := sc.createPublisher(); err != nil {
khenaidoo43c82122018-11-22 18:38:28 -0500228 log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
229 return err
230 }
231
khenaidooca301322019-01-09 23:06:32 -0500232 if sc.consumerType == DefaultConsumerType {
233 // Create the master consumers
234 if err := sc.createConsumer(); err != nil {
235 log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
236 return err
237 }
khenaidoo43c82122018-11-22 18:38:28 -0500238 }
239
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500240 // Create the topic to consumers/channel map
khenaidoo43c82122018-11-22 18:38:28 -0500241 sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
242
khenaidooca301322019-01-09 23:06:32 -0500243 log.Info("kafka-sarama-client-started")
244
khenaidoo43c82122018-11-22 18:38:28 -0500245 return nil
246}
247
248func (sc *SaramaClient) Stop() {
249 log.Info("stopping-sarama-client")
250
251 //Send a message over the done channel to close all long running routines
252 sc.doneCh <- 1
253
khenaidoo43c82122018-11-22 18:38:28 -0500254 if sc.producer != nil {
255 if err := sc.producer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500256 log.Errorw("closing-producer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500257 }
258 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500259
khenaidoo43c82122018-11-22 18:38:28 -0500260 if sc.consumer != nil {
261 if err := sc.consumer.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500262 log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
khenaidoo43c82122018-11-22 18:38:28 -0500263 }
264 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500265
khenaidooca301322019-01-09 23:06:32 -0500266 for key, val := range sc.groupConsumers {
267 log.Debugw("closing-group-consumer", log.Fields{"topic": key})
268 if err := val.Close(); err != nil {
269 log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500270 }
271 }
272
273 if sc.cAdmin != nil {
274 if err := sc.cAdmin.Close(); err != nil {
khenaidooca301322019-01-09 23:06:32 -0500275 log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500276 }
277 }
278
279 //TODO: Clear the consumers map
khenaidooca301322019-01-09 23:06:32 -0500280 //sc.clearConsumerChannelMap()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500281
282 log.Info("sarama-client-stopped")
283}
284
khenaidooca301322019-01-09 23:06:32 -0500285//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
286// the invoking function must hold the lock
287func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500288 // Set the topic details
289 topicDetail := &sarama.TopicDetail{}
290 topicDetail.NumPartitions = int32(numPartition)
291 topicDetail.ReplicationFactor = int16(repFactor)
292 topicDetail.ConfigEntries = make(map[string]*string)
293 topicDetails := make(map[string]*sarama.TopicDetail)
294 topicDetails[topic.Name] = topicDetail
295
296 if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
297 if err == sarama.ErrTopicAlreadyExists {
298 // Not an error
299 log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
300 return nil
301 }
302 log.Errorw("create-topic-failure", log.Fields{"error": err})
303 return err
304 }
305 // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
306 // do so.
307 log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
308 return nil
309}
310
khenaidooca301322019-01-09 23:06:32 -0500311//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
312// ensure no two go routines are performing operations on the same topic
313func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
314 sc.lockTopic(topic)
315 defer sc.unLockTopic(topic)
316
317 return sc.createTopic(topic, numPartition, repFactor)
318}
319
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500320//DeleteTopic removes a topic from the kafka Broker
321func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
khenaidood2b6df92018-12-13 16:37:20 -0500322 sc.lockTopic(topic)
323 defer sc.unLockTopic(topic)
324
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500325 // Remove the topic from the broker
326 if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
327 if err == sarama.ErrUnknownTopicOrPartition {
328 // Not an error as does not exist
329 log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
330 return nil
331 }
332 log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
333 return err
334 }
335
336 // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
337 if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
338 log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
339 return err
340 }
341 return nil
342}
343
344// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
345// messages from that topic
khenaidooca301322019-01-09 23:06:32 -0500346func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
khenaidood2b6df92018-12-13 16:37:20 -0500347 sc.lockTopic(topic)
348 defer sc.unLockTopic(topic)
349
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500350 log.Debugw("subscribe", log.Fields{"topic": topic.Name})
351
352 // If a consumers already exist for that topic then resuse it
353 if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
354 log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
355 // Create a channel specific for that consumers and add it to the consumers channel map
khenaidoo79232702018-12-04 11:00:41 -0500356 ch := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500357 sc.addChannelToConsumerChannelMap(topic, ch)
358 return ch, nil
359 }
360
361 // Register for the topic and set it up
khenaidoo79232702018-12-04 11:00:41 -0500362 var consumerListeningChannel chan *ic.InterContainerMessage
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500363 var err error
364
365 // Use the consumerType option to figure out the type of consumer to launch
366 if sc.consumerType == PartitionConsumer {
367 if sc.autoCreateTopic {
khenaidooca301322019-01-09 23:06:32 -0500368 if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500369 log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
370 return nil, err
371 }
372 }
373 if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, sarama.OffsetNewest); err != nil {
374 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
375 return nil, err
376 }
377 } else if sc.consumerType == GroupCustomer {
378 // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
379 // does not consume from a precreated topic in some scenarios
khenaidooca301322019-01-09 23:06:32 -0500380 //if sc.autoCreateTopic {
381 // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
382 // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
383 // return nil, err
384 // }
385 //}
386 //groupId := sc.consumerGroupName
387 groupId := getGroupId(kvArgs...)
388 // Include the group prefix
389 if groupId != "" {
390 groupId = sc.consumerGroupPrefix + groupId
391 } else {
392 // Need to use a unique group Id per topic
393 groupId = sc.consumerGroupPrefix + topic.Name
394 }
395 if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId); err != nil {
396 log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500397 return nil, err
398 }
khenaidooca301322019-01-09 23:06:32 -0500399
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500400 } else {
401 log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
402 return nil, errors.New("unknown-consumer-type")
403 }
404
405 return consumerListeningChannel, nil
406}
407
408//UnSubscribe unsubscribe a consumer from a given topic
khenaidoo79232702018-12-04 11:00:41 -0500409func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidood2b6df92018-12-13 16:37:20 -0500410 sc.lockTopic(topic)
411 defer sc.unLockTopic(topic)
412
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500413 log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
414 err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
415 return err
416}
417
418// send formats and sends the request onto the kafka messaging bus.
419func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
420
421 // Assert message is a proto message
422 var protoMsg proto.Message
423 var ok bool
424 // ascertain the value interface type is a proto.Message
425 if protoMsg, ok = msg.(proto.Message); !ok {
426 log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
427 return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
428 }
429
430 var marshalled []byte
431 var err error
432 // Create the Sarama producer message
433 if marshalled, err = proto.Marshal(protoMsg); err != nil {
434 log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
435 return err
436 }
437 key := ""
438 if len(keys) > 0 {
439 key = keys[0] // Only the first key is relevant
440 }
441 kafkaMsg := &sarama.ProducerMessage{
442 Topic: topic.Name,
443 Key: sarama.StringEncoder(key),
444 Value: sarama.ByteEncoder(marshalled),
445 }
446
447 // Send message to kafka
448 sc.producer.Input() <- kafkaMsg
khenaidoo90847922018-12-03 14:47:51 -0500449
450 // Wait for result
451 // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
452 select {
453 case ok := <-sc.producer.Successes():
khenaidoo79232702018-12-04 11:00:41 -0500454 log.Debugw("message-sent", log.Fields{"status": ok})
khenaidoo90847922018-12-03 14:47:51 -0500455 case notOk := <-sc.producer.Errors():
khenaidoo79232702018-12-04 11:00:41 -0500456 log.Debugw("error-sending", log.Fields{"status": notOk})
khenaidoo90847922018-12-03 14:47:51 -0500457 return notOk
458 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500459 return nil
460}
461
khenaidooca301322019-01-09 23:06:32 -0500462// getGroupId returns the group id from the key-value args.
463func getGroupId(kvArgs ...*KVArg) string {
464 for _, arg := range kvArgs {
465 if arg.Key == GroupIdKey {
466 return arg.Value.(string)
467 }
468 }
469 return ""
470}
471
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500472func (sc *SaramaClient) createClusterAdmin() error {
473 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
474 config := sarama.NewConfig()
475 config.Version = sarama.V1_0_0_0
476
477 // Create a cluster Admin
478 var cAdmin sarama.ClusterAdmin
479 var err error
480 if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
481 log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
482 return err
483 }
484 sc.cAdmin = cAdmin
485 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500486}
487
khenaidood2b6df92018-12-13 16:37:20 -0500488func (sc *SaramaClient) lockTopic(topic *Topic) {
489 sc.lockOfTopicLockMap.Lock()
490 if _, exist := sc.topicLockMap[topic.Name]; exist {
491 sc.lockOfTopicLockMap.Unlock()
492 sc.topicLockMap[topic.Name].Lock()
493 } else {
494 sc.topicLockMap[topic.Name] = &sync.RWMutex{}
495 sc.lockOfTopicLockMap.Unlock()
496 sc.topicLockMap[topic.Name].Lock()
497 }
498}
499
500func (sc *SaramaClient) unLockTopic(topic *Topic) {
501 sc.lockOfTopicLockMap.Lock()
502 defer sc.lockOfTopicLockMap.Unlock()
503 if _, exist := sc.topicLockMap[topic.Name]; exist {
504 sc.topicLockMap[topic.Name].Unlock()
505 }
506}
507
khenaidoo43c82122018-11-22 18:38:28 -0500508func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
509 sc.lockTopicToConsumerChannelMap.Lock()
510 defer sc.lockTopicToConsumerChannelMap.Unlock()
511 if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
512 sc.topicToConsumerChannelMap[id] = arg
513 }
514}
515
516func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
517 sc.lockTopicToConsumerChannelMap.Lock()
518 defer sc.lockTopicToConsumerChannelMap.Unlock()
519 if _, exist := sc.topicToConsumerChannelMap[id]; exist {
520 delete(sc.topicToConsumerChannelMap, id)
521 }
522}
523
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500524func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
khenaidoo43c82122018-11-22 18:38:28 -0500525 sc.lockTopicToConsumerChannelMap.Lock()
526 defer sc.lockTopicToConsumerChannelMap.Unlock()
527
528 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
529 return consumerCh
530 }
531 return nil
532}
533
khenaidoo79232702018-12-04 11:00:41 -0500534func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500535 sc.lockTopicToConsumerChannelMap.Lock()
536 defer sc.lockTopicToConsumerChannelMap.Unlock()
537 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
538 consumerCh.channels = append(consumerCh.channels, ch)
539 return
540 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500541 log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
542}
543
544//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
545func closeConsumers(consumers []interface{}) error {
546 var err error
547 for _, consumer := range consumers {
548 // Is it a partition consumers?
549 if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
550 if errTemp := partionConsumer.Close(); errTemp != nil {
551 log.Debugw("partition!!!", log.Fields{"err": errTemp})
552 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
553 // This can occur on race condition
554 err = nil
555 } else {
556 err = errTemp
557 }
558 }
559 } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
560 if errTemp := groupConsumer.Close(); errTemp != nil {
561 if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
562 // This can occur on race condition
563 err = nil
564 } else {
565 err = errTemp
566 }
567 }
568 }
569 }
570 return err
khenaidoo43c82122018-11-22 18:38:28 -0500571}
572
khenaidoo79232702018-12-04 11:00:41 -0500573func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
khenaidoo43c82122018-11-22 18:38:28 -0500574 sc.lockTopicToConsumerChannelMap.Lock()
575 defer sc.lockTopicToConsumerChannelMap.Unlock()
576 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
577 // Channel will be closed in the removeChannel method
578 consumerCh.channels = removeChannel(consumerCh.channels, ch)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500579 // If there are no more channels then we can close the consumers itself
khenaidoo43c82122018-11-22 18:38:28 -0500580 if len(consumerCh.channels) == 0 {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500581 log.Debugw("closing-consumers", log.Fields{"topic": topic})
582 err := closeConsumers(consumerCh.consumers)
583 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500584 delete(sc.topicToConsumerChannelMap, topic.Name)
585 return err
586 }
587 return nil
588 }
589 log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
590 return errors.New("topic-does-not-exist")
591}
592
593func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
594 sc.lockTopicToConsumerChannelMap.Lock()
595 defer sc.lockTopicToConsumerChannelMap.Unlock()
596 if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
597 for _, ch := range consumerCh.channels {
598 // Channel will be closed in the removeChannel method
599 removeChannel(consumerCh.channels, ch)
600 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500601 err := closeConsumers(consumerCh.consumers)
602 //if err == sarama.ErrUnknownTopicOrPartition {
603 // // Not an error
604 // err = nil
605 //}
606 //err := consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500607 delete(sc.topicToConsumerChannelMap, topic.Name)
608 return err
609 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500610 log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
611 return nil
khenaidoo43c82122018-11-22 18:38:28 -0500612}
613
614func (sc *SaramaClient) clearConsumerChannelMap() error {
615 sc.lockTopicToConsumerChannelMap.Lock()
616 defer sc.lockTopicToConsumerChannelMap.Unlock()
617 var err error
618 for topic, consumerCh := range sc.topicToConsumerChannelMap {
619 for _, ch := range consumerCh.channels {
620 // Channel will be closed in the removeChannel method
621 removeChannel(consumerCh.channels, ch)
622 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500623 if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
624 err = errTemp
625 }
626 //err = consumerCh.consumers.Close()
khenaidoo43c82122018-11-22 18:38:28 -0500627 delete(sc.topicToConsumerChannelMap, topic)
628 }
629 return err
630}
631
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500632//createPublisher creates the publisher which is used to send a message onto kafka
633func (sc *SaramaClient) createPublisher() error {
khenaidoo43c82122018-11-22 18:38:28 -0500634 // This Creates the publisher
635 config := sarama.NewConfig()
636 config.Producer.Partitioner = sarama.NewRandomPartitioner
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500637 config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
638 config.Producer.Flush.Messages = sc.producerFlushMessages
639 config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
640 config.Producer.Return.Errors = sc.producerReturnErrors
641 config.Producer.Return.Successes = sc.producerReturnSuccess
642 //config.Producer.RequiredAcks = sarama.WaitForAll
643 config.Producer.RequiredAcks = sarama.WaitForLocal
644
khenaidoo43c82122018-11-22 18:38:28 -0500645 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
646 brokers := []string{kafkaFullAddr}
647
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500648 if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
649 log.Errorw("error-starting-publisher", log.Fields{"error": err})
650 return err
651 } else {
652 sc.producer = producer
khenaidoo43c82122018-11-22 18:38:28 -0500653 }
654 log.Info("Kafka-publisher-created")
655 return nil
656}
657
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500658func (sc *SaramaClient) createConsumer() error {
khenaidoo43c82122018-11-22 18:38:28 -0500659 config := sarama.NewConfig()
660 config.Consumer.Return.Errors = true
661 config.Consumer.Fetch.Min = 1
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500662 config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
663 config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500664 config.Consumer.Offsets.Initial = sarama.OffsetNewest
665 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
666 brokers := []string{kafkaFullAddr}
667
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500668 if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
669 log.Errorw("error-starting-consumers", log.Fields{"error": err})
670 return err
671 } else {
672 sc.consumer = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500673 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500674 log.Info("Kafka-consumers-created")
khenaidoo43c82122018-11-22 18:38:28 -0500675 return nil
676}
677
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500678// createGroupConsumer creates a consumers group
khenaidooca301322019-01-09 23:06:32 -0500679func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, retries int) (*scc.Consumer, error) {
khenaidoo43c82122018-11-22 18:38:28 -0500680 config := scc.NewConfig()
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500681 config.ClientID = uuid.New().String()
khenaidoo43c82122018-11-22 18:38:28 -0500682 config.Group.Mode = scc.ConsumerModeMultiplex
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500683 //config.Consumer.Return.Errors = true
684 //config.Group.Return.Notifications = false
685 //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
686 //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
khenaidoo43c82122018-11-22 18:38:28 -0500687 config.Consumer.Offsets.Initial = sarama.OffsetNewest
khenaidooca301322019-01-09 23:06:32 -0500688 //config.Consumer.Offsets.Initial = sarama.OffsetOldest
khenaidoo43c82122018-11-22 18:38:28 -0500689 kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
690 brokers := []string{kafkaFullAddr}
691
khenaidoo43c82122018-11-22 18:38:28 -0500692 topics := []string{topic.Name}
693 var consumer *scc.Consumer
694 var err error
695
khenaidooca301322019-01-09 23:06:32 -0500696 if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
697 log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500698 return nil, err
699 }
khenaidooca301322019-01-09 23:06:32 -0500700 log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
khenaidoo43c82122018-11-22 18:38:28 -0500701 //time.Sleep(10*time.Second)
khenaidooca301322019-01-09 23:06:32 -0500702 //sc.groupConsumer = consumer
703 sc.groupConsumers[topic.Name] = consumer
khenaidoo43c82122018-11-22 18:38:28 -0500704 return consumer, nil
705}
706
khenaidoo43c82122018-11-22 18:38:28 -0500707// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
708// topic via the unique channel each subsciber received during subscription
khenaidoo79232702018-12-04 11:00:41 -0500709func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500710 // Need to go over all channels and publish messages to them - do we need to copy msg?
711 sc.lockTopicToConsumerChannelMap.Lock()
712 defer sc.lockTopicToConsumerChannelMap.Unlock()
713 for _, ch := range consumerCh.channels {
khenaidoo79232702018-12-04 11:00:41 -0500714 go func(c chan *ic.InterContainerMessage) {
khenaidoo43c82122018-11-22 18:38:28 -0500715 c <- protoMessage
716 }(ch)
717 }
718}
719
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500720func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
721 log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500722startloop:
723 for {
724 select {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500725 case err := <-consumer.Errors():
726 if err != nil {
727 log.Warnw("partition-consumers-error", log.Fields{"error": err})
728 } else {
729 // There is a race condition when this loop is stopped and the consumer is closed where
730 // the actual error comes as nil
731 log.Warn("partition-consumers-error")
732 }
733 case msg := <-consumer.Messages():
khenaidoo43c82122018-11-22 18:38:28 -0500734 //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500735 if msg == nil {
736 // There is a race condition when this loop is stopped and the consumer is closed where
737 // the actual msg comes as nil
738 break startloop
739 }
740 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500741 icm := &ic.InterContainerMessage{}
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500742 if err := proto.Unmarshal(msgBody, icm); err != nil {
743 log.Warnw("partition-invalid-message", log.Fields{"error": err})
744 continue
745 }
746 go sc.dispatchToConsumers(consumerChnls, icm)
747 case <-sc.doneCh:
748 log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
749 break startloop
750 }
751 }
752 log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
753}
754
755func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
756 log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
757
khenaidooca301322019-01-09 23:06:32 -0500758 //go func() {
759 // for msg := range consumer.Errors() {
760 // log.Warnw("group-consumers-error", log.Fields{"error": msg.Error()})
761 // }
762 //}()
763 //
764 //go func() {
765 // for ntf := range consumer.Notifications() {
766 // log.Debugw("group-received-notification", log.Fields{"notification": ntf})
767 // }
768 //}()
769
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500770startloop:
771 for {
772 select {
773 case err := <-consumer.Errors():
774 if err != nil {
khenaidooca301322019-01-09 23:06:32 -0500775 log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500776 } else {
777 // There is a race condition when this loop is stopped and the consumer is closed where
778 // the actual error comes as nil
khenaidooca301322019-01-09 23:06:32 -0500779 log.Warnw("group-consumers-error-nil", log.Fields{"topic": topic.Name})
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500780 }
781 case msg := <-consumer.Messages():
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500782 if msg == nil {
783 // There is a race condition when this loop is stopped and the consumer is closed where
784 // the actual msg comes as nil
785 break startloop
786 }
khenaidooca301322019-01-09 23:06:32 -0500787 log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
khenaidoo43c82122018-11-22 18:38:28 -0500788 msgBody := msg.Value
khenaidoo79232702018-12-04 11:00:41 -0500789 icm := &ic.InterContainerMessage{}
khenaidoo43c82122018-11-22 18:38:28 -0500790 if err := proto.Unmarshal(msgBody, icm); err != nil {
791 log.Warnw("invalid-message", log.Fields{"error": err})
792 continue
793 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500794 go sc.dispatchToConsumers(consumerChnls, icm)
795 consumer.MarkOffset(msg, "")
796 case ntf := <-consumer.Notifications():
797 log.Debugw("group-received-notification", log.Fields{"notification": ntf})
khenaidoo43c82122018-11-22 18:38:28 -0500798 case <-sc.doneCh:
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500799 log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500800 break startloop
801 }
802 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500803 log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500804}
805
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500806func (sc *SaramaClient) startConsumers(topic *Topic) error {
807 log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
808 var consumerCh *consumerChannels
809 if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
810 log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
811 return errors.New("consumers-not-exist")
812 }
813 // For each consumer listening for that topic, start a consumption loop
814 for _, consumer := range consumerCh.consumers {
815 if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
816 go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
817 } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
818 go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
819 } else {
820 log.Errorw("invalid-consumer", log.Fields{"topic": topic})
821 return errors.New("invalid-consumer")
822 }
823 }
824 return nil
825}
826
827//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
828//// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -0500829func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500830 var pConsumers []sarama.PartitionConsumer
khenaidoo43c82122018-11-22 18:38:28 -0500831 var err error
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500832
833 if pConsumers, err = sc.createPartionConsumers(topic, initialOffset); err != nil {
834 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500835 return nil, err
836 }
837
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500838 consumersIf := make([]interface{}, 0)
839 for _, pConsumer := range pConsumers {
840 consumersIf = append(consumersIf, pConsumer)
841 }
842
843 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
khenaidoo43c82122018-11-22 18:38:28 -0500844 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500845 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo43c82122018-11-22 18:38:28 -0500846 cc := &consumerChannels{
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500847 consumers: consumersIf,
khenaidoo79232702018-12-04 11:00:41 -0500848 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo43c82122018-11-22 18:38:28 -0500849 }
850
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500851 // Add the consumers channel to the map
khenaidoo43c82122018-11-22 18:38:28 -0500852 sc.addTopicToConsumerChannelMap(topic.Name, cc)
853
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500854 //Start a consumers to listen on that specific topic
855 go sc.startConsumers(topic)
khenaidoo43c82122018-11-22 18:38:28 -0500856
857 return consumerListeningChannel, nil
858}
859
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500860// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
861// for that topic. It also starts the routine that listens for messages on that topic.
khenaidoo79232702018-12-04 11:00:41 -0500862func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ic.InterContainerMessage, error) {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500863 // TODO: Replace this development partition consumers with a group consumers
864 var pConsumer *scc.Consumer
865 var err error
khenaidooca301322019-01-09 23:06:32 -0500866 if pConsumer, err = sc.createGroupConsumer(topic, groupId, DefaultMaxRetries); err != nil {
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500867 log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
868 return nil, err
869 }
870 // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
871 // unbuffered to verify race conditions.
khenaidoo79232702018-12-04 11:00:41 -0500872 consumerListeningChannel := make(chan *ic.InterContainerMessage)
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500873 cc := &consumerChannels{
874 consumers: []interface{}{pConsumer},
khenaidoo79232702018-12-04 11:00:41 -0500875 channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500876 }
877
878 // Add the consumers channel to the map
879 sc.addTopicToConsumerChannelMap(topic.Name, cc)
880
881 //Start a consumers to listen on that specific topic
882 go sc.startConsumers(topic)
883
884 return consumerListeningChannel, nil
885}
886
887func (sc *SaramaClient) createPartionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
888 log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
khenaidoo43c82122018-11-22 18:38:28 -0500889 partitionList, err := sc.consumer.Partitions(topic.Name)
890 if err != nil {
891 log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
892 return nil, err
893 }
894
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500895 pConsumers := make([]sarama.PartitionConsumer, 0)
896 for _, partition := range partitionList {
897 var pConsumer sarama.PartitionConsumer
898 if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
899 log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
900 return nil, err
901 }
902 pConsumers = append(pConsumers, pConsumer)
khenaidoo43c82122018-11-22 18:38:28 -0500903 }
khenaidoo4c1a5bf2018-11-29 15:53:42 -0500904 return pConsumers, nil
khenaidoo43c82122018-11-22 18:38:28 -0500905}
906
khenaidoo79232702018-12-04 11:00:41 -0500907func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
khenaidoo43c82122018-11-22 18:38:28 -0500908 var i int
khenaidoo79232702018-12-04 11:00:41 -0500909 var channel chan *ic.InterContainerMessage
khenaidoo43c82122018-11-22 18:38:28 -0500910 for i, channel = range channels {
911 if channel == ch {
912 channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
913 close(channel)
914 return channels[:len(channels)-1]
915 }
916 }
917 return channels
918}