/*
 * Copyright 2019-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package techprofile
18
19import (
20 "bytes"
21 "context"
22 "errors"
23 "fmt"
24 "regexp"
25 "strconv"
26 "sync"
27 "time"
28
29 "github.com/golang/protobuf/jsonpb"
30 "github.com/golang/protobuf/proto"
31 "github.com/opencord/voltha-lib-go/v7/pkg/db"
32 "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
33 "github.com/opencord/voltha-lib-go/v7/pkg/log"
khenaidoo106c61a2021-08-11 18:05:46 -040034 tp_pb "github.com/opencord/voltha-protos/v5/go/tech_profile"
35)
36
// iPonResourceMgr is the slice of the PON resource manager API that the tech
// profile manager calls into.
type iPonResourceMgr interface {
	// GetResourceID and FreeResourceID are the calls that TechProfileMgr wraps
	// with its per-resource-type locks.
	GetResourceID(ctx context.Context, intfID uint32, resourceType string, numIDs uint32) ([]uint32, error)
	FreeResourceID(ctx context.Context, intfID uint32, resourceType string, ReleaseContent []uint32) error

	// The GetResourceType* methods return the resourceType strings that
	// TechProfileMgr compares against to pick the matching lock.
	GetResourceTypeAllocID() string
	GetResourceTypeGemPortID() string
	GetResourceTypeOnuID() string

	// GetTechnology returns the technology name; TechProfileMgr compares it
	// with the xgspon/xgpon/gpon/epon constants and uses it in KV paths.
	GetTechnology() string
}
46
// SchedulingPolicy enumerates the scheduling policies known to this package:
// WRR (0), StrictPriority (1) and Hybrid (2).
type SchedulingPolicy int32

const (
	SchedulingPolicy_WRR SchedulingPolicy = iota
	SchedulingPolicy_StrictPriority
	SchedulingPolicy_Hybrid
)
54
// AdditionalBW enumerates the additional-bandwidth options known to this
// package: None (0), NA (1), BestEffort (2) and Auto (3).
type AdditionalBW int32

const (
	AdditionalBW_AdditionalBW_None AdditionalBW = iota
	AdditionalBW_AdditionalBW_NA
	AdditionalBW_AdditionalBW_BestEffort
	AdditionalBW_AdditionalBW_Auto
)
63
// DiscardPolicy enumerates the queue discard policies known to this package:
// TailDrop (0), WTailDrop (1), Red (2) and WRed (3).
type DiscardPolicy int32

const (
	DiscardPolicy_TailDrop DiscardPolicy = iota
	DiscardPolicy_WTailDrop
	DiscardPolicy_Red
	DiscardPolicy_WRed
)
72
// uniPortNameFormatRegexp is the format every uniPortName is checked against,
// e.g. "olt-{<device-id>}/pon-{0}/onu-{1}/uni-{0}". The braces are literal.
var uniPortNameFormatRegexp = regexp.MustCompile(`^olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
75
// Instance control defaults.
const (
	defaultOnuInstance    = "multi-instance"
	defaultUniInstance    = "single-instance"
	defaultGemPayloadSize = "auto"
)

// Default discard config constants.
const (
	defaultMinThreshold   = 0
	defaultMaxThreshold   = 0
	defaultMaxProbability = 0
)

// Default scheduler constants.
const (
	defaultPriority = 0
	defaultWeight   = 0
)

// Default GEM attribute constants.
const (
	defaultAESEncryption     = "True"
	defaultPriorityQueue     = 0
	defaultQueueWeight       = 0
	defaultMaxQueueSize      = "auto"
	defaultIsMulticast       = "False"
	defaultAccessControlList = "224.0.0.0-239.255.255.255"
	defaultMcastGemID        = 4069
)

// Default EPON constants: package type.
const (
	defaultPakageType = "B"
)

// Default EPON constants: traffic and polling attributes.
const (
	defaultTrafficType               = "BE"
	defaultUnsolicitedGrantSize      = 0
	defaultNominalInterval           = 0
	defaultToleratedPollJitter       = 0
	defaultRequestTransmissionPolicy = 0
	defaultNumQueueSet               = 2
)

// Default EPON constants: queue thresholds.
const (
	defaultQThreshold1 = 5500
	defaultQThreshold2 = 0
	defaultQThreshold3 = 0
	defaultQThreshold4 = 0
	defaultQThreshold5 = 0
	defaultQThreshold6 = 0
	defaultQThreshold7 = 0
)

// Technology names, as compared against iPonResourceMgr.GetTechnology().
const (
	xgspon = "XGS-PON"
	xgpon  = "XGPON"
	gpon   = "GPON"
	epon   = "EPON"
)

const (
	// MaxUniPortPerOnu bounds the UNI indexes scanned per ONU.
	// TODO: Adapter uses its own constant for MaxUniPort. How to synchronize this and have a single source of truth?
	MaxUniPortPerOnu = 16
)
139
140type TechProfileMgr struct {
141 config *TechProfileFlags
142 resourceMgr iPonResourceMgr
143 OnuIDMgmtLock sync.RWMutex
144 GemPortIDMgmtLock sync.RWMutex
145 AllocIDMgmtLock sync.RWMutex
146 tpInstanceMap map[string]*tp_pb.TechProfileInstance // Map of tp path to tp instance
147 tpInstanceMapLock sync.RWMutex
148 eponTpInstanceMap map[string]*tp_pb.EponTechProfileInstance // Map of tp path to epon tp instance
149 epontpInstanceMapLock sync.RWMutex
150 tpMap map[uint32]*tp_pb.TechProfile // Map of tp id to tp
151 tpMapLock sync.RWMutex
152 eponTpMap map[uint32]*tp_pb.EponTechProfile // map of tp id to epon tp
153 eponTpMapLock sync.RWMutex
154}
155
156func (t *TechProfileMgr) SetKVClient(ctx context.Context, pathPrefix string) *db.Backend {
157 kvClient, err := newKVClient(ctx, t.config.KVStoreType, t.config.KVStoreAddress, t.config.KVStoreTimeout)
158 if err != nil {
159 logger.Errorw(ctx, "failed-to-create-kv-client",
160 log.Fields{
161 "type": t.config.KVStoreType, "address": t.config.KVStoreAddress,
162 "timeout": t.config.KVStoreTimeout, "prefix": pathPrefix,
163 "error": err.Error(),
164 })
165 return nil
166 }
167 return &db.Backend{
168 Client: kvClient,
169 StoreType: t.config.KVStoreType,
170 Address: t.config.KVStoreAddress,
171 Timeout: t.config.KVStoreTimeout,
172 PathPrefix: pathPrefix}
173
174 /* TODO : Make sure direct call to NewBackend is working fine with backend , currently there is some
175 issue between kv store and backend , core is not calling NewBackend directly
176 kv := model.NewBackend(t.config.kvStoreType, t.config.KVStoreHost, t.config.KVStorePort,
177 t.config.KVStoreTimeout, kvStoreTechProfilePathPrefix)
178 */
179}
180
Holger Hildebrandt143b5be2023-02-10 08:28:15 +0000181func (t *TechProfileMgr) CloseKVClient(ctx context.Context) {
182 if t.config.KVBackend != nil {
183 t.config.KVBackend.Client.Close(ctx)
184 t.config.KVBackend = nil
185 }
186 if t.config.DefaultTpKVBackend != nil {
187 t.config.DefaultTpKVBackend.Client.Close(ctx)
188 t.config.DefaultTpKVBackend = nil
189 }
190 if t.config.ResourceInstanceKVBacked != nil {
191 t.config.ResourceInstanceKVBacked.Client.Close(ctx)
192 t.config.ResourceInstanceKVBacked = nil
193 }
194}
195
khenaidoo106c61a2021-08-11 18:05:46 -0400196func NewTechProfile(ctx context.Context, resourceMgr iPonResourceMgr, kvStoreType string, kvStoreAddress string, basePathKvStore string) (*TechProfileMgr, error) {
197 var techprofileObj TechProfileMgr
198 logger.Debug(ctx, "initializing-techprofile-mananger")
199 techprofileObj.config = NewTechProfileFlags(kvStoreType, kvStoreAddress, basePathKvStore)
200 techprofileObj.config.KVBackend = techprofileObj.SetKVClient(ctx, techprofileObj.config.TPKVPathPrefix)
201 techprofileObj.config.DefaultTpKVBackend = techprofileObj.SetKVClient(ctx, techprofileObj.config.defaultTpKvPathPrefix)
202 if techprofileObj.config.KVBackend == nil {
203 logger.Error(ctx, "failed-to-initialize-backend")
204 return nil, errors.New("kv-backend-init-failed")
205 }
206 techprofileObj.config.ResourceInstanceKVBacked = techprofileObj.SetKVClient(ctx, techprofileObj.config.ResourceInstanceKVPathPrefix)
207 if techprofileObj.config.ResourceInstanceKVBacked == nil {
208 logger.Error(ctx, "failed-to-initialize-resource-instance-kv-backend")
209 return nil, errors.New("resource-instance-kv-backend-init-failed")
210 }
211 techprofileObj.resourceMgr = resourceMgr
212 techprofileObj.tpInstanceMap = make(map[string]*tp_pb.TechProfileInstance)
213 techprofileObj.eponTpInstanceMap = make(map[string]*tp_pb.EponTechProfileInstance)
214 techprofileObj.tpMap = make(map[uint32]*tp_pb.TechProfile)
215 techprofileObj.eponTpMap = make(map[uint32]*tp_pb.EponTechProfile)
216 logger.Debug(ctx, "reconcile-tp-instance-cache-start")
217 if err := techprofileObj.reconcileTpInstancesToCache(ctx); err != nil {
218 logger.Errorw(ctx, "failed-to-reconcile-tp-instances", log.Fields{"err": err})
219 return nil, err
220 }
221 logger.Debug(ctx, "reconcile-tp-instance-cache-end")
222 logger.Debug(ctx, "initializing-tech-profile-manager-object-success")
223 return &techprofileObj, nil
224}
225
226// GetTechProfileInstanceKey returns the tp instance key that is used to reference TP Instance Map
227func (t *TechProfileMgr) GetTechProfileInstanceKey(ctx context.Context, tpID uint32, uniPortName string) string {
228 logger.Debugw(ctx, "get-tp-instance-kv-key", log.Fields{
229 "uniPortName": uniPortName,
230 "tpId": tpID,
231 })
232 // Make sure the uniPortName is as per format olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
233 if !uniPortNameFormatRegexp.Match([]byte(uniPortName)) {
234 logger.Warnw(ctx, "uni-port-name-not-confirming-to-format", log.Fields{"uniPortName": uniPortName})
235 }
236 // The key path prefix (like service/voltha/technology_profiles or service/voltha_voltha/technology_profiles)
237 // is expected to be attached by the components that use this path as part of the KVBackend configuration.
238 resourceInstanceKvPathSuffix := "%s/%d/%s" // <technology>/<tpID>/<uni-port-name>
239 // <uni-port-name> must be of the format pon-{\d+}/onu-{\d+}/uni-{\d+}
240 return fmt.Sprintf(resourceInstanceKvPathSuffix, t.resourceMgr.GetTechnology(), tpID, uniPortName)
241}
242
243// GetTPInstance gets TP instance from cache if found
244func (t *TechProfileMgr) GetTPInstance(ctx context.Context, path string) (interface{}, error) {
245 tech := t.resourceMgr.GetTechnology()
246 switch tech {
247 case xgspon, xgpon, gpon:
248 t.tpInstanceMapLock.RLock()
249 defer t.tpInstanceMapLock.RUnlock()
250 tpInst, ok := t.tpInstanceMap[path]
251 if !ok {
252 return nil, fmt.Errorf("tp-instance-not-found-tp-path-%v", path)
253 }
254 return tpInst, nil
255 case epon:
256 t.epontpInstanceMapLock.RLock()
257 defer t.epontpInstanceMapLock.RUnlock()
258 tpInst, ok := t.eponTpInstanceMap[path]
259 if !ok {
260 return nil, fmt.Errorf("tp-instance-not-found-tp-path-%v", path)
261 }
262 return tpInst, nil
263 default:
264 logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": tech})
265 return nil, fmt.Errorf("unknown-tech-%s-tp-path-%v", tech, path)
266 }
267}
268
269// CreateTechProfileInstance creates a new TP instance.
270func (t *TechProfileMgr) CreateTechProfileInstance(ctx context.Context, tpID uint32, uniPortName string, intfID uint32) (interface{}, error) {
271 var tpInstance *tp_pb.TechProfileInstance
272 var eponTpInstance *tp_pb.EponTechProfileInstance
273
274 logger.Infow(ctx, "creating-tp-instance", log.Fields{"tpID": tpID, "uni": uniPortName, "intId": intfID})
275
276 // Make sure the uniPortName is as per format olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
277 if !uniPortNameFormatRegexp.Match([]byte(uniPortName)) {
278 logger.Errorw(ctx, "uni-port-name-not-confirming-to-format", log.Fields{"uniPortName": uniPortName})
279 return nil, fmt.Errorf("uni-port-name-not-confirming-to-format-%s", uniPortName)
280 }
281 tpInstancePathSuffix := t.GetTechProfileInstanceKey(ctx, tpID, uniPortName)
282
283 if t.resourceMgr.GetTechnology() == epon {
284 tp := t.getEponTPFromKVStore(ctx, tpID)
285 if tp != nil {
286 if err := t.validateInstanceControlAttr(ctx, *tp.InstanceControl); err != nil {
287 logger.Error(ctx, "invalid-instance-ctrl-attr-using-default-tp")
288 tp = t.getDefaultEponProfile(ctx)
289 } else {
290 logger.Infow(ctx, "using-specified-tp-from-kv-store", log.Fields{"tpID": tpID})
291 }
292 } else {
293 logger.Info(ctx, "tp-not-found-on-kv--creating-default-tp")
294 tp = t.getDefaultEponProfile(ctx)
295 }
296 // Store TP in cache
297 t.eponTpMapLock.Lock()
298 t.eponTpMap[tpID] = tp
299 t.eponTpMapLock.Unlock()
300
301 if eponTpInstance = t.allocateEponTPInstance(ctx, uniPortName, tp, intfID, tpInstancePathSuffix); eponTpInstance == nil {
302 logger.Error(ctx, "tp-instance-allocation-failed")
303 return nil, errors.New("tp-instance-allocation-failed")
304 }
305 t.epontpInstanceMapLock.Lock()
306 t.eponTpInstanceMap[tpInstancePathSuffix] = eponTpInstance
307 t.epontpInstanceMapLock.Unlock()
308 resInst := tp_pb.ResourceInstance{
309 TpId: tpID,
310 ProfileType: eponTpInstance.ProfileType,
311 SubscriberIdentifier: eponTpInstance.SubscriberIdentifier,
312 AllocId: eponTpInstance.AllocId,
313 }
314 for _, usQAttr := range eponTpInstance.UpstreamQueueAttributeList {
315 resInst.GemportIds = append(resInst.GemportIds, usQAttr.GemportId)
316 }
317
318 logger.Infow(ctx, "epon-tp-instance-created-successfully",
319 log.Fields{"tpID": tpID, "uni": uniPortName, "intfID": intfID})
320 if err := t.addResourceInstanceToKVStore(ctx, tpID, uniPortName, resInst); err != nil {
321 logger.Errorw(ctx, "failed-to-update-resource-instance-to-kv-store--freeing-up-resources", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
322 allocIDs := make([]uint32, 0)
323 allocIDs = append(allocIDs, resInst.AllocId)
324 errList := make([]error, 0)
325 errList = append(errList, t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), allocIDs))
326 errList = append(errList, t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), resInst.GemportIds))
327 if len(errList) > 0 {
328 logger.Errorw(ctx, "failed-to-free-up-resources-on-kv-store--system-behavior-has-become-erratic", log.Fields{"tpID": tpID, "uniPortName": uniPortName, "errList": errList})
329 }
330 return nil, err
331 }
332 return eponTpInstance, nil
333 } else {
334 tp := t.getTPFromKVStore(ctx, tpID)
335 if tp != nil {
336 if err := t.validateInstanceControlAttr(ctx, *tp.InstanceControl); err != nil {
337 logger.Error(ctx, "invalid-instance-ctrl-attr--using-default-tp")
338 tp = t.getDefaultTechProfile(ctx)
339 } else {
340 logger.Infow(ctx, "using-specified-tp-from-kv-store", log.Fields{"tpID": tpID})
341 }
342 } else {
343 logger.Info(ctx, "tp-not-found-on-kv--creating-default-tp")
344 tp = t.getDefaultTechProfile(ctx)
345 }
346 // Store TP in cache
347 t.tpMapLock.Lock()
348 t.tpMap[tpID] = tp
349 t.tpMapLock.Unlock()
350
351 if tpInstance = t.allocateTPInstance(ctx, uniPortName, tp, intfID, tpInstancePathSuffix); tpInstance == nil {
352 logger.Error(ctx, "tp-instance-allocation-failed")
353 return nil, errors.New("tp-instance-allocation-failed")
354 }
355 t.tpInstanceMapLock.Lock()
356 t.tpInstanceMap[tpInstancePathSuffix] = tpInstance
357 t.tpInstanceMapLock.Unlock()
358
359 resInst := tp_pb.ResourceInstance{
360 TpId: tpID,
361 ProfileType: tpInstance.ProfileType,
362 SubscriberIdentifier: tpInstance.SubscriberIdentifier,
363 AllocId: tpInstance.UsScheduler.AllocId,
364 }
365 for _, usQAttr := range tpInstance.UpstreamGemPortAttributeList {
366 resInst.GemportIds = append(resInst.GemportIds, usQAttr.GemportId)
367 }
368
369 logger.Infow(ctx, "tp-instance-created-successfully",
370 log.Fields{"tpID": tpID, "uni": uniPortName, "intfID": intfID})
371 if err := t.addResourceInstanceToKVStore(ctx, tpID, uniPortName, resInst); err != nil {
372 logger.Errorw(ctx, "failed-to-update-resource-instance-to-kv-store--freeing-up-resources", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
373 allocIDs := make([]uint32, 0)
374 allocIDs = append(allocIDs, resInst.AllocId)
375 errList := make([]error, 0)
376 errList = append(errList, t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), allocIDs))
377 errList = append(errList, t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), resInst.GemportIds))
378 if len(errList) > 0 {
379 logger.Fatalw(ctx, "failed-to-free-up-resources-on-kv-store--system-behavior-has-become-erratic", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
380 }
381 return nil, err
382 }
383
384 logger.Infow(ctx, "resource-instance-added-to-kv-store-successfully",
385 log.Fields{"tpID": tpID, "uni": uniPortName, "intfID": intfID})
386 return tpInstance, nil
387 }
388}
389
390// DeleteTechProfileInstance deletes the TP instance from the local cache as well as deletes the corresponding
391// resource instance from the KV store.
392func (t *TechProfileMgr) DeleteTechProfileInstance(ctx context.Context, tpID uint32, uniPortName string) error {
393 // Make sure the uniPortName is as per format olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
394 if !uniPortNameFormatRegexp.Match([]byte(uniPortName)) {
395 logger.Errorw(ctx, "uni-port-name-not-confirming-to-format", log.Fields{"uniPortName": uniPortName})
396 return fmt.Errorf("uni-port-name-not-confirming-to-format--%s", uniPortName)
397 }
398 path := t.GetTechProfileInstanceKey(ctx, tpID, uniPortName)
399 logger.Infow(ctx, "delete-tp-instance-from-cache", log.Fields{"key": path})
400 t.tpInstanceMapLock.Lock()
401 delete(t.tpInstanceMap, path)
402 t.tpInstanceMapLock.Unlock()
403 if err := t.removeResourceInstanceFromKVStore(ctx, tpID, uniPortName); err != nil {
404 return err
405 }
406 return nil
407}
408
409func (t *TechProfileMgr) GetMulticastTrafficQueues(ctx context.Context, tp *tp_pb.TechProfileInstance) []*tp_pb.TrafficQueue {
410 var encryp bool
411 NumGemPorts := len(tp.DownstreamGemPortAttributeList)
412 mcastTrafficQueues := make([]*tp_pb.TrafficQueue, 0)
413 for Count := 0; Count < NumGemPorts; Count++ {
414 if !isMulticastGem(tp.DownstreamGemPortAttributeList[Count].IsMulticast) {
415 continue
416 }
417 if tp.DownstreamGemPortAttributeList[Count].AesEncryption == "True" {
418 encryp = true
419 } else {
420 encryp = false
421 }
422 mcastTrafficQueues = append(mcastTrafficQueues, &tp_pb.TrafficQueue{
423 Direction: tp_pb.Direction_DOWNSTREAM,
424 GemportId: tp.DownstreamGemPortAttributeList[Count].MulticastGemId,
425 PbitMap: tp.DownstreamGemPortAttributeList[Count].PbitMap,
426 AesEncryption: encryp,
427 SchedPolicy: tp.DownstreamGemPortAttributeList[Count].SchedulingPolicy,
428 Priority: tp.DownstreamGemPortAttributeList[Count].PriorityQ,
429 Weight: tp.DownstreamGemPortAttributeList[Count].Weight,
430 DiscardPolicy: tp.DownstreamGemPortAttributeList[Count].DiscardPolicy,
431 })
432 }
433 logger.Debugw(ctx, "Downstream Multicast Traffic queue list ", log.Fields{"queuelist": mcastTrafficQueues})
434 return mcastTrafficQueues
435}
436
437func (t *TechProfileMgr) GetGemportForPbit(ctx context.Context, tp interface{}, dir tp_pb.Direction, pbit uint32) interface{} {
438 /*
439 Function to get the Gemport mapped to a pbit.
440 */
441 switch tp := tp.(type) {
442 case *tp_pb.TechProfileInstance:
443 if dir == tp_pb.Direction_UPSTREAM {
444 // upstream GEM ports
445 numGemPorts := len(tp.UpstreamGemPortAttributeList)
446 for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
447 lenOfPbitMap := len(tp.UpstreamGemPortAttributeList[gemCnt].PbitMap)
448 for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
449 // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
450 // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
451 if p, err := strconv.Atoi(string(tp.UpstreamGemPortAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
452 if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
453 logger.Debugw(ctx, "Found-US-GEMport-for-Pcp", log.Fields{"pbit": pbit, "GEMport": tp.UpstreamGemPortAttributeList[gemCnt].GemportId})
454 return tp.UpstreamGemPortAttributeList[gemCnt]
455 }
456 }
457 }
458 }
459 } else if dir == tp_pb.Direction_DOWNSTREAM {
460 //downstream GEM ports
461 numGemPorts := len(tp.DownstreamGemPortAttributeList)
462 for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
463 lenOfPbitMap := len(tp.DownstreamGemPortAttributeList[gemCnt].PbitMap)
464 for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
465 // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
466 // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
467 if p, err := strconv.Atoi(string(tp.DownstreamGemPortAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
468 if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
469 logger.Debugw(ctx, "Found-DS-GEMport-for-Pcp", log.Fields{"pbit": pbit, "GEMport": tp.DownstreamGemPortAttributeList[gemCnt].GemportId})
470 return tp.DownstreamGemPortAttributeList[gemCnt]
471 }
472 }
473 }
474 }
475 }
476 logger.Errorw(ctx, "No-GemportId-Found-For-Pcp", log.Fields{"pcpVlan": pbit})
khenaidoodc2116e2021-10-19 17:33:19 -0400477 case *tp_pb.EponTechProfileInstance:
khenaidoo106c61a2021-08-11 18:05:46 -0400478 if dir == tp_pb.Direction_UPSTREAM {
479 // upstream GEM ports
480 numGemPorts := len(tp.UpstreamQueueAttributeList)
481 for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
482 lenOfPbitMap := len(tp.UpstreamQueueAttributeList[gemCnt].PbitMap)
483 for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
484 // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
485 // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
486 if p, err := strconv.Atoi(string(tp.UpstreamQueueAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
487 if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
488 logger.Debugw(ctx, "Found-US-Queue-for-Pcp", log.Fields{"pbit": pbit, "Queue": tp.UpstreamQueueAttributeList[gemCnt].GemportId})
489 return tp.UpstreamQueueAttributeList[gemCnt]
490 }
491 }
492 }
493 }
494 } else if dir == tp_pb.Direction_DOWNSTREAM {
495 //downstream GEM ports
496 numGemPorts := len(tp.DownstreamQueueAttributeList)
497 for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
498 lenOfPbitMap := len(tp.DownstreamQueueAttributeList[gemCnt].PbitMap)
499 for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
500 // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
501 // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
502 if p, err := strconv.Atoi(string(tp.DownstreamQueueAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
503 if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
504 logger.Debugw(ctx, "Found-DS-Queue-for-Pcp", log.Fields{"pbit": pbit, "Queue": tp.DownstreamQueueAttributeList[gemCnt].GemportId})
505 return tp.DownstreamQueueAttributeList[gemCnt]
506 }
507 }
508 }
509 }
510 }
511 logger.Errorw(ctx, "No-QueueId-Found-For-Pcp", log.Fields{"pcpVlan": pbit})
512 default:
513 logger.Errorw(ctx, "unknown-tech", log.Fields{"tp": tp})
514 }
515 return nil
516}
517
518// FindAllTpInstances returns all TechProfile instances for a given TechProfile table-id, pon interface ID and onu ID.
519func (t *TechProfileMgr) FindAllTpInstances(ctx context.Context, oltDeviceID string, tpID uint32, intfID uint32, onuID uint32) interface{} {
520 onuTpInstancePathSuffix := fmt.Sprintf("%s/%d/olt-{%s}/pon-{%d}/onu-{%d}", t.resourceMgr.GetTechnology(), tpID, oltDeviceID, intfID, onuID)
521 tech := t.resourceMgr.GetTechnology()
522 if tech == xgspon || tech == xgpon || tech == gpon {
523 t.tpInstanceMapLock.RLock()
524 defer t.tpInstanceMapLock.RUnlock()
525 tpInstancesTech := make([]tp_pb.TechProfileInstance, 0)
526 for i := 0; i < MaxUniPortPerOnu; i++ {
527 key := onuTpInstancePathSuffix + fmt.Sprintf("/uni-{%d}", i)
528 if tpInst, ok := t.tpInstanceMap[key]; ok {
529 tpInstancesTech = append(tpInstancesTech, *tpInst)
530 }
531 }
532 return tpInstancesTech
533 } else if tech == epon {
534 t.epontpInstanceMapLock.RLock()
535 defer t.epontpInstanceMapLock.RUnlock()
536 tpInstancesTech := make([]tp_pb.EponTechProfileInstance, 0)
537 for i := 0; i < MaxUniPortPerOnu; i++ {
538 key := onuTpInstancePathSuffix + fmt.Sprintf("/uni-{%d}", i)
539 if tpInst, ok := t.eponTpInstanceMap[key]; ok {
540 tpInstancesTech = append(tpInstancesTech, *tpInst)
541 }
542 }
543 return tpInstancesTech
544 } else {
545 logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": tech, "tpID": tpID, "onuID": onuID, "intfID": intfID})
546 }
547 return nil
548}
549
550func (t *TechProfileMgr) GetResourceID(ctx context.Context, intfID uint32, resourceType string, numIDs uint32) ([]uint32, error) {
551 logger.Debugw(ctx, "getting-resource-id", log.Fields{
552 "intf-id": intfID,
553 "resource-type": resourceType,
554 "num": numIDs,
555 })
556 var err error
557 var ids []uint32
558 switch resourceType {
559 case t.resourceMgr.GetResourceTypeAllocID():
560 t.AllocIDMgmtLock.Lock()
561 ids, err = t.resourceMgr.GetResourceID(ctx, intfID, resourceType, numIDs)
562 t.AllocIDMgmtLock.Unlock()
563 case t.resourceMgr.GetResourceTypeGemPortID():
564 t.GemPortIDMgmtLock.Lock()
565 ids, err = t.resourceMgr.GetResourceID(ctx, intfID, resourceType, numIDs)
566 t.GemPortIDMgmtLock.Unlock()
567 case t.resourceMgr.GetResourceTypeOnuID():
568 t.OnuIDMgmtLock.Lock()
569 ids, err = t.resourceMgr.GetResourceID(ctx, intfID, resourceType, numIDs)
570 t.OnuIDMgmtLock.Unlock()
571 default:
572 return nil, fmt.Errorf("resourceType %s not supported", resourceType)
573 }
574 if err != nil {
575 return nil, err
576 }
577 return ids, nil
578}
579
580func (t *TechProfileMgr) FreeResourceID(ctx context.Context, intfID uint32, resourceType string, ReleaseContent []uint32) error {
581 logger.Debugw(ctx, "freeing-resource-id", log.Fields{
582 "intf-id": intfID,
583 "resource-type": resourceType,
584 "release-content": ReleaseContent,
585 })
586 var err error
587 switch resourceType {
588 case t.resourceMgr.GetResourceTypeAllocID():
589 t.AllocIDMgmtLock.Lock()
590 err = t.resourceMgr.FreeResourceID(ctx, intfID, resourceType, ReleaseContent)
591 t.AllocIDMgmtLock.Unlock()
592 case t.resourceMgr.GetResourceTypeGemPortID():
593 t.GemPortIDMgmtLock.Lock()
594 err = t.resourceMgr.FreeResourceID(ctx, intfID, resourceType, ReleaseContent)
595 t.GemPortIDMgmtLock.Unlock()
596 case t.resourceMgr.GetResourceTypeOnuID():
597 t.OnuIDMgmtLock.Lock()
598 err = t.resourceMgr.FreeResourceID(ctx, intfID, resourceType, ReleaseContent)
599 t.OnuIDMgmtLock.Unlock()
600 default:
601 return fmt.Errorf("resourceType %s not supported", resourceType)
602 }
603 if err != nil {
604 return err
605 }
606 return nil
607}
608
609func (t *TechProfileMgr) GetUsScheduler(tpInstance *tp_pb.TechProfileInstance) *tp_pb.SchedulerConfig {
610 return &tp_pb.SchedulerConfig{
611 Direction: tpInstance.UsScheduler.Direction,
612 AdditionalBw: tpInstance.UsScheduler.AdditionalBw,
613 Priority: tpInstance.UsScheduler.Priority,
614 Weight: tpInstance.UsScheduler.Weight,
615 SchedPolicy: tpInstance.UsScheduler.QSchedPolicy}
616}
617
618func (t *TechProfileMgr) GetDsScheduler(tpInstance *tp_pb.TechProfileInstance) *tp_pb.SchedulerConfig {
619 return &tp_pb.SchedulerConfig{
620 Direction: tpInstance.DsScheduler.Direction,
621 AdditionalBw: tpInstance.DsScheduler.AdditionalBw,
622 Priority: tpInstance.DsScheduler.Priority,
623 Weight: tpInstance.DsScheduler.Weight,
624 SchedPolicy: tpInstance.DsScheduler.QSchedPolicy}
625}
626
627func (t *TechProfileMgr) GetTrafficScheduler(tpInstance *tp_pb.TechProfileInstance, SchedCfg *tp_pb.SchedulerConfig,
628 ShapingCfg *tp_pb.TrafficShapingInfo) *tp_pb.TrafficScheduler {
629
630 tSched := &tp_pb.TrafficScheduler{
631 Direction: SchedCfg.Direction,
632 AllocId: tpInstance.UsScheduler.AllocId,
633 TrafficShapingInfo: ShapingCfg,
634 Scheduler: SchedCfg}
635
636 return tSched
637}
638
639func (t *TechProfileMgr) GetTrafficQueues(ctx context.Context, tp *tp_pb.TechProfileInstance, direction tp_pb.Direction) ([]*tp_pb.TrafficQueue, error) {
640
641 var encryp bool
642 if direction == tp_pb.Direction_UPSTREAM {
643 // upstream GEM ports
644 NumGemPorts := len(tp.UpstreamGemPortAttributeList)
645 GemPorts := make([]*tp_pb.TrafficQueue, 0)
646 for Count := 0; Count < NumGemPorts; Count++ {
647 if tp.UpstreamGemPortAttributeList[Count].AesEncryption == "True" {
648 encryp = true
649 } else {
650 encryp = false
651 }
652
653 GemPorts = append(GemPorts, &tp_pb.TrafficQueue{
654 Direction: direction,
655 GemportId: tp.UpstreamGemPortAttributeList[Count].GemportId,
656 PbitMap: tp.UpstreamGemPortAttributeList[Count].PbitMap,
657 AesEncryption: encryp,
658 SchedPolicy: tp.UpstreamGemPortAttributeList[Count].SchedulingPolicy,
659 Priority: tp.UpstreamGemPortAttributeList[Count].PriorityQ,
660 Weight: tp.UpstreamGemPortAttributeList[Count].Weight,
661 DiscardPolicy: tp.UpstreamGemPortAttributeList[Count].DiscardPolicy,
662 })
663 }
664 logger.Debugw(ctx, "Upstream Traffic queue list ", log.Fields{"queuelist": GemPorts})
665 return GemPorts, nil
666 } else if direction == tp_pb.Direction_DOWNSTREAM {
667 //downstream GEM ports
668 NumGemPorts := len(tp.DownstreamGemPortAttributeList)
669 GemPorts := make([]*tp_pb.TrafficQueue, 0)
670 for Count := 0; Count < NumGemPorts; Count++ {
671 if isMulticastGem(tp.DownstreamGemPortAttributeList[Count].IsMulticast) {
672 //do not take multicast GEM ports. They are handled separately.
673 continue
674 }
675 if tp.DownstreamGemPortAttributeList[Count].AesEncryption == "True" {
676 encryp = true
677 } else {
678 encryp = false
679 }
680
681 GemPorts = append(GemPorts, &tp_pb.TrafficQueue{
682 Direction: direction,
683 GemportId: tp.DownstreamGemPortAttributeList[Count].GemportId,
684 PbitMap: tp.DownstreamGemPortAttributeList[Count].PbitMap,
685 AesEncryption: encryp,
686 SchedPolicy: tp.DownstreamGemPortAttributeList[Count].SchedulingPolicy,
687 Priority: tp.DownstreamGemPortAttributeList[Count].PriorityQ,
688 Weight: tp.DownstreamGemPortAttributeList[Count].Weight,
689 DiscardPolicy: tp.DownstreamGemPortAttributeList[Count].DiscardPolicy,
690 })
691 }
692 logger.Debugw(ctx, "Downstream Traffic queue list ", log.Fields{"queuelist": GemPorts})
693 return GemPorts, nil
694 }
695
696 logger.Errorf(ctx, "Unsupported direction %s used for generating Traffic Queue list", direction)
697 return nil, fmt.Errorf("downstream gem port traffic queue creation failed due to unsupported direction %s", direction)
698}
699
700func (t *TechProfileMgr) validateInstanceControlAttr(ctx context.Context, instCtl tp_pb.InstanceControl) error {
701 if instCtl.Onu != "single-instance" && instCtl.Onu != "multi-instance" {
702 logger.Errorw(ctx, "invalid-onu-instance-control-attribute", log.Fields{"onu-inst": instCtl.Onu})
703 return errors.New("invalid-onu-instance-ctl-attr")
704 }
705
706 if instCtl.Uni != "single-instance" && instCtl.Uni != "multi-instance" {
707 logger.Errorw(ctx, "invalid-uni-instance-control-attribute", log.Fields{"uni-inst": instCtl.Uni})
708 return errors.New("invalid-uni-instance-ctl-attr")
709 }
710
711 if instCtl.Uni == "multi-instance" {
712 logger.Error(ctx, "uni-multi-instance-tp-not-supported")
713 return errors.New("uni-multi-instance-tp-not-supported")
714 }
715
716 return nil
717}
718
719// allocateTPInstance for GPON, XGPON and XGS-PON technology
720func (t *TechProfileMgr) allocateTPInstance(ctx context.Context, uniPortName string, tp *tp_pb.TechProfile, intfID uint32, tpInstPathSuffix string) *tp_pb.TechProfileInstance {
721
722 var usGemPortAttributeList []*tp_pb.GemPortAttributes
723 var dsGemPortAttributeList []*tp_pb.GemPortAttributes
724 var dsMulticastGemAttributeList []*tp_pb.GemPortAttributes
725 var dsUnicastGemAttributeList []*tp_pb.GemPortAttributes
726 var tcontIDs []uint32
727 var gemPorts []uint32
728 var err error
729
730 logger.Infow(ctx, "Allocating TechProfileMgr instance from techprofile template", log.Fields{"uniPortName": uniPortName, "intfID": intfID, "numGem": tp.NumGemPorts})
731
732 if tp.InstanceControl.Onu == "multi-instance" {
733 tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1)
734 if err != nil {
735 logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"err": err, "intfID": intfID})
736 return nil
737 }
738 } else { // "single-instance"
739 if tpInst := t.getSingleInstanceTp(ctx, tpInstPathSuffix); tpInst == nil {
740 // No "single-instance" tp found on one any uni port for the given TP ID
741 // Allocate a new TcontID or AllocID
742 tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1)
743 if err != nil {
744 logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"err": err, "intfID": intfID})
745 return nil
746 }
747 } else {
748 // Use the alloc-id from the existing TpInstance
749 tcontIDs = append(tcontIDs, tpInst.UsScheduler.AllocId)
750 }
751 }
752 logger.Debugw(ctx, "Num GEM ports in TP:", log.Fields{"NumGemPorts": tp.NumGemPorts})
753 gemPorts, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), tp.NumGemPorts)
754 if err != nil {
755 logger.Errorw(ctx, "Error getting gemport ids from rsrcrMgr", log.Fields{"err": err, "intfID": intfID, "numGemports": tp.NumGemPorts})
756 return nil
757 }
758 logger.Infow(ctx, "Allocated tconts and GEM ports successfully", log.Fields{"tconts": tcontIDs, "gemports": gemPorts})
759 for index := 0; index < int(tp.NumGemPorts); index++ {
760 usGemPortAttributeList = append(usGemPortAttributeList,
761 &tp_pb.GemPortAttributes{GemportId: gemPorts[index],
762 MaxQSize: tp.UpstreamGemPortAttributeList[index].MaxQSize,
763 PbitMap: tp.UpstreamGemPortAttributeList[index].PbitMap,
764 AesEncryption: tp.UpstreamGemPortAttributeList[index].AesEncryption,
765 SchedulingPolicy: tp.UpstreamGemPortAttributeList[index].SchedulingPolicy,
766 PriorityQ: tp.UpstreamGemPortAttributeList[index].PriorityQ,
767 Weight: tp.UpstreamGemPortAttributeList[index].Weight,
768 DiscardPolicy: tp.UpstreamGemPortAttributeList[index].DiscardPolicy,
769 DiscardConfig: tp.UpstreamGemPortAttributeList[index].DiscardConfig})
770 }
771
772 logger.Info(ctx, "length of DownstreamGemPortAttributeList", len(tp.DownstreamGemPortAttributeList))
773 //put multicast and unicast downstream GEM port attributes in different lists first
774 for index := 0; index < len(tp.DownstreamGemPortAttributeList); index++ {
775 if isMulticastGem(tp.DownstreamGemPortAttributeList[index].IsMulticast) {
776 dsMulticastGemAttributeList = append(dsMulticastGemAttributeList,
777 &tp_pb.GemPortAttributes{
778 MulticastGemId: tp.DownstreamGemPortAttributeList[index].MulticastGemId,
779 MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
780 PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
781 AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
782 SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
783 PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
784 Weight: tp.DownstreamGemPortAttributeList[index].Weight,
785 DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
786 DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig,
787 IsMulticast: tp.DownstreamGemPortAttributeList[index].IsMulticast,
788 DynamicAccessControlList: tp.DownstreamGemPortAttributeList[index].DynamicAccessControlList,
789 StaticAccessControlList: tp.DownstreamGemPortAttributeList[index].StaticAccessControlList})
790 } else {
791 dsUnicastGemAttributeList = append(dsUnicastGemAttributeList,
792 &tp_pb.GemPortAttributes{
793 MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
794 PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
795 AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
796 SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
797 PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
798 Weight: tp.DownstreamGemPortAttributeList[index].Weight,
799 DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
800 DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig})
801 }
802 }
803 //add unicast downstream GEM ports to dsGemPortAttributeList
804 if dsUnicastGemAttributeList != nil {
805 for index := 0; index < int(tp.NumGemPorts); index++ {
806 dsGemPortAttributeList = append(dsGemPortAttributeList,
807 &tp_pb.GemPortAttributes{GemportId: gemPorts[index],
808 MaxQSize: dsUnicastGemAttributeList[index].MaxQSize,
809 PbitMap: dsUnicastGemAttributeList[index].PbitMap,
810 AesEncryption: dsUnicastGemAttributeList[index].AesEncryption,
811 SchedulingPolicy: dsUnicastGemAttributeList[index].SchedulingPolicy,
812 PriorityQ: dsUnicastGemAttributeList[index].PriorityQ,
813 Weight: dsUnicastGemAttributeList[index].Weight,
814 DiscardPolicy: dsUnicastGemAttributeList[index].DiscardPolicy,
815 DiscardConfig: dsUnicastGemAttributeList[index].DiscardConfig})
816 }
817 }
818 //add multicast GEM ports to dsGemPortAttributeList afterwards
819 for k := range dsMulticastGemAttributeList {
820 dsGemPortAttributeList = append(dsGemPortAttributeList, dsMulticastGemAttributeList[k])
821 }
822
823 return &tp_pb.TechProfileInstance{
824 SubscriberIdentifier: uniPortName,
825 Name: tp.Name,
826 ProfileType: tp.ProfileType,
827 Version: tp.Version,
828 NumGemPorts: tp.NumGemPorts,
829 InstanceControl: tp.InstanceControl,
830 UsScheduler: &tp_pb.SchedulerAttributes{
831 AllocId: tcontIDs[0],
832 Direction: tp.UsScheduler.Direction,
833 AdditionalBw: tp.UsScheduler.AdditionalBw,
834 Priority: tp.UsScheduler.Priority,
835 Weight: tp.UsScheduler.Weight,
836 QSchedPolicy: tp.UsScheduler.QSchedPolicy},
837 DsScheduler: &tp_pb.SchedulerAttributes{
838 AllocId: tcontIDs[0],
839 Direction: tp.DsScheduler.Direction,
840 AdditionalBw: tp.DsScheduler.AdditionalBw,
841 Priority: tp.DsScheduler.Priority,
842 Weight: tp.DsScheduler.Weight,
843 QSchedPolicy: tp.DsScheduler.QSchedPolicy},
844 UpstreamGemPortAttributeList: usGemPortAttributeList,
845 DownstreamGemPortAttributeList: dsGemPortAttributeList}
846}
847
848// allocateTPInstance function for EPON
849func (t *TechProfileMgr) allocateEponTPInstance(ctx context.Context, uniPortName string, tp *tp_pb.EponTechProfile, intfID uint32, tpInstPath string) *tp_pb.EponTechProfileInstance {
850
851 var usQueueAttributeList []*tp_pb.EPONQueueAttributes
852 var dsQueueAttributeList []*tp_pb.EPONQueueAttributes
853 var tcontIDs []uint32
854 var gemPorts []uint32
855 var err error
856
857 logger.Infow(ctx, "allocating-tp-instance-from-tp-template", log.Fields{"uniPortName": uniPortName, "intfID": intfID, "numGem": tp.NumGemPorts})
858
859 if tp.InstanceControl.Onu == "multi-instance" {
860 if tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1); err != nil {
861 logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"err": err, "intfID": intfID})
862 return nil
863 }
864 } else { // "single-instance"
865 if tpInst := t.getSingleInstanceEponTp(ctx, tpInstPath); tpInst == nil {
866 // No "single-instance" tp found on one any uni port for the given TP ID
867 // Allocate a new TcontID or AllocID
868 if tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1); err != nil {
869 logger.Errorw(ctx, "error-getting-alloc-id-from-resource-mgr", log.Fields{"err": err, "intfID": intfID})
870 return nil
871 }
872 } else {
873 // Use the alloc-id from the existing TpInstance
874 tcontIDs = append(tcontIDs, tpInst.AllocId)
875 }
876 }
877 logger.Debugw(ctx, "Num GEM ports in TP:", log.Fields{"NumGemPorts": tp.NumGemPorts})
878 if gemPorts, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), tp.NumGemPorts); err != nil {
879 logger.Errorw(ctx, "error-getting-gemport-id-from-resource-mgr", log.Fields{"err": err, "intfID": intfID, "numGemports": tp.NumGemPorts})
880 return nil
881 }
882 logger.Infow(ctx, "allocated-alloc-id-and-gemport-successfully", log.Fields{"tconts": tcontIDs, "gemports": gemPorts})
883 for index := 0; index < int(tp.NumGemPorts); index++ {
884 usQueueAttributeList = append(usQueueAttributeList,
885 &tp_pb.EPONQueueAttributes{GemportId: gemPorts[index],
886 MaxQSize: tp.UpstreamQueueAttributeList[index].MaxQSize,
887 PbitMap: tp.UpstreamQueueAttributeList[index].PbitMap,
888 AesEncryption: tp.UpstreamQueueAttributeList[index].AesEncryption,
889 TrafficType: tp.UpstreamQueueAttributeList[index].TrafficType,
890 UnsolicitedGrantSize: tp.UpstreamQueueAttributeList[index].UnsolicitedGrantSize,
891 NominalInterval: tp.UpstreamQueueAttributeList[index].NominalInterval,
892 ToleratedPollJitter: tp.UpstreamQueueAttributeList[index].ToleratedPollJitter,
893 RequestTransmissionPolicy: tp.UpstreamQueueAttributeList[index].RequestTransmissionPolicy,
894 NumQSets: tp.UpstreamQueueAttributeList[index].NumQSets,
895 QThresholds: tp.UpstreamQueueAttributeList[index].QThresholds,
896 SchedulingPolicy: tp.UpstreamQueueAttributeList[index].SchedulingPolicy,
897 PriorityQ: tp.UpstreamQueueAttributeList[index].PriorityQ,
898 Weight: tp.UpstreamQueueAttributeList[index].Weight,
899 DiscardPolicy: tp.UpstreamQueueAttributeList[index].DiscardPolicy,
900 DiscardConfig: tp.UpstreamQueueAttributeList[index].DiscardConfig})
901 }
902
903 logger.Info(ctx, "length-of-downstream-gemport-attribute-list", len(tp.DownstreamQueueAttributeList))
904 for index := 0; index < int(tp.NumGemPorts); index++ {
905 dsQueueAttributeList = append(dsQueueAttributeList,
906 &tp_pb.EPONQueueAttributes{GemportId: gemPorts[index],
907 MaxQSize: tp.DownstreamQueueAttributeList[index].MaxQSize,
908 PbitMap: tp.DownstreamQueueAttributeList[index].PbitMap,
909 AesEncryption: tp.DownstreamQueueAttributeList[index].AesEncryption,
910 SchedulingPolicy: tp.DownstreamQueueAttributeList[index].SchedulingPolicy,
911 PriorityQ: tp.DownstreamQueueAttributeList[index].PriorityQ,
912 Weight: tp.DownstreamQueueAttributeList[index].Weight,
913 DiscardPolicy: tp.DownstreamQueueAttributeList[index].DiscardPolicy,
914 DiscardConfig: tp.DownstreamQueueAttributeList[index].DiscardConfig})
915 }
916
917 return &tp_pb.EponTechProfileInstance{
918 SubscriberIdentifier: uniPortName,
919 Name: tp.Name,
920 ProfileType: tp.ProfileType,
921 Version: tp.Version,
922 NumGemPorts: tp.NumGemPorts,
923 InstanceControl: tp.InstanceControl,
924 PackageType: tp.PackageType,
925 AllocId: tcontIDs[0],
926 UpstreamQueueAttributeList: usQueueAttributeList,
927 DownstreamQueueAttributeList: dsQueueAttributeList}
928}
929
930// getSingleInstanceTp returns another TpInstance (GPON, XGPON, XGS-PON) for an ONU on a different
931// uni port for the same TP ID, if it finds one, else nil.
932func (t *TechProfileMgr) getSingleInstanceTp(ctx context.Context, tpPathSuffix string) *tp_pb.TechProfileInstance {
933
934 // For example:
935 // tpPathSuffix like "XGS-PON/64/olt-{1234}/pon-{0}/onu-{1}/uni-{1}"
936 // is broken into ["XGS-PON/64/olt-{1234}/pon-{0}/onu-{1}" ""]
937 uniPathSlice := regexp.MustCompile(`/uni-{[0-9]+}$`).Split(tpPathSuffix, 2)
938
939 t.tpInstanceMapLock.RLock()
940 defer t.tpInstanceMapLock.RUnlock()
941 for i := 0; i < MaxUniPortPerOnu; i++ {
942 key := fmt.Sprintf(uniPathSlice[0]+"/uni-{%d}", i)
943 if tpInst, ok := t.tpInstanceMap[key]; ok {
944 logger.Debugw(ctx, "found-single-instance-tp", log.Fields{"key": key})
945 return tpInst
946 }
947 }
948 return nil
949}
950
951// getSingleInstanceTp returns another TpInstance (EPON) for an ONU on a different
952// uni port for the same TP ID, if it finds one, else nil.
953func (t *TechProfileMgr) getSingleInstanceEponTp(ctx context.Context, tpPathSuffix string) *tp_pb.EponTechProfileInstance {
954 // For example:
955 // tpPathSuffix like "EPON/64/olt-{1234}/pon-{0}/onu-{1}/uni-{1}"
956 // is broken into ["EPON/64/-{1234}/pon-{0}/onu-{1}" ""]
957 uniPathSlice := regexp.MustCompile(`/uni-{[0-9]+}$`).Split(tpPathSuffix, 2)
958
959 t.epontpInstanceMapLock.RLock()
960 defer t.epontpInstanceMapLock.RUnlock()
961 for i := 0; i < MaxUniPortPerOnu; i++ {
962 key := fmt.Sprintf(uniPathSlice[0]+"/uni-{%d}", i)
963 if tpInst, ok := t.eponTpInstanceMap[key]; ok {
964 logger.Debugw(ctx, "found-single-instance-tp", log.Fields{"key": key})
965 return tpInst
966 }
967 }
968 return nil
969}
970
971// getDefaultTechProfile returns a default TechProfile for GPON, XGPON, XGS-PON
972func (t *TechProfileMgr) getDefaultTechProfile(ctx context.Context) *tp_pb.TechProfile {
973 var usGemPortAttributeList []*tp_pb.GemPortAttributes
974 var dsGemPortAttributeList []*tp_pb.GemPortAttributes
975
976 for _, pbit := range t.config.DefaultPbits {
977 logger.Debugw(ctx, "creating-gem-port-profile-profile", log.Fields{"pbit": pbit})
978 usGemPortAttributeList = append(usGemPortAttributeList,
979 &tp_pb.GemPortAttributes{
980 MaxQSize: defaultMaxQueueSize,
981 PbitMap: pbit,
982 AesEncryption: defaultAESEncryption,
983 SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
984 PriorityQ: defaultPriorityQueue,
985 Weight: defaultQueueWeight,
986 DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
987 DiscardConfigV2: &tp_pb.DiscardConfig{
988 DiscardPolicy: tp_pb.DiscardPolicy_Red,
989 DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
990 RedDiscardConfig: &tp_pb.RedDiscardConfig{
991 MinThreshold: defaultMinThreshold,
992 MaxThreshold: defaultMaxThreshold,
993 MaxProbability: defaultMaxProbability,
994 },
995 },
996 },
997 DiscardConfig: &tp_pb.RedDiscardConfig{
998 MinThreshold: defaultMinThreshold,
999 MaxThreshold: defaultMaxThreshold,
1000 MaxProbability: defaultMaxProbability,
1001 },
1002 })
1003 dsGemPortAttributeList = append(dsGemPortAttributeList,
1004 &tp_pb.GemPortAttributes{
1005 MaxQSize: defaultMaxQueueSize,
1006 PbitMap: pbit,
1007 AesEncryption: defaultAESEncryption,
1008 SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
1009 PriorityQ: defaultPriorityQueue,
1010 Weight: defaultQueueWeight,
1011 DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
1012 DiscardConfigV2: &tp_pb.DiscardConfig{
1013 DiscardPolicy: tp_pb.DiscardPolicy_Red,
1014 DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
1015 RedDiscardConfig: &tp_pb.RedDiscardConfig{
1016 MinThreshold: defaultMinThreshold,
1017 MaxThreshold: defaultMaxThreshold,
1018 MaxProbability: defaultMaxProbability,
1019 },
1020 },
1021 },
1022 DiscardConfig: &tp_pb.RedDiscardConfig{
1023 MinThreshold: defaultMinThreshold,
1024 MaxThreshold: defaultMaxThreshold,
1025 MaxProbability: defaultMaxProbability,
1026 },
1027 IsMulticast: defaultIsMulticast,
1028 DynamicAccessControlList: defaultAccessControlList,
1029 StaticAccessControlList: defaultAccessControlList,
1030 MulticastGemId: defaultMcastGemID})
1031 }
1032 return &tp_pb.TechProfile{
1033 Name: t.config.DefaultTPName,
1034 ProfileType: t.resourceMgr.GetTechnology(),
1035 Version: t.config.TPVersion,
1036 NumGemPorts: uint32(len(usGemPortAttributeList)),
1037 InstanceControl: &tp_pb.InstanceControl{
1038 Onu: defaultOnuInstance,
1039 Uni: defaultUniInstance,
1040 MaxGemPayloadSize: defaultGemPayloadSize},
1041 UsScheduler: &tp_pb.SchedulerAttributes{
1042 Direction: tp_pb.Direction_UPSTREAM,
1043 AdditionalBw: tp_pb.AdditionalBW_AdditionalBW_BestEffort,
1044 Priority: defaultPriority,
1045 Weight: defaultWeight,
1046 QSchedPolicy: tp_pb.SchedulingPolicy_Hybrid},
1047 DsScheduler: &tp_pb.SchedulerAttributes{
1048 Direction: tp_pb.Direction_DOWNSTREAM,
1049 AdditionalBw: tp_pb.AdditionalBW_AdditionalBW_BestEffort,
1050 Priority: defaultPriority,
1051 Weight: defaultWeight,
1052 QSchedPolicy: tp_pb.SchedulingPolicy_Hybrid},
1053 UpstreamGemPortAttributeList: usGemPortAttributeList,
1054 DownstreamGemPortAttributeList: dsGemPortAttributeList}
1055}
1056
1057// getDefaultEponProfile returns a default TechProfile for EPON
1058func (t *TechProfileMgr) getDefaultEponProfile(ctx context.Context) *tp_pb.EponTechProfile {
1059
1060 var usQueueAttributeList []*tp_pb.EPONQueueAttributes
1061 var dsQueueAttributeList []*tp_pb.EPONQueueAttributes
1062
1063 for _, pbit := range t.config.DefaultPbits {
1064 logger.Debugw(ctx, "Creating Queue", log.Fields{"pbit": pbit})
1065 usQueueAttributeList = append(usQueueAttributeList,
1066 &tp_pb.EPONQueueAttributes{
1067 MaxQSize: defaultMaxQueueSize,
1068 PbitMap: pbit,
1069 AesEncryption: defaultAESEncryption,
1070 TrafficType: defaultTrafficType,
1071 UnsolicitedGrantSize: defaultUnsolicitedGrantSize,
1072 NominalInterval: defaultNominalInterval,
1073 ToleratedPollJitter: defaultToleratedPollJitter,
1074 RequestTransmissionPolicy: defaultRequestTransmissionPolicy,
1075 NumQSets: defaultNumQueueSet,
1076 QThresholds: &tp_pb.QThresholds{
1077 QThreshold1: defaultQThreshold1,
1078 QThreshold2: defaultQThreshold2,
1079 QThreshold3: defaultQThreshold3,
1080 QThreshold4: defaultQThreshold4,
1081 QThreshold5: defaultQThreshold5,
1082 QThreshold6: defaultQThreshold6,
1083 QThreshold7: defaultQThreshold7},
1084 SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
1085 PriorityQ: defaultPriorityQueue,
1086 Weight: defaultQueueWeight,
1087 DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
1088 DiscardConfigV2: &tp_pb.DiscardConfig{
1089 DiscardPolicy: tp_pb.DiscardPolicy_Red,
1090 DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
1091 RedDiscardConfig: &tp_pb.RedDiscardConfig{
1092 MinThreshold: defaultMinThreshold,
1093 MaxThreshold: defaultMaxThreshold,
1094 MaxProbability: defaultMaxProbability,
1095 },
1096 },
1097 },
1098 DiscardConfig: &tp_pb.RedDiscardConfig{
1099 MinThreshold: defaultMinThreshold,
1100 MaxThreshold: defaultMaxThreshold,
1101 MaxProbability: defaultMaxProbability,
1102 }})
1103 dsQueueAttributeList = append(dsQueueAttributeList,
1104 &tp_pb.EPONQueueAttributes{
1105 MaxQSize: defaultMaxQueueSize,
1106 PbitMap: pbit,
1107 AesEncryption: defaultAESEncryption,
1108 SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
1109 PriorityQ: defaultPriorityQueue,
1110 Weight: defaultQueueWeight,
1111 DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
1112 DiscardConfigV2: &tp_pb.DiscardConfig{
1113 DiscardPolicy: tp_pb.DiscardPolicy_Red,
1114 DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
1115 RedDiscardConfig: &tp_pb.RedDiscardConfig{
1116 MinThreshold: defaultMinThreshold,
1117 MaxThreshold: defaultMaxThreshold,
1118 MaxProbability: defaultMaxProbability,
1119 },
1120 },
1121 },
1122 DiscardConfig: &tp_pb.RedDiscardConfig{
1123 MinThreshold: defaultMinThreshold,
1124 MaxThreshold: defaultMaxThreshold,
1125 MaxProbability: defaultMaxProbability,
1126 }})
1127 }
1128 return &tp_pb.EponTechProfile{
1129 Name: t.config.DefaultTPName,
1130 ProfileType: t.resourceMgr.GetTechnology(),
1131 Version: t.config.TPVersion,
1132 NumGemPorts: uint32(len(usQueueAttributeList)),
1133 InstanceControl: &tp_pb.InstanceControl{
1134 Onu: defaultOnuInstance,
1135 Uni: defaultUniInstance,
1136 MaxGemPayloadSize: defaultGemPayloadSize},
1137 PackageType: defaultPakageType,
1138 UpstreamQueueAttributeList: usQueueAttributeList,
1139 DownstreamQueueAttributeList: dsQueueAttributeList}
1140}
1141
// isMulticastGem reports whether the isMulticast attribute value of a GEM port
// marks it as multicast. Only the exact spellings "True", "true" and "TRUE"
// count; any other value, including the empty string, yields false.
func isMulticastGem(isMulticastAttrValue string) bool {
	switch isMulticastAttrValue {
	case "True", "true", "TRUE":
		return true
	default:
		return false
	}
}
1147
1148func (t *TechProfileMgr) addResourceInstanceToKVStore(ctx context.Context, tpID uint32, uniPortName string, resInst tp_pb.ResourceInstance) error {
1149 logger.Debugw(ctx, "adding-resource-instance-to-kv-store", log.Fields{"tpID": tpID, "uniPortName": uniPortName, "resInst": resInst})
1150 val, err := proto.Marshal(&resInst)
1151 if err != nil {
1152 logger.Errorw(ctx, "failed-to-marshall-resource-instance", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName, "resInst": resInst})
1153 return err
1154 }
1155 err = t.config.ResourceInstanceKVBacked.Put(ctx, fmt.Sprintf("%s/%d/%s", t.resourceMgr.GetTechnology(), tpID, uniPortName), val)
1156 return err
1157}
1158
1159func (t *TechProfileMgr) removeResourceInstanceFromKVStore(ctx context.Context, tpID uint32, uniPortName string) error {
1160 logger.Debugw(ctx, "removing-resource-instance-to-kv-store", log.Fields{"tpID": tpID, "uniPortName": uniPortName})
1161 if err := t.config.ResourceInstanceKVBacked.Delete(ctx, fmt.Sprintf("%s/%d/%s", t.resourceMgr.GetTechnology(), tpID, uniPortName)); err != nil {
1162 logger.Errorw(ctx, "error-removing-resource-instance-to-kv-store", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
1163 return err
1164 }
1165 return nil
1166}
1167
1168func (t *TechProfileMgr) getTPFromKVStore(ctx context.Context, tpID uint32) *tp_pb.TechProfile {
1169 var tp *tp_pb.TechProfile
1170 t.tpMapLock.RLock()
1171 tp, ok := t.tpMap[tpID]
1172 t.tpMapLock.RUnlock()
1173 if ok {
1174 logger.Debugw(ctx, "found-tp-in-cache", log.Fields{"tpID": tpID})
1175 return tp
1176 }
1177 key := fmt.Sprintf(t.config.TPFileKVPath, t.resourceMgr.GetTechnology(), tpID)
1178 logger.Debugw(ctx, "getting-tp-from-kv-store", log.Fields{"tpID": tpID, "Key": key})
1179 kvresult, err := t.config.DefaultTpKVBackend.Get(ctx, key)
1180 if err != nil {
1181 logger.Errorw(ctx, "error-fetching-from-kv-store", log.Fields{"err": err, "key": key})
1182 return nil
1183 }
1184 if kvresult != nil {
1185 /* Backend will return Value in string format,needs to be converted to []byte before unmarshal*/
1186 if value, err := kvstore.ToByte(kvresult.Value); err == nil {
1187 lTp := &tp_pb.TechProfile{}
1188 reader := bytes.NewReader(value)
1189 if err = jsonpb.Unmarshal(reader, lTp); err != nil {
1190 logger.Errorw(ctx, "error-unmarshalling-tp-from-kv-store", log.Fields{"err": err, "tpID": tpID, "error": err})
1191 return nil
1192 }
1193
1194 logger.Debugw(ctx, "success-fetched-tp-from-kv-store", log.Fields{"tpID": tpID, "value": *lTp})
1195 return lTp
1196 } else {
1197 logger.Errorw(ctx, "error-decoding-tp", log.Fields{"err": err, "tpID": tpID})
1198 // We we create a default profile in this case.
1199 }
1200 }
1201
1202 return nil
1203}
1204
1205func (t *TechProfileMgr) getEponTPFromKVStore(ctx context.Context, tpID uint32) *tp_pb.EponTechProfile {
1206 var eponTp *tp_pb.EponTechProfile
1207 t.eponTpMapLock.RLock()
1208 eponTp, ok := t.eponTpMap[tpID]
1209 t.eponTpMapLock.RUnlock()
1210 if ok {
1211 logger.Debugw(ctx, "found-tp-in-cache", log.Fields{"tpID": tpID})
1212 return eponTp
1213 }
1214 key := fmt.Sprintf(t.config.TPFileKVPath, t.resourceMgr.GetTechnology(), tpID)
1215 logger.Debugw(ctx, "getting-epon-tp-from-kv-store", log.Fields{"tpID": tpID, "Key": key})
1216 kvresult, err := t.config.DefaultTpKVBackend.Get(ctx, key)
1217 if err != nil {
1218 logger.Errorw(ctx, "error-fetching-from-kv-store", log.Fields{"err": err, "key": key})
1219 return nil
1220 }
1221 if kvresult != nil {
1222 /* Backend will return Value in string format,needs to be converted to []byte before unmarshal*/
1223 if value, err := kvstore.ToByte(kvresult.Value); err == nil {
1224 lEponTp := &tp_pb.EponTechProfile{}
1225 reader := bytes.NewReader(value)
1226 if err = jsonpb.Unmarshal(reader, lEponTp); err != nil {
1227 logger.Errorw(ctx, "error-unmarshalling-epon-tp-from-kv-store", log.Fields{"err": err, "tpID": tpID, "error": err})
1228 return nil
1229 }
1230
1231 logger.Debugw(ctx, "success-fetching-epon-tp-from-kv-store", log.Fields{"tpID": tpID, "value": *lEponTp})
1232 return lEponTp
1233 }
1234 }
1235 return nil
1236}
1237
1238func newKVClient(ctx context.Context, storeType string, address string, timeout time.Duration) (kvstore.Client, error) {
1239
1240 logger.Infow(ctx, "kv-store", log.Fields{"storeType": storeType, "address": address})
1241 switch storeType {
1242 case "etcd":
1243 return kvstore.NewEtcdClient(ctx, address, timeout, log.WarnLevel)
Joey Armstronga6af1522023-01-17 16:06:16 -05001244 case "redis":
1245 return kvstore.NewRedisClient(address, timeout, false)
1246 case "redis-sentinel":
1247 return kvstore.NewRedisClient(address, timeout, true)
khenaidoo106c61a2021-08-11 18:05:46 -04001248 }
Joey Armstronga6af1522023-01-17 16:06:16 -05001249
khenaidoo106c61a2021-08-11 18:05:46 -04001250 return nil, errors.New("unsupported-kv-store")
1251}
1252
1253// buildTpInstanceFromResourceInstance for GPON, XGPON and XGS-PON technology - build TpInstance from TechProfile template and ResourceInstance
1254func (t *TechProfileMgr) buildTpInstanceFromResourceInstance(ctx context.Context, tp *tp_pb.TechProfile, resInst *tp_pb.ResourceInstance) *tp_pb.TechProfileInstance {
1255
1256 var usGemPortAttributeList []*tp_pb.GemPortAttributes
1257 var dsGemPortAttributeList []*tp_pb.GemPortAttributes
1258 var dsMulticastGemAttributeList []*tp_pb.GemPortAttributes
1259 var dsUnicastGemAttributeList []*tp_pb.GemPortAttributes
1260
1261 if len(resInst.GemportIds) != int(tp.NumGemPorts) {
1262 logger.Errorw(ctx, "mismatch-in-number-of-gemports-between-template-and-resource-instance",
1263 log.Fields{"tpID": resInst.TpId, "totalResInstGemPortIDs": len(resInst.GemportIds), "totalTpTemplateGemPorts": tp.NumGemPorts})
1264 return nil
1265 }
1266 for index := 0; index < int(tp.NumGemPorts); index++ {
1267 usGemPortAttributeList = append(usGemPortAttributeList,
1268 &tp_pb.GemPortAttributes{GemportId: resInst.GemportIds[index],
1269 MaxQSize: tp.UpstreamGemPortAttributeList[index].MaxQSize,
1270 PbitMap: tp.UpstreamGemPortAttributeList[index].PbitMap,
1271 AesEncryption: tp.UpstreamGemPortAttributeList[index].AesEncryption,
1272 SchedulingPolicy: tp.UpstreamGemPortAttributeList[index].SchedulingPolicy,
1273 PriorityQ: tp.UpstreamGemPortAttributeList[index].PriorityQ,
1274 Weight: tp.UpstreamGemPortAttributeList[index].Weight,
1275 DiscardPolicy: tp.UpstreamGemPortAttributeList[index].DiscardPolicy,
1276 DiscardConfig: tp.UpstreamGemPortAttributeList[index].DiscardConfig})
1277 }
1278
1279 //put multicast and unicast downstream GEM port attributes in different lists first
1280 for index := 0; index < len(tp.DownstreamGemPortAttributeList); index++ {
1281 if isMulticastGem(tp.DownstreamGemPortAttributeList[index].IsMulticast) {
1282 dsMulticastGemAttributeList = append(dsMulticastGemAttributeList,
1283 &tp_pb.GemPortAttributes{
1284 MulticastGemId: tp.DownstreamGemPortAttributeList[index].MulticastGemId,
1285 MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
1286 PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
1287 AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
1288 SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
1289 PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
1290 Weight: tp.DownstreamGemPortAttributeList[index].Weight,
1291 DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
1292 DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig,
1293 IsMulticast: tp.DownstreamGemPortAttributeList[index].IsMulticast,
1294 DynamicAccessControlList: tp.DownstreamGemPortAttributeList[index].DynamicAccessControlList,
1295 StaticAccessControlList: tp.DownstreamGemPortAttributeList[index].StaticAccessControlList})
1296 } else {
1297 dsUnicastGemAttributeList = append(dsUnicastGemAttributeList,
1298 &tp_pb.GemPortAttributes{
1299 MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
1300 PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
1301 AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
1302 SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
1303 PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
1304 Weight: tp.DownstreamGemPortAttributeList[index].Weight,
1305 DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
1306 DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig})
1307 }
1308 }
1309 //add unicast downstream GEM ports to dsGemPortAttributeList
1310 if dsUnicastGemAttributeList != nil {
1311 for index := 0; index < int(tp.NumGemPorts); index++ {
1312 dsGemPortAttributeList = append(dsGemPortAttributeList,
1313 &tp_pb.GemPortAttributes{GemportId: resInst.GemportIds[index],
1314 MaxQSize: dsUnicastGemAttributeList[index].MaxQSize,
1315 PbitMap: dsUnicastGemAttributeList[index].PbitMap,
1316 AesEncryption: dsUnicastGemAttributeList[index].AesEncryption,
1317 SchedulingPolicy: dsUnicastGemAttributeList[index].SchedulingPolicy,
1318 PriorityQ: dsUnicastGemAttributeList[index].PriorityQ,
1319 Weight: dsUnicastGemAttributeList[index].Weight,
1320 DiscardPolicy: dsUnicastGemAttributeList[index].DiscardPolicy,
1321 DiscardConfig: dsUnicastGemAttributeList[index].DiscardConfig})
1322 }
1323 }
1324 //add multicast GEM ports to dsGemPortAttributeList afterwards
1325 for k := range dsMulticastGemAttributeList {
1326 dsGemPortAttributeList = append(dsGemPortAttributeList, dsMulticastGemAttributeList[k])
1327 }
1328
1329 return &tp_pb.TechProfileInstance{
1330 SubscriberIdentifier: resInst.SubscriberIdentifier,
1331 Name: tp.Name,
1332 ProfileType: tp.ProfileType,
1333 Version: tp.Version,
1334 NumGemPorts: tp.NumGemPorts,
1335 InstanceControl: tp.InstanceControl,
1336 UsScheduler: &tp_pb.SchedulerAttributes{
1337 AllocId: resInst.AllocId,
1338 Direction: tp.UsScheduler.Direction,
1339 AdditionalBw: tp.UsScheduler.AdditionalBw,
1340 Priority: tp.UsScheduler.Priority,
1341 Weight: tp.UsScheduler.Weight,
1342 QSchedPolicy: tp.UsScheduler.QSchedPolicy},
1343 DsScheduler: &tp_pb.SchedulerAttributes{
1344 AllocId: resInst.AllocId,
1345 Direction: tp.DsScheduler.Direction,
1346 AdditionalBw: tp.DsScheduler.AdditionalBw,
1347 Priority: tp.DsScheduler.Priority,
1348 Weight: tp.DsScheduler.Weight,
1349 QSchedPolicy: tp.DsScheduler.QSchedPolicy},
1350 UpstreamGemPortAttributeList: usGemPortAttributeList,
1351 DownstreamGemPortAttributeList: dsGemPortAttributeList}
1352}
1353
1354// buildEponTpInstanceFromResourceInstance for EPON technology - build EponTpInstance from EponTechProfile template and ResourceInstance
1355func (t *TechProfileMgr) buildEponTpInstanceFromResourceInstance(ctx context.Context, tp *tp_pb.EponTechProfile, resInst *tp_pb.ResourceInstance) *tp_pb.EponTechProfileInstance {
1356
1357 var usQueueAttributeList []*tp_pb.EPONQueueAttributes
1358 var dsQueueAttributeList []*tp_pb.EPONQueueAttributes
1359
1360 if len(resInst.GemportIds) != int(tp.NumGemPorts) {
1361 logger.Errorw(ctx, "mismatch-in-number-of-gemports-between-epon-tp-template-and-resource-instance",
1362 log.Fields{"tpID": resInst.TpId, "totalResInstGemPortIDs": len(resInst.GemportIds), "totalTpTemplateGemPorts": tp.NumGemPorts})
1363 return nil
1364 }
1365
1366 for index := 0; index < int(tp.NumGemPorts); index++ {
1367 usQueueAttributeList = append(usQueueAttributeList,
1368 &tp_pb.EPONQueueAttributes{GemportId: resInst.GemportIds[index],
1369 MaxQSize: tp.UpstreamQueueAttributeList[index].MaxQSize,
1370 PbitMap: tp.UpstreamQueueAttributeList[index].PbitMap,
1371 AesEncryption: tp.UpstreamQueueAttributeList[index].AesEncryption,
1372 TrafficType: tp.UpstreamQueueAttributeList[index].TrafficType,
1373 UnsolicitedGrantSize: tp.UpstreamQueueAttributeList[index].UnsolicitedGrantSize,
1374 NominalInterval: tp.UpstreamQueueAttributeList[index].NominalInterval,
1375 ToleratedPollJitter: tp.UpstreamQueueAttributeList[index].ToleratedPollJitter,
1376 RequestTransmissionPolicy: tp.UpstreamQueueAttributeList[index].RequestTransmissionPolicy,
1377 NumQSets: tp.UpstreamQueueAttributeList[index].NumQSets,
1378 QThresholds: tp.UpstreamQueueAttributeList[index].QThresholds,
1379 SchedulingPolicy: tp.UpstreamQueueAttributeList[index].SchedulingPolicy,
1380 PriorityQ: tp.UpstreamQueueAttributeList[index].PriorityQ,
1381 Weight: tp.UpstreamQueueAttributeList[index].Weight,
1382 DiscardPolicy: tp.UpstreamQueueAttributeList[index].DiscardPolicy,
1383 DiscardConfig: tp.UpstreamQueueAttributeList[index].DiscardConfig})
1384 }
1385
1386 for index := 0; index < int(tp.NumGemPorts); index++ {
1387 dsQueueAttributeList = append(dsQueueAttributeList,
1388 &tp_pb.EPONQueueAttributes{GemportId: resInst.GemportIds[index],
1389 MaxQSize: tp.DownstreamQueueAttributeList[index].MaxQSize,
1390 PbitMap: tp.DownstreamQueueAttributeList[index].PbitMap,
1391 AesEncryption: tp.DownstreamQueueAttributeList[index].AesEncryption,
1392 SchedulingPolicy: tp.DownstreamQueueAttributeList[index].SchedulingPolicy,
1393 PriorityQ: tp.DownstreamQueueAttributeList[index].PriorityQ,
1394 Weight: tp.DownstreamQueueAttributeList[index].Weight,
1395 DiscardPolicy: tp.DownstreamQueueAttributeList[index].DiscardPolicy,
1396 DiscardConfig: tp.DownstreamQueueAttributeList[index].DiscardConfig})
1397 }
1398
1399 return &tp_pb.EponTechProfileInstance{
1400 SubscriberIdentifier: resInst.SubscriberIdentifier,
1401 Name: tp.Name,
1402 ProfileType: tp.ProfileType,
1403 Version: tp.Version,
1404 NumGemPorts: tp.NumGemPorts,
1405 InstanceControl: tp.InstanceControl,
1406 PackageType: tp.PackageType,
1407 AllocId: resInst.AllocId,
1408 UpstreamQueueAttributeList: usQueueAttributeList,
1409 DownstreamQueueAttributeList: dsQueueAttributeList}
1410}
1411
1412func (t *TechProfileMgr) getTpInstanceFromResourceInstance(ctx context.Context, resInst *tp_pb.ResourceInstance) *tp_pb.TechProfileInstance {
1413 if resInst == nil {
1414 logger.Error(ctx, "resource-instance-nil")
1415 return nil
1416 }
1417 tp := t.getTPFromKVStore(ctx, resInst.TpId)
1418 if tp == nil {
1419 logger.Warnw(ctx, "tp-not-found-on-kv--creating-default-tp", log.Fields{"tpID": resInst.TpId})
1420 tp = t.getDefaultTechProfile(ctx)
1421 }
1422 return t.buildTpInstanceFromResourceInstance(ctx, tp, resInst)
1423}
1424
1425func (t *TechProfileMgr) getEponTpInstanceFromResourceInstance(ctx context.Context, resInst *tp_pb.ResourceInstance) *tp_pb.EponTechProfileInstance {
1426 if resInst == nil {
1427 logger.Error(ctx, "resource-instance-nil")
1428 return nil
1429 }
1430 eponTp := t.getEponTPFromKVStore(ctx, resInst.TpId)
1431 if eponTp == nil {
1432 logger.Warnw(ctx, "tp-not-found-on-kv--creating-default-tp", log.Fields{"tpID": resInst.TpId})
1433 eponTp = t.getDefaultEponProfile(ctx)
1434 }
1435 return t.buildEponTpInstanceFromResourceInstance(ctx, eponTp, resInst)
1436}
1437
1438func (t *TechProfileMgr) reconcileTpInstancesToCache(ctx context.Context) error {
1439
1440 tech := t.resourceMgr.GetTechnology()
1441 newCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
1442 defer cancel()
1443 kvPairs, _ := t.config.ResourceInstanceKVBacked.List(newCtx, tech)
1444
1445 if tech == xgspon || tech == xgpon || tech == gpon {
1446 for keyPath, kvPair := range kvPairs {
1447 logger.Debugw(ctx, "attempting-to-reconcile-tp-instance-from-resource-instance", log.Fields{"resourceInstPath": keyPath})
1448 if value, err := kvstore.ToByte(kvPair.Value); err == nil {
1449 var resInst tp_pb.ResourceInstance
1450 if err = proto.Unmarshal(value, &resInst); err != nil {
1451 logger.Errorw(ctx, "error-unmarshal-kv-pair", log.Fields{"err": err, "keyPath": keyPath, "value": value})
1452 continue
1453 } else {
1454 if tpInst := t.getTpInstanceFromResourceInstance(ctx, &resInst); tpInst != nil {
1455 // Trim the kv path by removing the default prefix part and get only the suffix part to reference the internal cache
1456 keySuffixSlice := regexp.MustCompile(t.config.ResourceInstanceKVPathPrefix+"/").Split(keyPath, 2)
1457 if len(keySuffixSlice) == 2 {
1458 keySuffixFormatRegexp := regexp.MustCompile(`^[a-zA-Z\-]+/[0-9]+/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
1459 // Make sure the keySuffixSlice is as per format [a-zA-Z-+]/[\d+]/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
1460 if !keySuffixFormatRegexp.Match([]byte(keySuffixSlice[1])) {
1461 logger.Errorw(ctx, "kv-path-not-confirming-to-format", log.Fields{"kvPath": keySuffixSlice[1]})
1462 continue
1463 }
1464 } else {
1465 logger.Errorw(ctx, "kv-instance-key-path-not-in-the-expected-format", log.Fields{"kvPath": keyPath})
1466 continue
1467 }
1468 t.tpInstanceMapLock.Lock()
1469 t.tpInstanceMap[keySuffixSlice[1]] = tpInst
1470 t.tpInstanceMapLock.Unlock()
1471 logger.Debugw(ctx, "reconciled-tp-success", log.Fields{"keyPath": keyPath})
1472 }
1473 }
1474 } else {
1475 logger.Errorw(ctx, "error-converting-kv-pair-value-to-byte", log.Fields{"err": err})
1476 }
1477 }
1478 } else if tech == epon {
1479 for keyPath, kvPair := range kvPairs {
1480 logger.Debugw(ctx, "attempting-to-reconcile-epon-tp-instance", log.Fields{"keyPath": keyPath})
1481 if value, err := kvstore.ToByte(kvPair.Value); err == nil {
1482 var resInst tp_pb.ResourceInstance
1483 if err = proto.Unmarshal(value, &resInst); err != nil {
1484 logger.Errorw(ctx, "error-unmarshal-kv-pair", log.Fields{"keyPath": keyPath, "value": value})
1485 continue
1486 } else {
1487 if eponTpInst := t.getEponTpInstanceFromResourceInstance(ctx, &resInst); eponTpInst != nil {
1488 // Trim the kv path by removing the default prefix part and get only the suffix part to reference the internal cache
1489 keySuffixSlice := regexp.MustCompile(t.config.ResourceInstanceKVPathPrefix+"/").Split(keyPath, 2)
1490 if len(keySuffixSlice) == 2 {
1491 keySuffixFormatRegexp := regexp.MustCompile(`^[a-zA-Z\-]+/[0-9]+/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
1492 // Make sure the keySuffixSlice is as per format [a-zA-Z-+]/[\d+]/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
1493 if !keySuffixFormatRegexp.Match([]byte(keySuffixSlice[1])) {
1494 logger.Errorw(ctx, "kv-path-not-confirming-to-format", log.Fields{"kvPath": keySuffixSlice[1]})
1495 continue
1496 }
1497 } else {
1498 logger.Errorw(ctx, "kv-instance-key-path-not-in-the-expected-format", log.Fields{"kvPath": keyPath})
1499 continue
1500 }
1501 t.epontpInstanceMapLock.Lock()
1502 t.eponTpInstanceMap[keySuffixSlice[1]] = eponTpInst
1503 t.epontpInstanceMapLock.Unlock()
1504 logger.Debugw(ctx, "reconciled-epon-tp-success", log.Fields{"keyPath": keyPath})
1505 }
1506 }
1507 } else {
1508 logger.Errorw(ctx, "error-converting-kv-pair-value-to-byte", log.Fields{"err": err})
1509 }
1510 }
1511 } else {
1512 logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": tech})
1513 return fmt.Errorf("unknown-tech-%v", tech)
1514 }
1515
1516 return nil
1517}