blob: 99e75d401f460a4dd450e36a2107e4b51719157e [file] [log] [blame]
Matteo Scandoloa4285862020-12-01 18:10:10 -08001/*
2 * Copyright 2020-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package core
18
19import (
20 "context"
21 "encoding/json"
22 "fmt"
23 "github.com/opencord/bbsim-sadis-server/internal/utils"
24 "github.com/opencord/voltha-lib-go/v4/pkg/log"
25 v1 "k8s.io/api/core/v1"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/client-go/kubernetes"
28 "net/http"
29 "sync"
30 "time"
31)
32
33type Watcher struct {
34 client *kubernetes.Clientset
35 store *Store
36 config *utils.ConfigFlags
37}
38
39func NewWatcher(client *kubernetes.Clientset, store *Store, cf *utils.ConfigFlags) *Watcher {
40 return &Watcher{
41 client: client,
42 store: store,
43 config: cf,
44 }
45}
46
47func (w *Watcher) Watch(ctx context.Context, wg *sync.WaitGroup) {
48 defer wg.Done()
49 for {
50 logger.Debug(ctx, "fetching-pods")
51 services, err := w.client.CoreV1().Services("").List(context.TODO(), metav1.ListOptions{LabelSelector: "app=bbsim"})
52 if err != nil {
53 panic(err.Error())
54 }
55 logger.Debugf(ctx, "There are %d services in the cluster", len(services.Items))
56 w.handleServices(ctx, services)
57 time.Sleep(10 * time.Second)
58 }
59}
60
61func (w *Watcher) handleServices(ctx context.Context, services *v1.ServiceList) {
62 // TODO if a service is removed we'll want to remove the related entries
63 for _, service := range services.Items {
64 if err := w.queryService(ctx, service); err != nil {
65 logger.Errorw(ctx, "error-while-reading-from-service", log.Fields{"error": err.Error()})
66 }
67 }
68}
69
70func (w *Watcher) queryService(ctx context.Context, service v1.Service) error {
71 endpoint := fmt.Sprintf("%s.%s.svc:%d", service.Name, service.Namespace, w.config.BBsimSadisPort)
72 logger.Infow(ctx, "querying-service", log.Fields{"endpoint": endpoint})
73
74 res, err := http.Get(fmt.Sprintf("http://%s/v2/static", endpoint))
75
76 if err != nil {
77 return err
78 }
79
80 if res.Body != nil {
81 defer res.Body.Close()
82 }
83
84 var result SadisConfig
85
86 decoder := json.NewDecoder(res.Body)
87 if err := decoder.Decode(&result); err != nil {
88 logger.Errorw(ctx, "cannot-decode-sadis-response", log.Fields{"error": err.Error()})
89 return err
90 }
91
92 logger.Debugw(ctx, "fetched-sadis-config", log.Fields{
93 "endpoint": endpoint,
94 "entries": len(result.Sadis.Entries),
95 "bandwidthProfiles": len(result.BandwidthProfile.Entries),
96 })
97
98 //for _, entry := range result.Sadis.Entries {
99 // switch entry.(type) {
100 // case SadisOltEntry:
101 // case *SadisOltEntry:
102 // logger.Infow(ctx, "olt-entry", log.Fields{"entry": entry})
103 // case SadisOnuEntryV2:
104 // case *SadisOnuEntryV2:
105 // logger.Infow(ctx, "onu-entry", log.Fields{"entry": entry})
106 // default:
107 // logger.Warnw(ctx, "unknown-entity", log.Fields{"entry": entry})
108 // }
109 //}
110
111 for _, entry := range result.Sadis.Entries {
112 if entry.HardwareIdentifier != "" {
113 e := SadisOltEntry{
114 ID: entry.ID,
115 HardwareIdentifier: entry.HardwareIdentifier,
116 IPAddress: entry.IPAddress,
117 NasID: entry.NasID,
118 UplinkPort: entry.UplinkPort,
119 }
120 w.store.addOlt(ctx, e)
121 continue
122 }
123 if len(entry.UniTagList) != 0 {
124 e := SadisOnuEntryV2{
125 ID: entry.ID,
126 NasPortID: entry.NasPortID,
127 CircuitID: entry.CircuitID,
128 RemoteID: entry.RemoteID,
129 UniTagList: entry.UniTagList,
130 }
131 w.store.addOnu(ctx, e)
132 continue
133 }
134 logger.Warnw(ctx, "unknown-entity", log.Fields{"entry": entry})
135 }
136
137 for _, bp := range result.BandwidthProfile.Entries {
138 w.store.addBp(ctx, *bp)
139 }
140
141 logger.Infow(ctx, "stored-sadis-config", log.Fields{"endpoint": endpoint})
142
143 return nil
144}