blob: 0a952dcf3c5645108ac6a466b98bdda9d31c0ecb [file] [log] [blame]
/*
 * Copyright 2020-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package core
18
import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"sync"
	"time"

	"github.com/opencord/bbsim-sadis-server/internal/utils"
	"github.com/opencord/voltha-lib-go/v7/pkg/log"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
)
33
// attemptLimit is the maximum number of times a failed request to a BBSim pod
// is retried: a request is retried only while its attempt counter is below
// this value, after which the error is returned to the caller.
const attemptLimit = 10
35
Matteo Scandoloa4285862020-12-01 18:10:10 -080036type Watcher struct {
37 client *kubernetes.Clientset
38 store *Store
39 config *utils.ConfigFlags
40}
41
42func NewWatcher(client *kubernetes.Clientset, store *Store, cf *utils.ConfigFlags) *Watcher {
43 return &Watcher{
44 client: client,
45 store: store,
46 config: cf,
47 }
48}
49
50func (w *Watcher) Watch(ctx context.Context, wg *sync.WaitGroup) {
51 defer wg.Done()
Matteo Scandoloabf872d2020-12-14 08:22:06 -100052
53 // we need to watch for PODs, services can't respond to requests if the backend is not there
54 // note that when this container starts we receive notifications for all of the existing pods
55
56 watcher, err := w.client.CoreV1().Pods("").Watch(context.TODO(), metav1.ListOptions{LabelSelector: "app=bbsim"})
57 if err != nil {
58 logger.Fatalw(ctx, "error-while-watching-pods", log.Fields{"err": err})
59 }
60
61 ch := watcher.ResultChan()
62 for event := range ch {
63
64 pod, ok := event.Object.(*v1.Pod)
65 if !ok {
66 logger.Fatalw(ctx, "unexpected-type-while-watching-pod", log.Fields{"object": event.Object})
Matteo Scandoloa4285862020-12-01 18:10:10 -080067 }
Matteo Scandoloabf872d2020-12-14 08:22:06 -100068
69 logger.Debugw(ctx, "received-pod-event", log.Fields{"object": event.Type, "pod": pod.Name})
70 if event.Type == watch.Deleted {
71 // TODO remove sadis entries
72 logger.Debug(ctx, "pod-has-been-removed")
73 }
74
75 if event.Type == watch.Added || event.Type == watch.Modified {
76 // fetch the sadis information and store them
77
78 // the pod is ready only if all the containers in it are ready,
79 // for now the BBSim pod only has 1 container, but things may change in the future, so keep the loop
80 ready := true
81
82 if len(pod.Status.ContainerStatuses) == 0 {
83 // if there are no containers in the pod, then it's not ready
84 ready = false
85 }
86
87 for _, containerStatus := range pod.Status.ContainerStatuses {
88 if !containerStatus.Ready {
89 // if one of the container is not ready, then the entire pod is not ready
90 ready = false
91 }
92 }
93
94 logger.Debugw(ctx, "received-event-for-bbsim-pod", log.Fields{"pod": pod.Name, "namespace": pod.Namespace,
Matteo Scandolo06e564f2022-03-22 15:45:26 -070095 "release": pod.Labels["release"], "ready": ready, "podIp": pod.Status.PodIP})
Matteo Scandoloabf872d2020-12-14 08:22:06 -100096
97 // as soon as the pod is ready cache the sadis entries
98 if ready {
Matteo Scandolo06e564f2022-03-22 15:45:26 -070099 if err := w.queryPod(ctx, pod.Status.PodIP, 0); err != nil {
100 logger.Errorw(ctx, "failed-to-load-sadis-config-from-bbsim",
101 log.Fields{"pod": pod.Name, "namespace": pod.Namespace, "release": pod.Labels["release"], "err": err})
Matteo Scandoloabf872d2020-12-14 08:22:06 -1000102 }
Matteo Scandoloabf872d2020-12-14 08:22:06 -1000103 }
104 }
105
Matteo Scandoloa4285862020-12-01 18:10:10 -0800106 }
107}
108
Matteo Scandolo06e564f2022-03-22 15:45:26 -0700109func (w *Watcher) queryPod(ctx context.Context, ip string, attempt int) error {
110 endpoint := fmt.Sprintf("%s:%d", ip, w.config.BBsimSadisPort)
Matteo Scandoloa4285862020-12-01 18:10:10 -0800111 logger.Infow(ctx, "querying-service", log.Fields{"endpoint": endpoint})
112
Matteo Scandolo06e564f2022-03-22 15:45:26 -0700113 client := http.Client{Timeout: 5 * time.Second}
114
115 res, err := client.Get(fmt.Sprintf("http://%s/v2/static", endpoint))
Matteo Scandoloa4285862020-12-01 18:10:10 -0800116
117 if err != nil {
Matteo Scandolo5b6eb8d2020-12-15 12:21:33 -1000118 if attempt < attemptLimit {
119 logger.Warnw(ctx, "error-while-reading-from-service-retrying", log.Fields{"error": err.Error()})
120 // if there is an error and we have attempt left just retry later
121 time.Sleep(1 * time.Second)
Matteo Scandolo06e564f2022-03-22 15:45:26 -0700122 return w.queryPod(ctx, ip, attempt+1)
Matteo Scandolo5b6eb8d2020-12-15 12:21:33 -1000123 }
124
Matteo Scandoloa4285862020-12-01 18:10:10 -0800125 return err
126 }
127
128 if res.Body != nil {
129 defer res.Body.Close()
130 }
131
132 var result SadisConfig
133
134 decoder := json.NewDecoder(res.Body)
135 if err := decoder.Decode(&result); err != nil {
136 logger.Errorw(ctx, "cannot-decode-sadis-response", log.Fields{"error": err.Error()})
137 return err
138 }
139
140 logger.Debugw(ctx, "fetched-sadis-config", log.Fields{
141 "endpoint": endpoint,
142 "entries": len(result.Sadis.Entries),
143 "bandwidthProfiles": len(result.BandwidthProfile.Entries),
144 })
145
Matteo Scandoloa4285862020-12-01 18:10:10 -0800146 for _, entry := range result.Sadis.Entries {
147 if entry.HardwareIdentifier != "" {
148 e := SadisOltEntry{
149 ID: entry.ID,
150 HardwareIdentifier: entry.HardwareIdentifier,
151 IPAddress: entry.IPAddress,
152 NasID: entry.NasID,
153 UplinkPort: entry.UplinkPort,
Andrea Campanella68c39e62022-03-10 17:14:13 +0100154 NniDhcpTrapVid: entry.NniDhcpTrapVid,
Matteo Scandoloa4285862020-12-01 18:10:10 -0800155 }
156 w.store.addOlt(ctx, e)
157 continue
158 }
159 if len(entry.UniTagList) != 0 {
160 e := SadisOnuEntryV2{
161 ID: entry.ID,
162 NasPortID: entry.NasPortID,
163 CircuitID: entry.CircuitID,
164 RemoteID: entry.RemoteID,
165 UniTagList: entry.UniTagList,
166 }
167 w.store.addOnu(ctx, e)
168 continue
169 }
170 logger.Warnw(ctx, "unknown-entity", log.Fields{"entry": entry})
171 }
172
173 for _, bp := range result.BandwidthProfile.Entries {
174 w.store.addBp(ctx, *bp)
175 }
176
177 logger.Infow(ctx, "stored-sadis-config", log.Fields{"endpoint": endpoint})
178
179 return nil
180}