blob: 2e3e831f6eddc908797dae1f63a5ab0139654701 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package afrouterd
import (
"errors"
"github.com/golang/protobuf/ptypes"
"github.com/opencord/voltha-lib-go/v2/pkg/kafka"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
"github.com/opencord/voltha-lib-go/v2/pkg/probe"
pb "github.com/opencord/voltha-protos/v2/go/afrouter"
ic "github.com/opencord/voltha-protos/v2/go/inter_container"
"golang.org/x/net/context"
"regexp"
"time"
)
func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
log.Infow("kafka-client-type", log.Fields{"client": clientType})
switch clientType {
case "sarama":
return kafka.NewSaramaClient(
kafka.Host(host),
kafka.Port(port),
kafka.ConsumerType(kafka.GroupCustomer),
kafka.ProducerReturnOnErrors(true),
kafka.ProducerReturnOnSuccess(true),
kafka.ProducerMaxRetries(6),
kafka.NumPartitions(3),
kafka.ConsumerGroupName(instanceID),
kafka.ConsumerGroupPrefix(instanceID),
kafka.AutoCreateTopic(false),
kafka.ProducerFlushFrequency(5),
kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
}
return nil, errors.New("unsupported-client-type")
}
func monitorDiscovery(kc kafka.Client, ctx context.Context, client pb.ConfigurationClient, ch <-chan *ic.InterContainerMessage, doneCh chan<- struct{}) {
defer close(doneCh)
defer kc.Stop()
defer probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusStopped)
monitorLoop:
for {
select {
case <-ctx.Done():
break monitorLoop
case msg := <-ch:
log.Debug("Received a device discovery notification")
device := &ic.DeviceDiscovered{}
if err := ptypes.UnmarshalAny(msg.Body, device); err != nil {
log.Errorf("Could not unmarshal received notification %v", msg)
} else {
// somewhat hackish solution, backend is known from the first digit found in the publisher name
group := regexp.MustCompile(`\d`).FindString(device.Publisher)
if group != "" {
// set the affinity of the discovered device
setAffinity(ctx, client, device.Id, afrouterRWClusterName+group)
} else {
log.Error("backend is unknown")
}
}
}
}
}
func StartDiscoveryMonitor(ctx context.Context, client pb.ConfigurationClient) (<-chan struct{}, error) {
doneCh := make(chan struct{})
// Connect to kafka for discovery events
kc, err := newKafkaClient(kafkaClientType, kafkaHost, kafkaPort, kafkaInstanceID)
if err != nil {
panic(err)
}
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
for {
if err := kc.Start(); err != nil {
log.Error("Could not connect to kafka")
} else {
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
break
}
select {
case <-ctx.Done():
close(doneCh)
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusStopped)
return doneCh, errors.New("GRPC context done")
case <-time.After(5 * time.Second):
}
}
ch, err := kc.Subscribe(&kafka.Topic{Name: kafkaTopic})
if err != nil {
log.Errorf("Could not subscribe to the '%s' channel, discovery disabled", kafkaTopic)
close(doneCh)
kc.Stop()
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusStopped)
return doneCh, err
}
go monitorDiscovery(kc, ctx, client, ch, doneCh)
return doneCh, nil
}