blob: 7946205a2880a0c56bc632c3f5b4798cb06006e7 [file] [log] [blame]
Scott Bakere702d122019-10-22 11:54:12 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package afrouterd
18
19import (
20 "fmt"
Scott Bakerf579f132019-10-24 14:31:41 -070021 "github.com/opencord/voltha-lib-go/v2/pkg/log"
Scott Bakerb6de7a52019-11-04 09:13:37 -080022 pb "github.com/opencord/voltha-protos/v2/go/afrouter"
Scott Bakere702d122019-10-22 11:54:12 -070023 "golang.org/x/net/context"
24 "google.golang.org/grpc"
25 "google.golang.org/grpc/connectivity"
26 "google.golang.org/grpc/keepalive"
27 "math"
28 "os"
29 "strconv"
30 "time"
31)
32
33type volthaPod struct {
34 name string
35 ipAddr string
36 node string
37 devIds map[string]struct{}
38 backend string
39}
40
41// TODO: These variables should be passed in from main() rather than
42// declared here.
43
44var (
45 // if k8s variables are undefined, will attempt to use in-cluster config
46 k8sApiServer = GetStrEnv("K8S_API_SERVER", "")
47 k8sKubeConfigPath = GetStrEnv("K8S_KUBE_CONFIG_PATH", "")
48
49 podNamespace = GetStrEnv("POD_NAMESPACE", "voltha")
50 podLabelSelector = GetStrEnv("POD_LABEL_SELECTOR", "app=rw-core")
51 podAffinityGroupLabel = GetStrEnv("POD_AFFINITY_GROUP_LABEL", "affinity-group")
52
53 podGrpcPort = uint64(GetIntEnv("POD_GRPC_PORT", 0, math.MaxUint16, 50057))
54
55 afrouterRouterName = GetStrEnv("AFROUTER_ROUTER_NAME", "vcore")
56 afrouterRouteName = GetStrEnv("AFROUTER_ROUTE_NAME", "dev_manager")
57 afrouterRWClusterName = GetStrEnv("AFROUTER_RW_CLUSTER_NAME", "vcore")
58
59 kafkaTopic = GetStrEnv("KAFKA_TOPIC", "AffinityRouter")
60 kafkaClientType = GetStrEnv("KAFKA_CLIENT_TYPE", "sarama")
61 kafkaHost = GetStrEnv("KAFKA_HOST", "kafka")
62 kafkaPort = GetIntEnv("KAFKA_PORT", 0, math.MaxUint16, 9092)
63 kafkaInstanceID = GetStrEnv("KAFKA_INSTANCE_ID", "arouterd")
64)
65
66func GetIntEnv(key string, min, max, defaultValue int) int {
67 if val, have := os.LookupEnv(key); have {
68 num, err := strconv.Atoi(val)
69 if err != nil || !(min <= num && num <= max) {
70 panic(fmt.Errorf("%s must be a number in the range [%d, %d]; default: %d", key, min, max, defaultValue))
71 }
72 return num
73 }
74 return defaultValue
75}
76
77func GetStrEnv(key, defaultValue string) string {
78 if val, have := os.LookupEnv(key); have {
79 return val
80 }
81 return defaultValue
82}
83
84func Connect(ctx context.Context, addr string) (*grpc.ClientConn, error) {
85 log.Debugf("Trying to connect to %s", addr)
86 conn, err := grpc.DialContext(ctx, addr,
87 grpc.WithInsecure(),
88 grpc.WithBlock(),
89 grpc.WithBackoffMaxDelay(time.Second*5),
90 grpc.WithKeepaliveParams(keepalive.ClientParameters{Time: time.Second * 10, Timeout: time.Second * 5}))
91 if err == nil {
92 log.Debugf("Connection succeeded")
93 }
94 return conn, err
95}
96
97func setAffinity(ctx context.Context, client pb.ConfigurationClient, deviceId string, backend string) {
98 log.Debugf("Configuring backend %s with device id %s \n", backend, deviceId)
99 if res, err := client.SetAffinity(ctx, &pb.Affinity{
100 Router: afrouterRouterName,
101 Route: afrouterRouteName,
102 Cluster: afrouterRWClusterName,
103 Backend: backend,
104 Id: deviceId,
105 }); err != nil {
106 log.Debugf("failed affinity RPC call: %s\n", err)
107 } else {
108 log.Debugf("Result: %v\n", res)
109 }
110}
111
112// endOnClose cancels the context when the connection closes
113func ConnectionActiveContext(conn *grpc.ClientConn) context.Context {
114 ctx, disconnected := context.WithCancel(context.Background())
115 go func() {
116 for state := conn.GetState(); state != connectivity.TransientFailure && state != connectivity.Shutdown; state = conn.GetState() {
117 if !conn.WaitForStateChange(context.Background(), state) {
118 break
119 }
120 }
121 log.Infof("Connection to afrouter lost")
122 disconnected()
123 }()
124 return ctx
125}