blob: 6d68f5abb6e7396a9400273a932f8a2ee8e710d0 [file] [log] [blame]
khenaidoo106c61a2021-08-11 18:05:46 -04001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package techprofile
18
19import (
20 "bytes"
21 "context"
22 "errors"
23 "fmt"
24 "regexp"
25 "strconv"
26 "sync"
27 "time"
28
29 "github.com/golang/protobuf/jsonpb"
30 "github.com/golang/protobuf/proto"
31 "github.com/opencord/voltha-lib-go/v7/pkg/db"
32 "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
33 "github.com/opencord/voltha-lib-go/v7/pkg/log"
khenaidoo106c61a2021-08-11 18:05:46 -040034 tp_pb "github.com/opencord/voltha-protos/v5/go/tech_profile"
35)
36
37// Interface to pon resource manager APIs
38type iPonResourceMgr interface {
39 GetResourceID(ctx context.Context, intfID uint32, resourceType string, numIDs uint32) ([]uint32, error)
40 FreeResourceID(ctx context.Context, intfID uint32, resourceType string, ReleaseContent []uint32) error
41 GetResourceTypeAllocID() string
42 GetResourceTypeGemPortID() string
43 GetResourceTypeOnuID() string
44 GetTechnology() string
45}
46
47type SchedulingPolicy int32
48
49const (
50 SchedulingPolicy_WRR SchedulingPolicy = 0
51 SchedulingPolicy_StrictPriority SchedulingPolicy = 1
52 SchedulingPolicy_Hybrid SchedulingPolicy = 2
53)
54
55type AdditionalBW int32
56
57const (
58 AdditionalBW_AdditionalBW_None AdditionalBW = 0
59 AdditionalBW_AdditionalBW_NA AdditionalBW = 1
60 AdditionalBW_AdditionalBW_BestEffort AdditionalBW = 2
61 AdditionalBW_AdditionalBW_Auto AdditionalBW = 3
62)
63
64type DiscardPolicy int32
65
66const (
67 DiscardPolicy_TailDrop DiscardPolicy = 0
68 DiscardPolicy_WTailDrop DiscardPolicy = 1
69 DiscardPolicy_Red DiscardPolicy = 2
70 DiscardPolicy_WRed DiscardPolicy = 3
71)
72
73// Required uniPortName format
74var uniPortNameFormatRegexp = regexp.MustCompile(`^olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
75
76// instance control defaults
77const (
78 defaultOnuInstance = "multi-instance"
79 defaultUniInstance = "single-instance"
80 defaultGemPayloadSize = "auto"
81)
82
83// default discard config constants
84const (
85 defaultMinThreshold = 0
86 defaultMaxThreshold = 0
87 defaultMaxProbability = 0
88)
89
90// default scheduler contants
91const (
92 defaultPriority = 0
93 defaultWeight = 0
94)
95
96// default GEM attribute constants
97const (
98 defaultAESEncryption = "True"
99 defaultPriorityQueue = 0
100 defaultQueueWeight = 0
101 defaultMaxQueueSize = "auto"
102 defaultIsMulticast = "False"
103 defaultAccessControlList = "224.0.0.0-239.255.255.255"
104 defaultMcastGemID = 4069
105)
106
107// Default EPON constants
108const (
109 defaultPakageType = "B"
110)
111const (
112 defaultTrafficType = "BE"
113 defaultUnsolicitedGrantSize = 0
114 defaultNominalInterval = 0
115 defaultToleratedPollJitter = 0
116 defaultRequestTransmissionPolicy = 0
117 defaultNumQueueSet = 2
118)
119const (
120 defaultQThreshold1 = 5500
121 defaultQThreshold2 = 0
122 defaultQThreshold3 = 0
123 defaultQThreshold4 = 0
124 defaultQThreshold5 = 0
125 defaultQThreshold6 = 0
126 defaultQThreshold7 = 0
127)
128
129const (
130 xgspon = "XGS-PON"
131 xgpon = "XGPON"
132 gpon = "GPON"
133 epon = "EPON"
134)
135
136const (
137 MaxUniPortPerOnu = 16 // TODO: Adapter uses its own constant for MaxUniPort. How to synchronize this and have a single source of truth?
138)
139
140type TechProfileMgr struct {
141 config *TechProfileFlags
142 resourceMgr iPonResourceMgr
143 OnuIDMgmtLock sync.RWMutex
144 GemPortIDMgmtLock sync.RWMutex
145 AllocIDMgmtLock sync.RWMutex
146 tpInstanceMap map[string]*tp_pb.TechProfileInstance // Map of tp path to tp instance
147 tpInstanceMapLock sync.RWMutex
148 eponTpInstanceMap map[string]*tp_pb.EponTechProfileInstance // Map of tp path to epon tp instance
149 epontpInstanceMapLock sync.RWMutex
150 tpMap map[uint32]*tp_pb.TechProfile // Map of tp id to tp
151 tpMapLock sync.RWMutex
152 eponTpMap map[uint32]*tp_pb.EponTechProfile // map of tp id to epon tp
153 eponTpMapLock sync.RWMutex
154}
155
156func (t *TechProfileMgr) SetKVClient(ctx context.Context, pathPrefix string) *db.Backend {
157 kvClient, err := newKVClient(ctx, t.config.KVStoreType, t.config.KVStoreAddress, t.config.KVStoreTimeout)
158 if err != nil {
159 logger.Errorw(ctx, "failed-to-create-kv-client",
160 log.Fields{
161 "type": t.config.KVStoreType, "address": t.config.KVStoreAddress,
162 "timeout": t.config.KVStoreTimeout, "prefix": pathPrefix,
163 "error": err.Error(),
164 })
165 return nil
166 }
167 return &db.Backend{
168 Client: kvClient,
169 StoreType: t.config.KVStoreType,
170 Address: t.config.KVStoreAddress,
171 Timeout: t.config.KVStoreTimeout,
172 PathPrefix: pathPrefix}
173
174 /* TODO : Make sure direct call to NewBackend is working fine with backend , currently there is some
175 issue between kv store and backend , core is not calling NewBackend directly
176 kv := model.NewBackend(t.config.kvStoreType, t.config.KVStoreHost, t.config.KVStorePort,
177 t.config.KVStoreTimeout, kvStoreTechProfilePathPrefix)
178 */
179}
180
181func NewTechProfile(ctx context.Context, resourceMgr iPonResourceMgr, kvStoreType string, kvStoreAddress string, basePathKvStore string) (*TechProfileMgr, error) {
182 var techprofileObj TechProfileMgr
183 logger.Debug(ctx, "initializing-techprofile-mananger")
184 techprofileObj.config = NewTechProfileFlags(kvStoreType, kvStoreAddress, basePathKvStore)
185 techprofileObj.config.KVBackend = techprofileObj.SetKVClient(ctx, techprofileObj.config.TPKVPathPrefix)
186 techprofileObj.config.DefaultTpKVBackend = techprofileObj.SetKVClient(ctx, techprofileObj.config.defaultTpKvPathPrefix)
187 if techprofileObj.config.KVBackend == nil {
188 logger.Error(ctx, "failed-to-initialize-backend")
189 return nil, errors.New("kv-backend-init-failed")
190 }
191 techprofileObj.config.ResourceInstanceKVBacked = techprofileObj.SetKVClient(ctx, techprofileObj.config.ResourceInstanceKVPathPrefix)
192 if techprofileObj.config.ResourceInstanceKVBacked == nil {
193 logger.Error(ctx, "failed-to-initialize-resource-instance-kv-backend")
194 return nil, errors.New("resource-instance-kv-backend-init-failed")
195 }
196 techprofileObj.resourceMgr = resourceMgr
197 techprofileObj.tpInstanceMap = make(map[string]*tp_pb.TechProfileInstance)
198 techprofileObj.eponTpInstanceMap = make(map[string]*tp_pb.EponTechProfileInstance)
199 techprofileObj.tpMap = make(map[uint32]*tp_pb.TechProfile)
200 techprofileObj.eponTpMap = make(map[uint32]*tp_pb.EponTechProfile)
201 logger.Debug(ctx, "reconcile-tp-instance-cache-start")
202 if err := techprofileObj.reconcileTpInstancesToCache(ctx); err != nil {
203 logger.Errorw(ctx, "failed-to-reconcile-tp-instances", log.Fields{"err": err})
204 return nil, err
205 }
206 logger.Debug(ctx, "reconcile-tp-instance-cache-end")
207 logger.Debug(ctx, "initializing-tech-profile-manager-object-success")
208 return &techprofileObj, nil
209}
210
211// GetTechProfileInstanceKey returns the tp instance key that is used to reference TP Instance Map
212func (t *TechProfileMgr) GetTechProfileInstanceKey(ctx context.Context, tpID uint32, uniPortName string) string {
213 logger.Debugw(ctx, "get-tp-instance-kv-key", log.Fields{
214 "uniPortName": uniPortName,
215 "tpId": tpID,
216 })
217 // Make sure the uniPortName is as per format olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
218 if !uniPortNameFormatRegexp.Match([]byte(uniPortName)) {
219 logger.Warnw(ctx, "uni-port-name-not-confirming-to-format", log.Fields{"uniPortName": uniPortName})
220 }
221 // The key path prefix (like service/voltha/technology_profiles or service/voltha_voltha/technology_profiles)
222 // is expected to be attached by the components that use this path as part of the KVBackend configuration.
223 resourceInstanceKvPathSuffix := "%s/%d/%s" // <technology>/<tpID>/<uni-port-name>
224 // <uni-port-name> must be of the format pon-{\d+}/onu-{\d+}/uni-{\d+}
225 return fmt.Sprintf(resourceInstanceKvPathSuffix, t.resourceMgr.GetTechnology(), tpID, uniPortName)
226}
227
228// GetTPInstance gets TP instance from cache if found
229func (t *TechProfileMgr) GetTPInstance(ctx context.Context, path string) (interface{}, error) {
230 tech := t.resourceMgr.GetTechnology()
231 switch tech {
232 case xgspon, xgpon, gpon:
233 t.tpInstanceMapLock.RLock()
234 defer t.tpInstanceMapLock.RUnlock()
235 tpInst, ok := t.tpInstanceMap[path]
236 if !ok {
237 return nil, fmt.Errorf("tp-instance-not-found-tp-path-%v", path)
238 }
239 return tpInst, nil
240 case epon:
241 t.epontpInstanceMapLock.RLock()
242 defer t.epontpInstanceMapLock.RUnlock()
243 tpInst, ok := t.eponTpInstanceMap[path]
244 if !ok {
245 return nil, fmt.Errorf("tp-instance-not-found-tp-path-%v", path)
246 }
247 return tpInst, nil
248 default:
249 logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": tech})
250 return nil, fmt.Errorf("unknown-tech-%s-tp-path-%v", tech, path)
251 }
252}
253
254// CreateTechProfileInstance creates a new TP instance.
255func (t *TechProfileMgr) CreateTechProfileInstance(ctx context.Context, tpID uint32, uniPortName string, intfID uint32) (interface{}, error) {
256 var tpInstance *tp_pb.TechProfileInstance
257 var eponTpInstance *tp_pb.EponTechProfileInstance
258
259 logger.Infow(ctx, "creating-tp-instance", log.Fields{"tpID": tpID, "uni": uniPortName, "intId": intfID})
260
261 // Make sure the uniPortName is as per format olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
262 if !uniPortNameFormatRegexp.Match([]byte(uniPortName)) {
263 logger.Errorw(ctx, "uni-port-name-not-confirming-to-format", log.Fields{"uniPortName": uniPortName})
264 return nil, fmt.Errorf("uni-port-name-not-confirming-to-format-%s", uniPortName)
265 }
266 tpInstancePathSuffix := t.GetTechProfileInstanceKey(ctx, tpID, uniPortName)
267
268 if t.resourceMgr.GetTechnology() == epon {
269 tp := t.getEponTPFromKVStore(ctx, tpID)
270 if tp != nil {
271 if err := t.validateInstanceControlAttr(ctx, *tp.InstanceControl); err != nil {
272 logger.Error(ctx, "invalid-instance-ctrl-attr-using-default-tp")
273 tp = t.getDefaultEponProfile(ctx)
274 } else {
275 logger.Infow(ctx, "using-specified-tp-from-kv-store", log.Fields{"tpID": tpID})
276 }
277 } else {
278 logger.Info(ctx, "tp-not-found-on-kv--creating-default-tp")
279 tp = t.getDefaultEponProfile(ctx)
280 }
281 // Store TP in cache
282 t.eponTpMapLock.Lock()
283 t.eponTpMap[tpID] = tp
284 t.eponTpMapLock.Unlock()
285
286 if eponTpInstance = t.allocateEponTPInstance(ctx, uniPortName, tp, intfID, tpInstancePathSuffix); eponTpInstance == nil {
287 logger.Error(ctx, "tp-instance-allocation-failed")
288 return nil, errors.New("tp-instance-allocation-failed")
289 }
290 t.epontpInstanceMapLock.Lock()
291 t.eponTpInstanceMap[tpInstancePathSuffix] = eponTpInstance
292 t.epontpInstanceMapLock.Unlock()
293 resInst := tp_pb.ResourceInstance{
294 TpId: tpID,
295 ProfileType: eponTpInstance.ProfileType,
296 SubscriberIdentifier: eponTpInstance.SubscriberIdentifier,
297 AllocId: eponTpInstance.AllocId,
298 }
299 for _, usQAttr := range eponTpInstance.UpstreamQueueAttributeList {
300 resInst.GemportIds = append(resInst.GemportIds, usQAttr.GemportId)
301 }
302
303 logger.Infow(ctx, "epon-tp-instance-created-successfully",
304 log.Fields{"tpID": tpID, "uni": uniPortName, "intfID": intfID})
305 if err := t.addResourceInstanceToKVStore(ctx, tpID, uniPortName, resInst); err != nil {
306 logger.Errorw(ctx, "failed-to-update-resource-instance-to-kv-store--freeing-up-resources", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
307 allocIDs := make([]uint32, 0)
308 allocIDs = append(allocIDs, resInst.AllocId)
309 errList := make([]error, 0)
310 errList = append(errList, t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), allocIDs))
311 errList = append(errList, t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), resInst.GemportIds))
312 if len(errList) > 0 {
313 logger.Errorw(ctx, "failed-to-free-up-resources-on-kv-store--system-behavior-has-become-erratic", log.Fields{"tpID": tpID, "uniPortName": uniPortName, "errList": errList})
314 }
315 return nil, err
316 }
317 return eponTpInstance, nil
318 } else {
319 tp := t.getTPFromKVStore(ctx, tpID)
320 if tp != nil {
321 if err := t.validateInstanceControlAttr(ctx, *tp.InstanceControl); err != nil {
322 logger.Error(ctx, "invalid-instance-ctrl-attr--using-default-tp")
323 tp = t.getDefaultTechProfile(ctx)
324 } else {
325 logger.Infow(ctx, "using-specified-tp-from-kv-store", log.Fields{"tpID": tpID})
326 }
327 } else {
328 logger.Info(ctx, "tp-not-found-on-kv--creating-default-tp")
329 tp = t.getDefaultTechProfile(ctx)
330 }
331 // Store TP in cache
332 t.tpMapLock.Lock()
333 t.tpMap[tpID] = tp
334 t.tpMapLock.Unlock()
335
336 if tpInstance = t.allocateTPInstance(ctx, uniPortName, tp, intfID, tpInstancePathSuffix); tpInstance == nil {
337 logger.Error(ctx, "tp-instance-allocation-failed")
338 return nil, errors.New("tp-instance-allocation-failed")
339 }
340 t.tpInstanceMapLock.Lock()
341 t.tpInstanceMap[tpInstancePathSuffix] = tpInstance
342 t.tpInstanceMapLock.Unlock()
343
344 resInst := tp_pb.ResourceInstance{
345 TpId: tpID,
346 ProfileType: tpInstance.ProfileType,
347 SubscriberIdentifier: tpInstance.SubscriberIdentifier,
348 AllocId: tpInstance.UsScheduler.AllocId,
349 }
350 for _, usQAttr := range tpInstance.UpstreamGemPortAttributeList {
351 resInst.GemportIds = append(resInst.GemportIds, usQAttr.GemportId)
352 }
353
354 logger.Infow(ctx, "tp-instance-created-successfully",
355 log.Fields{"tpID": tpID, "uni": uniPortName, "intfID": intfID})
356 if err := t.addResourceInstanceToKVStore(ctx, tpID, uniPortName, resInst); err != nil {
357 logger.Errorw(ctx, "failed-to-update-resource-instance-to-kv-store--freeing-up-resources", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
358 allocIDs := make([]uint32, 0)
359 allocIDs = append(allocIDs, resInst.AllocId)
360 errList := make([]error, 0)
361 errList = append(errList, t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), allocIDs))
362 errList = append(errList, t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), resInst.GemportIds))
363 if len(errList) > 0 {
364 logger.Fatalw(ctx, "failed-to-free-up-resources-on-kv-store--system-behavior-has-become-erratic", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
365 }
366 return nil, err
367 }
368
369 logger.Infow(ctx, "resource-instance-added-to-kv-store-successfully",
370 log.Fields{"tpID": tpID, "uni": uniPortName, "intfID": intfID})
371 return tpInstance, nil
372 }
373}
374
375// DeleteTechProfileInstance deletes the TP instance from the local cache as well as deletes the corresponding
376// resource instance from the KV store.
377func (t *TechProfileMgr) DeleteTechProfileInstance(ctx context.Context, tpID uint32, uniPortName string) error {
378 // Make sure the uniPortName is as per format olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
379 if !uniPortNameFormatRegexp.Match([]byte(uniPortName)) {
380 logger.Errorw(ctx, "uni-port-name-not-confirming-to-format", log.Fields{"uniPortName": uniPortName})
381 return fmt.Errorf("uni-port-name-not-confirming-to-format--%s", uniPortName)
382 }
383 path := t.GetTechProfileInstanceKey(ctx, tpID, uniPortName)
384 logger.Infow(ctx, "delete-tp-instance-from-cache", log.Fields{"key": path})
385 t.tpInstanceMapLock.Lock()
386 delete(t.tpInstanceMap, path)
387 t.tpInstanceMapLock.Unlock()
388 if err := t.removeResourceInstanceFromKVStore(ctx, tpID, uniPortName); err != nil {
389 return err
390 }
391 return nil
392}
393
394func (t *TechProfileMgr) GetMulticastTrafficQueues(ctx context.Context, tp *tp_pb.TechProfileInstance) []*tp_pb.TrafficQueue {
395 var encryp bool
396 NumGemPorts := len(tp.DownstreamGemPortAttributeList)
397 mcastTrafficQueues := make([]*tp_pb.TrafficQueue, 0)
398 for Count := 0; Count < NumGemPorts; Count++ {
399 if !isMulticastGem(tp.DownstreamGemPortAttributeList[Count].IsMulticast) {
400 continue
401 }
402 if tp.DownstreamGemPortAttributeList[Count].AesEncryption == "True" {
403 encryp = true
404 } else {
405 encryp = false
406 }
407 mcastTrafficQueues = append(mcastTrafficQueues, &tp_pb.TrafficQueue{
408 Direction: tp_pb.Direction_DOWNSTREAM,
409 GemportId: tp.DownstreamGemPortAttributeList[Count].MulticastGemId,
410 PbitMap: tp.DownstreamGemPortAttributeList[Count].PbitMap,
411 AesEncryption: encryp,
412 SchedPolicy: tp.DownstreamGemPortAttributeList[Count].SchedulingPolicy,
413 Priority: tp.DownstreamGemPortAttributeList[Count].PriorityQ,
414 Weight: tp.DownstreamGemPortAttributeList[Count].Weight,
415 DiscardPolicy: tp.DownstreamGemPortAttributeList[Count].DiscardPolicy,
416 })
417 }
418 logger.Debugw(ctx, "Downstream Multicast Traffic queue list ", log.Fields{"queuelist": mcastTrafficQueues})
419 return mcastTrafficQueues
420}
421
422func (t *TechProfileMgr) GetGemportForPbit(ctx context.Context, tp interface{}, dir tp_pb.Direction, pbit uint32) interface{} {
423 /*
424 Function to get the Gemport mapped to a pbit.
425 */
426 switch tp := tp.(type) {
427 case *tp_pb.TechProfileInstance:
428 if dir == tp_pb.Direction_UPSTREAM {
429 // upstream GEM ports
430 numGemPorts := len(tp.UpstreamGemPortAttributeList)
431 for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
432 lenOfPbitMap := len(tp.UpstreamGemPortAttributeList[gemCnt].PbitMap)
433 for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
434 // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
435 // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
436 if p, err := strconv.Atoi(string(tp.UpstreamGemPortAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
437 if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
438 logger.Debugw(ctx, "Found-US-GEMport-for-Pcp", log.Fields{"pbit": pbit, "GEMport": tp.UpstreamGemPortAttributeList[gemCnt].GemportId})
439 return tp.UpstreamGemPortAttributeList[gemCnt]
440 }
441 }
442 }
443 }
444 } else if dir == tp_pb.Direction_DOWNSTREAM {
445 //downstream GEM ports
446 numGemPorts := len(tp.DownstreamGemPortAttributeList)
447 for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
448 lenOfPbitMap := len(tp.DownstreamGemPortAttributeList[gemCnt].PbitMap)
449 for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
450 // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
451 // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
452 if p, err := strconv.Atoi(string(tp.DownstreamGemPortAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
453 if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
454 logger.Debugw(ctx, "Found-DS-GEMport-for-Pcp", log.Fields{"pbit": pbit, "GEMport": tp.DownstreamGemPortAttributeList[gemCnt].GemportId})
455 return tp.DownstreamGemPortAttributeList[gemCnt]
456 }
457 }
458 }
459 }
460 }
461 logger.Errorw(ctx, "No-GemportId-Found-For-Pcp", log.Fields{"pcpVlan": pbit})
khenaidoodc2116e2021-10-19 17:33:19 -0400462 case *tp_pb.EponTechProfileInstance:
khenaidoo106c61a2021-08-11 18:05:46 -0400463 if dir == tp_pb.Direction_UPSTREAM {
464 // upstream GEM ports
465 numGemPorts := len(tp.UpstreamQueueAttributeList)
466 for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
467 lenOfPbitMap := len(tp.UpstreamQueueAttributeList[gemCnt].PbitMap)
468 for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
469 // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
470 // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
471 if p, err := strconv.Atoi(string(tp.UpstreamQueueAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
472 if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
473 logger.Debugw(ctx, "Found-US-Queue-for-Pcp", log.Fields{"pbit": pbit, "Queue": tp.UpstreamQueueAttributeList[gemCnt].GemportId})
474 return tp.UpstreamQueueAttributeList[gemCnt]
475 }
476 }
477 }
478 }
479 } else if dir == tp_pb.Direction_DOWNSTREAM {
480 //downstream GEM ports
481 numGemPorts := len(tp.DownstreamQueueAttributeList)
482 for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
483 lenOfPbitMap := len(tp.DownstreamQueueAttributeList[gemCnt].PbitMap)
484 for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
485 // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
486 // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
487 if p, err := strconv.Atoi(string(tp.DownstreamQueueAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
488 if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
489 logger.Debugw(ctx, "Found-DS-Queue-for-Pcp", log.Fields{"pbit": pbit, "Queue": tp.DownstreamQueueAttributeList[gemCnt].GemportId})
490 return tp.DownstreamQueueAttributeList[gemCnt]
491 }
492 }
493 }
494 }
495 }
496 logger.Errorw(ctx, "No-QueueId-Found-For-Pcp", log.Fields{"pcpVlan": pbit})
497 default:
498 logger.Errorw(ctx, "unknown-tech", log.Fields{"tp": tp})
499 }
500 return nil
501}
502
503// FindAllTpInstances returns all TechProfile instances for a given TechProfile table-id, pon interface ID and onu ID.
504func (t *TechProfileMgr) FindAllTpInstances(ctx context.Context, oltDeviceID string, tpID uint32, intfID uint32, onuID uint32) interface{} {
505 onuTpInstancePathSuffix := fmt.Sprintf("%s/%d/olt-{%s}/pon-{%d}/onu-{%d}", t.resourceMgr.GetTechnology(), tpID, oltDeviceID, intfID, onuID)
506 tech := t.resourceMgr.GetTechnology()
507 if tech == xgspon || tech == xgpon || tech == gpon {
508 t.tpInstanceMapLock.RLock()
509 defer t.tpInstanceMapLock.RUnlock()
510 tpInstancesTech := make([]tp_pb.TechProfileInstance, 0)
511 for i := 0; i < MaxUniPortPerOnu; i++ {
512 key := onuTpInstancePathSuffix + fmt.Sprintf("/uni-{%d}", i)
513 if tpInst, ok := t.tpInstanceMap[key]; ok {
514 tpInstancesTech = append(tpInstancesTech, *tpInst)
515 }
516 }
517 return tpInstancesTech
518 } else if tech == epon {
519 t.epontpInstanceMapLock.RLock()
520 defer t.epontpInstanceMapLock.RUnlock()
521 tpInstancesTech := make([]tp_pb.EponTechProfileInstance, 0)
522 for i := 0; i < MaxUniPortPerOnu; i++ {
523 key := onuTpInstancePathSuffix + fmt.Sprintf("/uni-{%d}", i)
524 if tpInst, ok := t.eponTpInstanceMap[key]; ok {
525 tpInstancesTech = append(tpInstancesTech, *tpInst)
526 }
527 }
528 return tpInstancesTech
529 } else {
530 logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": tech, "tpID": tpID, "onuID": onuID, "intfID": intfID})
531 }
532 return nil
533}
534
535func (t *TechProfileMgr) GetResourceID(ctx context.Context, intfID uint32, resourceType string, numIDs uint32) ([]uint32, error) {
536 logger.Debugw(ctx, "getting-resource-id", log.Fields{
537 "intf-id": intfID,
538 "resource-type": resourceType,
539 "num": numIDs,
540 })
541 var err error
542 var ids []uint32
543 switch resourceType {
544 case t.resourceMgr.GetResourceTypeAllocID():
545 t.AllocIDMgmtLock.Lock()
546 ids, err = t.resourceMgr.GetResourceID(ctx, intfID, resourceType, numIDs)
547 t.AllocIDMgmtLock.Unlock()
548 case t.resourceMgr.GetResourceTypeGemPortID():
549 t.GemPortIDMgmtLock.Lock()
550 ids, err = t.resourceMgr.GetResourceID(ctx, intfID, resourceType, numIDs)
551 t.GemPortIDMgmtLock.Unlock()
552 case t.resourceMgr.GetResourceTypeOnuID():
553 t.OnuIDMgmtLock.Lock()
554 ids, err = t.resourceMgr.GetResourceID(ctx, intfID, resourceType, numIDs)
555 t.OnuIDMgmtLock.Unlock()
556 default:
557 return nil, fmt.Errorf("resourceType %s not supported", resourceType)
558 }
559 if err != nil {
560 return nil, err
561 }
562 return ids, nil
563}
564
565func (t *TechProfileMgr) FreeResourceID(ctx context.Context, intfID uint32, resourceType string, ReleaseContent []uint32) error {
566 logger.Debugw(ctx, "freeing-resource-id", log.Fields{
567 "intf-id": intfID,
568 "resource-type": resourceType,
569 "release-content": ReleaseContent,
570 })
571 var err error
572 switch resourceType {
573 case t.resourceMgr.GetResourceTypeAllocID():
574 t.AllocIDMgmtLock.Lock()
575 err = t.resourceMgr.FreeResourceID(ctx, intfID, resourceType, ReleaseContent)
576 t.AllocIDMgmtLock.Unlock()
577 case t.resourceMgr.GetResourceTypeGemPortID():
578 t.GemPortIDMgmtLock.Lock()
579 err = t.resourceMgr.FreeResourceID(ctx, intfID, resourceType, ReleaseContent)
580 t.GemPortIDMgmtLock.Unlock()
581 case t.resourceMgr.GetResourceTypeOnuID():
582 t.OnuIDMgmtLock.Lock()
583 err = t.resourceMgr.FreeResourceID(ctx, intfID, resourceType, ReleaseContent)
584 t.OnuIDMgmtLock.Unlock()
585 default:
586 return fmt.Errorf("resourceType %s not supported", resourceType)
587 }
588 if err != nil {
589 return err
590 }
591 return nil
592}
593
594func (t *TechProfileMgr) GetUsScheduler(tpInstance *tp_pb.TechProfileInstance) *tp_pb.SchedulerConfig {
595 return &tp_pb.SchedulerConfig{
596 Direction: tpInstance.UsScheduler.Direction,
597 AdditionalBw: tpInstance.UsScheduler.AdditionalBw,
598 Priority: tpInstance.UsScheduler.Priority,
599 Weight: tpInstance.UsScheduler.Weight,
600 SchedPolicy: tpInstance.UsScheduler.QSchedPolicy}
601}
602
603func (t *TechProfileMgr) GetDsScheduler(tpInstance *tp_pb.TechProfileInstance) *tp_pb.SchedulerConfig {
604 return &tp_pb.SchedulerConfig{
605 Direction: tpInstance.DsScheduler.Direction,
606 AdditionalBw: tpInstance.DsScheduler.AdditionalBw,
607 Priority: tpInstance.DsScheduler.Priority,
608 Weight: tpInstance.DsScheduler.Weight,
609 SchedPolicy: tpInstance.DsScheduler.QSchedPolicy}
610}
611
612func (t *TechProfileMgr) GetTrafficScheduler(tpInstance *tp_pb.TechProfileInstance, SchedCfg *tp_pb.SchedulerConfig,
613 ShapingCfg *tp_pb.TrafficShapingInfo) *tp_pb.TrafficScheduler {
614
615 tSched := &tp_pb.TrafficScheduler{
616 Direction: SchedCfg.Direction,
617 AllocId: tpInstance.UsScheduler.AllocId,
618 TrafficShapingInfo: ShapingCfg,
619 Scheduler: SchedCfg}
620
621 return tSched
622}
623
624func (t *TechProfileMgr) GetTrafficQueues(ctx context.Context, tp *tp_pb.TechProfileInstance, direction tp_pb.Direction) ([]*tp_pb.TrafficQueue, error) {
625
626 var encryp bool
627 if direction == tp_pb.Direction_UPSTREAM {
628 // upstream GEM ports
629 NumGemPorts := len(tp.UpstreamGemPortAttributeList)
630 GemPorts := make([]*tp_pb.TrafficQueue, 0)
631 for Count := 0; Count < NumGemPorts; Count++ {
632 if tp.UpstreamGemPortAttributeList[Count].AesEncryption == "True" {
633 encryp = true
634 } else {
635 encryp = false
636 }
637
638 GemPorts = append(GemPorts, &tp_pb.TrafficQueue{
639 Direction: direction,
640 GemportId: tp.UpstreamGemPortAttributeList[Count].GemportId,
641 PbitMap: tp.UpstreamGemPortAttributeList[Count].PbitMap,
642 AesEncryption: encryp,
643 SchedPolicy: tp.UpstreamGemPortAttributeList[Count].SchedulingPolicy,
644 Priority: tp.UpstreamGemPortAttributeList[Count].PriorityQ,
645 Weight: tp.UpstreamGemPortAttributeList[Count].Weight,
646 DiscardPolicy: tp.UpstreamGemPortAttributeList[Count].DiscardPolicy,
647 })
648 }
649 logger.Debugw(ctx, "Upstream Traffic queue list ", log.Fields{"queuelist": GemPorts})
650 return GemPorts, nil
651 } else if direction == tp_pb.Direction_DOWNSTREAM {
652 //downstream GEM ports
653 NumGemPorts := len(tp.DownstreamGemPortAttributeList)
654 GemPorts := make([]*tp_pb.TrafficQueue, 0)
655 for Count := 0; Count < NumGemPorts; Count++ {
656 if isMulticastGem(tp.DownstreamGemPortAttributeList[Count].IsMulticast) {
657 //do not take multicast GEM ports. They are handled separately.
658 continue
659 }
660 if tp.DownstreamGemPortAttributeList[Count].AesEncryption == "True" {
661 encryp = true
662 } else {
663 encryp = false
664 }
665
666 GemPorts = append(GemPorts, &tp_pb.TrafficQueue{
667 Direction: direction,
668 GemportId: tp.DownstreamGemPortAttributeList[Count].GemportId,
669 PbitMap: tp.DownstreamGemPortAttributeList[Count].PbitMap,
670 AesEncryption: encryp,
671 SchedPolicy: tp.DownstreamGemPortAttributeList[Count].SchedulingPolicy,
672 Priority: tp.DownstreamGemPortAttributeList[Count].PriorityQ,
673 Weight: tp.DownstreamGemPortAttributeList[Count].Weight,
674 DiscardPolicy: tp.DownstreamGemPortAttributeList[Count].DiscardPolicy,
675 })
676 }
677 logger.Debugw(ctx, "Downstream Traffic queue list ", log.Fields{"queuelist": GemPorts})
678 return GemPorts, nil
679 }
680
681 logger.Errorf(ctx, "Unsupported direction %s used for generating Traffic Queue list", direction)
682 return nil, fmt.Errorf("downstream gem port traffic queue creation failed due to unsupported direction %s", direction)
683}
684
685func (t *TechProfileMgr) validateInstanceControlAttr(ctx context.Context, instCtl tp_pb.InstanceControl) error {
686 if instCtl.Onu != "single-instance" && instCtl.Onu != "multi-instance" {
687 logger.Errorw(ctx, "invalid-onu-instance-control-attribute", log.Fields{"onu-inst": instCtl.Onu})
688 return errors.New("invalid-onu-instance-ctl-attr")
689 }
690
691 if instCtl.Uni != "single-instance" && instCtl.Uni != "multi-instance" {
692 logger.Errorw(ctx, "invalid-uni-instance-control-attribute", log.Fields{"uni-inst": instCtl.Uni})
693 return errors.New("invalid-uni-instance-ctl-attr")
694 }
695
696 if instCtl.Uni == "multi-instance" {
697 logger.Error(ctx, "uni-multi-instance-tp-not-supported")
698 return errors.New("uni-multi-instance-tp-not-supported")
699 }
700
701 return nil
702}
703
704// allocateTPInstance for GPON, XGPON and XGS-PON technology
705func (t *TechProfileMgr) allocateTPInstance(ctx context.Context, uniPortName string, tp *tp_pb.TechProfile, intfID uint32, tpInstPathSuffix string) *tp_pb.TechProfileInstance {
706
707 var usGemPortAttributeList []*tp_pb.GemPortAttributes
708 var dsGemPortAttributeList []*tp_pb.GemPortAttributes
709 var dsMulticastGemAttributeList []*tp_pb.GemPortAttributes
710 var dsUnicastGemAttributeList []*tp_pb.GemPortAttributes
711 var tcontIDs []uint32
712 var gemPorts []uint32
713 var err error
714
715 logger.Infow(ctx, "Allocating TechProfileMgr instance from techprofile template", log.Fields{"uniPortName": uniPortName, "intfID": intfID, "numGem": tp.NumGemPorts})
716
717 if tp.InstanceControl.Onu == "multi-instance" {
718 tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1)
719 if err != nil {
720 logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"err": err, "intfID": intfID})
721 return nil
722 }
723 } else { // "single-instance"
724 if tpInst := t.getSingleInstanceTp(ctx, tpInstPathSuffix); tpInst == nil {
725 // No "single-instance" tp found on one any uni port for the given TP ID
726 // Allocate a new TcontID or AllocID
727 tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1)
728 if err != nil {
729 logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"err": err, "intfID": intfID})
730 return nil
731 }
732 } else {
733 // Use the alloc-id from the existing TpInstance
734 tcontIDs = append(tcontIDs, tpInst.UsScheduler.AllocId)
735 }
736 }
737 logger.Debugw(ctx, "Num GEM ports in TP:", log.Fields{"NumGemPorts": tp.NumGemPorts})
738 gemPorts, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), tp.NumGemPorts)
739 if err != nil {
740 logger.Errorw(ctx, "Error getting gemport ids from rsrcrMgr", log.Fields{"err": err, "intfID": intfID, "numGemports": tp.NumGemPorts})
741 return nil
742 }
743 logger.Infow(ctx, "Allocated tconts and GEM ports successfully", log.Fields{"tconts": tcontIDs, "gemports": gemPorts})
744 for index := 0; index < int(tp.NumGemPorts); index++ {
745 usGemPortAttributeList = append(usGemPortAttributeList,
746 &tp_pb.GemPortAttributes{GemportId: gemPorts[index],
747 MaxQSize: tp.UpstreamGemPortAttributeList[index].MaxQSize,
748 PbitMap: tp.UpstreamGemPortAttributeList[index].PbitMap,
749 AesEncryption: tp.UpstreamGemPortAttributeList[index].AesEncryption,
750 SchedulingPolicy: tp.UpstreamGemPortAttributeList[index].SchedulingPolicy,
751 PriorityQ: tp.UpstreamGemPortAttributeList[index].PriorityQ,
752 Weight: tp.UpstreamGemPortAttributeList[index].Weight,
753 DiscardPolicy: tp.UpstreamGemPortAttributeList[index].DiscardPolicy,
754 DiscardConfig: tp.UpstreamGemPortAttributeList[index].DiscardConfig})
755 }
756
757 logger.Info(ctx, "length of DownstreamGemPortAttributeList", len(tp.DownstreamGemPortAttributeList))
758 //put multicast and unicast downstream GEM port attributes in different lists first
759 for index := 0; index < len(tp.DownstreamGemPortAttributeList); index++ {
760 if isMulticastGem(tp.DownstreamGemPortAttributeList[index].IsMulticast) {
761 dsMulticastGemAttributeList = append(dsMulticastGemAttributeList,
762 &tp_pb.GemPortAttributes{
763 MulticastGemId: tp.DownstreamGemPortAttributeList[index].MulticastGemId,
764 MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
765 PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
766 AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
767 SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
768 PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
769 Weight: tp.DownstreamGemPortAttributeList[index].Weight,
770 DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
771 DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig,
772 IsMulticast: tp.DownstreamGemPortAttributeList[index].IsMulticast,
773 DynamicAccessControlList: tp.DownstreamGemPortAttributeList[index].DynamicAccessControlList,
774 StaticAccessControlList: tp.DownstreamGemPortAttributeList[index].StaticAccessControlList})
775 } else {
776 dsUnicastGemAttributeList = append(dsUnicastGemAttributeList,
777 &tp_pb.GemPortAttributes{
778 MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
779 PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
780 AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
781 SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
782 PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
783 Weight: tp.DownstreamGemPortAttributeList[index].Weight,
784 DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
785 DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig})
786 }
787 }
788 //add unicast downstream GEM ports to dsGemPortAttributeList
789 if dsUnicastGemAttributeList != nil {
790 for index := 0; index < int(tp.NumGemPorts); index++ {
791 dsGemPortAttributeList = append(dsGemPortAttributeList,
792 &tp_pb.GemPortAttributes{GemportId: gemPorts[index],
793 MaxQSize: dsUnicastGemAttributeList[index].MaxQSize,
794 PbitMap: dsUnicastGemAttributeList[index].PbitMap,
795 AesEncryption: dsUnicastGemAttributeList[index].AesEncryption,
796 SchedulingPolicy: dsUnicastGemAttributeList[index].SchedulingPolicy,
797 PriorityQ: dsUnicastGemAttributeList[index].PriorityQ,
798 Weight: dsUnicastGemAttributeList[index].Weight,
799 DiscardPolicy: dsUnicastGemAttributeList[index].DiscardPolicy,
800 DiscardConfig: dsUnicastGemAttributeList[index].DiscardConfig})
801 }
802 }
803 //add multicast GEM ports to dsGemPortAttributeList afterwards
804 for k := range dsMulticastGemAttributeList {
805 dsGemPortAttributeList = append(dsGemPortAttributeList, dsMulticastGemAttributeList[k])
806 }
807
808 return &tp_pb.TechProfileInstance{
809 SubscriberIdentifier: uniPortName,
810 Name: tp.Name,
811 ProfileType: tp.ProfileType,
812 Version: tp.Version,
813 NumGemPorts: tp.NumGemPorts,
814 InstanceControl: tp.InstanceControl,
815 UsScheduler: &tp_pb.SchedulerAttributes{
816 AllocId: tcontIDs[0],
817 Direction: tp.UsScheduler.Direction,
818 AdditionalBw: tp.UsScheduler.AdditionalBw,
819 Priority: tp.UsScheduler.Priority,
820 Weight: tp.UsScheduler.Weight,
821 QSchedPolicy: tp.UsScheduler.QSchedPolicy},
822 DsScheduler: &tp_pb.SchedulerAttributes{
823 AllocId: tcontIDs[0],
824 Direction: tp.DsScheduler.Direction,
825 AdditionalBw: tp.DsScheduler.AdditionalBw,
826 Priority: tp.DsScheduler.Priority,
827 Weight: tp.DsScheduler.Weight,
828 QSchedPolicy: tp.DsScheduler.QSchedPolicy},
829 UpstreamGemPortAttributeList: usGemPortAttributeList,
830 DownstreamGemPortAttributeList: dsGemPortAttributeList}
831}
832
833// allocateTPInstance function for EPON
834func (t *TechProfileMgr) allocateEponTPInstance(ctx context.Context, uniPortName string, tp *tp_pb.EponTechProfile, intfID uint32, tpInstPath string) *tp_pb.EponTechProfileInstance {
835
836 var usQueueAttributeList []*tp_pb.EPONQueueAttributes
837 var dsQueueAttributeList []*tp_pb.EPONQueueAttributes
838 var tcontIDs []uint32
839 var gemPorts []uint32
840 var err error
841
842 logger.Infow(ctx, "allocating-tp-instance-from-tp-template", log.Fields{"uniPortName": uniPortName, "intfID": intfID, "numGem": tp.NumGemPorts})
843
844 if tp.InstanceControl.Onu == "multi-instance" {
845 if tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1); err != nil {
846 logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"err": err, "intfID": intfID})
847 return nil
848 }
849 } else { // "single-instance"
850 if tpInst := t.getSingleInstanceEponTp(ctx, tpInstPath); tpInst == nil {
851 // No "single-instance" tp found on one any uni port for the given TP ID
852 // Allocate a new TcontID or AllocID
853 if tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1); err != nil {
854 logger.Errorw(ctx, "error-getting-alloc-id-from-resource-mgr", log.Fields{"err": err, "intfID": intfID})
855 return nil
856 }
857 } else {
858 // Use the alloc-id from the existing TpInstance
859 tcontIDs = append(tcontIDs, tpInst.AllocId)
860 }
861 }
862 logger.Debugw(ctx, "Num GEM ports in TP:", log.Fields{"NumGemPorts": tp.NumGemPorts})
863 if gemPorts, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), tp.NumGemPorts); err != nil {
864 logger.Errorw(ctx, "error-getting-gemport-id-from-resource-mgr", log.Fields{"err": err, "intfID": intfID, "numGemports": tp.NumGemPorts})
865 return nil
866 }
867 logger.Infow(ctx, "allocated-alloc-id-and-gemport-successfully", log.Fields{"tconts": tcontIDs, "gemports": gemPorts})
868 for index := 0; index < int(tp.NumGemPorts); index++ {
869 usQueueAttributeList = append(usQueueAttributeList,
870 &tp_pb.EPONQueueAttributes{GemportId: gemPorts[index],
871 MaxQSize: tp.UpstreamQueueAttributeList[index].MaxQSize,
872 PbitMap: tp.UpstreamQueueAttributeList[index].PbitMap,
873 AesEncryption: tp.UpstreamQueueAttributeList[index].AesEncryption,
874 TrafficType: tp.UpstreamQueueAttributeList[index].TrafficType,
875 UnsolicitedGrantSize: tp.UpstreamQueueAttributeList[index].UnsolicitedGrantSize,
876 NominalInterval: tp.UpstreamQueueAttributeList[index].NominalInterval,
877 ToleratedPollJitter: tp.UpstreamQueueAttributeList[index].ToleratedPollJitter,
878 RequestTransmissionPolicy: tp.UpstreamQueueAttributeList[index].RequestTransmissionPolicy,
879 NumQSets: tp.UpstreamQueueAttributeList[index].NumQSets,
880 QThresholds: tp.UpstreamQueueAttributeList[index].QThresholds,
881 SchedulingPolicy: tp.UpstreamQueueAttributeList[index].SchedulingPolicy,
882 PriorityQ: tp.UpstreamQueueAttributeList[index].PriorityQ,
883 Weight: tp.UpstreamQueueAttributeList[index].Weight,
884 DiscardPolicy: tp.UpstreamQueueAttributeList[index].DiscardPolicy,
885 DiscardConfig: tp.UpstreamQueueAttributeList[index].DiscardConfig})
886 }
887
888 logger.Info(ctx, "length-of-downstream-gemport-attribute-list", len(tp.DownstreamQueueAttributeList))
889 for index := 0; index < int(tp.NumGemPorts); index++ {
890 dsQueueAttributeList = append(dsQueueAttributeList,
891 &tp_pb.EPONQueueAttributes{GemportId: gemPorts[index],
892 MaxQSize: tp.DownstreamQueueAttributeList[index].MaxQSize,
893 PbitMap: tp.DownstreamQueueAttributeList[index].PbitMap,
894 AesEncryption: tp.DownstreamQueueAttributeList[index].AesEncryption,
895 SchedulingPolicy: tp.DownstreamQueueAttributeList[index].SchedulingPolicy,
896 PriorityQ: tp.DownstreamQueueAttributeList[index].PriorityQ,
897 Weight: tp.DownstreamQueueAttributeList[index].Weight,
898 DiscardPolicy: tp.DownstreamQueueAttributeList[index].DiscardPolicy,
899 DiscardConfig: tp.DownstreamQueueAttributeList[index].DiscardConfig})
900 }
901
902 return &tp_pb.EponTechProfileInstance{
903 SubscriberIdentifier: uniPortName,
904 Name: tp.Name,
905 ProfileType: tp.ProfileType,
906 Version: tp.Version,
907 NumGemPorts: tp.NumGemPorts,
908 InstanceControl: tp.InstanceControl,
909 PackageType: tp.PackageType,
910 AllocId: tcontIDs[0],
911 UpstreamQueueAttributeList: usQueueAttributeList,
912 DownstreamQueueAttributeList: dsQueueAttributeList}
913}
914
915// getSingleInstanceTp returns another TpInstance (GPON, XGPON, XGS-PON) for an ONU on a different
916// uni port for the same TP ID, if it finds one, else nil.
917func (t *TechProfileMgr) getSingleInstanceTp(ctx context.Context, tpPathSuffix string) *tp_pb.TechProfileInstance {
918
919 // For example:
920 // tpPathSuffix like "XGS-PON/64/olt-{1234}/pon-{0}/onu-{1}/uni-{1}"
921 // is broken into ["XGS-PON/64/olt-{1234}/pon-{0}/onu-{1}" ""]
922 uniPathSlice := regexp.MustCompile(`/uni-{[0-9]+}$`).Split(tpPathSuffix, 2)
923
924 t.tpInstanceMapLock.RLock()
925 defer t.tpInstanceMapLock.RUnlock()
926 for i := 0; i < MaxUniPortPerOnu; i++ {
927 key := fmt.Sprintf(uniPathSlice[0]+"/uni-{%d}", i)
928 if tpInst, ok := t.tpInstanceMap[key]; ok {
929 logger.Debugw(ctx, "found-single-instance-tp", log.Fields{"key": key})
930 return tpInst
931 }
932 }
933 return nil
934}
935
936// getSingleInstanceTp returns another TpInstance (EPON) for an ONU on a different
937// uni port for the same TP ID, if it finds one, else nil.
938func (t *TechProfileMgr) getSingleInstanceEponTp(ctx context.Context, tpPathSuffix string) *tp_pb.EponTechProfileInstance {
939 // For example:
940 // tpPathSuffix like "EPON/64/olt-{1234}/pon-{0}/onu-{1}/uni-{1}"
941 // is broken into ["EPON/64/-{1234}/pon-{0}/onu-{1}" ""]
942 uniPathSlice := regexp.MustCompile(`/uni-{[0-9]+}$`).Split(tpPathSuffix, 2)
943
944 t.epontpInstanceMapLock.RLock()
945 defer t.epontpInstanceMapLock.RUnlock()
946 for i := 0; i < MaxUniPortPerOnu; i++ {
947 key := fmt.Sprintf(uniPathSlice[0]+"/uni-{%d}", i)
948 if tpInst, ok := t.eponTpInstanceMap[key]; ok {
949 logger.Debugw(ctx, "found-single-instance-tp", log.Fields{"key": key})
950 return tpInst
951 }
952 }
953 return nil
954}
955
956// getDefaultTechProfile returns a default TechProfile for GPON, XGPON, XGS-PON
957func (t *TechProfileMgr) getDefaultTechProfile(ctx context.Context) *tp_pb.TechProfile {
958 var usGemPortAttributeList []*tp_pb.GemPortAttributes
959 var dsGemPortAttributeList []*tp_pb.GemPortAttributes
960
961 for _, pbit := range t.config.DefaultPbits {
962 logger.Debugw(ctx, "creating-gem-port-profile-profile", log.Fields{"pbit": pbit})
963 usGemPortAttributeList = append(usGemPortAttributeList,
964 &tp_pb.GemPortAttributes{
965 MaxQSize: defaultMaxQueueSize,
966 PbitMap: pbit,
967 AesEncryption: defaultAESEncryption,
968 SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
969 PriorityQ: defaultPriorityQueue,
970 Weight: defaultQueueWeight,
971 DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
972 DiscardConfigV2: &tp_pb.DiscardConfig{
973 DiscardPolicy: tp_pb.DiscardPolicy_Red,
974 DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
975 RedDiscardConfig: &tp_pb.RedDiscardConfig{
976 MinThreshold: defaultMinThreshold,
977 MaxThreshold: defaultMaxThreshold,
978 MaxProbability: defaultMaxProbability,
979 },
980 },
981 },
982 DiscardConfig: &tp_pb.RedDiscardConfig{
983 MinThreshold: defaultMinThreshold,
984 MaxThreshold: defaultMaxThreshold,
985 MaxProbability: defaultMaxProbability,
986 },
987 })
988 dsGemPortAttributeList = append(dsGemPortAttributeList,
989 &tp_pb.GemPortAttributes{
990 MaxQSize: defaultMaxQueueSize,
991 PbitMap: pbit,
992 AesEncryption: defaultAESEncryption,
993 SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
994 PriorityQ: defaultPriorityQueue,
995 Weight: defaultQueueWeight,
996 DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
997 DiscardConfigV2: &tp_pb.DiscardConfig{
998 DiscardPolicy: tp_pb.DiscardPolicy_Red,
999 DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
1000 RedDiscardConfig: &tp_pb.RedDiscardConfig{
1001 MinThreshold: defaultMinThreshold,
1002 MaxThreshold: defaultMaxThreshold,
1003 MaxProbability: defaultMaxProbability,
1004 },
1005 },
1006 },
1007 DiscardConfig: &tp_pb.RedDiscardConfig{
1008 MinThreshold: defaultMinThreshold,
1009 MaxThreshold: defaultMaxThreshold,
1010 MaxProbability: defaultMaxProbability,
1011 },
1012 IsMulticast: defaultIsMulticast,
1013 DynamicAccessControlList: defaultAccessControlList,
1014 StaticAccessControlList: defaultAccessControlList,
1015 MulticastGemId: defaultMcastGemID})
1016 }
1017 return &tp_pb.TechProfile{
1018 Name: t.config.DefaultTPName,
1019 ProfileType: t.resourceMgr.GetTechnology(),
1020 Version: t.config.TPVersion,
1021 NumGemPorts: uint32(len(usGemPortAttributeList)),
1022 InstanceControl: &tp_pb.InstanceControl{
1023 Onu: defaultOnuInstance,
1024 Uni: defaultUniInstance,
1025 MaxGemPayloadSize: defaultGemPayloadSize},
1026 UsScheduler: &tp_pb.SchedulerAttributes{
1027 Direction: tp_pb.Direction_UPSTREAM,
1028 AdditionalBw: tp_pb.AdditionalBW_AdditionalBW_BestEffort,
1029 Priority: defaultPriority,
1030 Weight: defaultWeight,
1031 QSchedPolicy: tp_pb.SchedulingPolicy_Hybrid},
1032 DsScheduler: &tp_pb.SchedulerAttributes{
1033 Direction: tp_pb.Direction_DOWNSTREAM,
1034 AdditionalBw: tp_pb.AdditionalBW_AdditionalBW_BestEffort,
1035 Priority: defaultPriority,
1036 Weight: defaultWeight,
1037 QSchedPolicy: tp_pb.SchedulingPolicy_Hybrid},
1038 UpstreamGemPortAttributeList: usGemPortAttributeList,
1039 DownstreamGemPortAttributeList: dsGemPortAttributeList}
1040}
1041
1042// getDefaultEponProfile returns a default TechProfile for EPON
1043func (t *TechProfileMgr) getDefaultEponProfile(ctx context.Context) *tp_pb.EponTechProfile {
1044
1045 var usQueueAttributeList []*tp_pb.EPONQueueAttributes
1046 var dsQueueAttributeList []*tp_pb.EPONQueueAttributes
1047
1048 for _, pbit := range t.config.DefaultPbits {
1049 logger.Debugw(ctx, "Creating Queue", log.Fields{"pbit": pbit})
1050 usQueueAttributeList = append(usQueueAttributeList,
1051 &tp_pb.EPONQueueAttributes{
1052 MaxQSize: defaultMaxQueueSize,
1053 PbitMap: pbit,
1054 AesEncryption: defaultAESEncryption,
1055 TrafficType: defaultTrafficType,
1056 UnsolicitedGrantSize: defaultUnsolicitedGrantSize,
1057 NominalInterval: defaultNominalInterval,
1058 ToleratedPollJitter: defaultToleratedPollJitter,
1059 RequestTransmissionPolicy: defaultRequestTransmissionPolicy,
1060 NumQSets: defaultNumQueueSet,
1061 QThresholds: &tp_pb.QThresholds{
1062 QThreshold1: defaultQThreshold1,
1063 QThreshold2: defaultQThreshold2,
1064 QThreshold3: defaultQThreshold3,
1065 QThreshold4: defaultQThreshold4,
1066 QThreshold5: defaultQThreshold5,
1067 QThreshold6: defaultQThreshold6,
1068 QThreshold7: defaultQThreshold7},
1069 SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
1070 PriorityQ: defaultPriorityQueue,
1071 Weight: defaultQueueWeight,
1072 DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
1073 DiscardConfigV2: &tp_pb.DiscardConfig{
1074 DiscardPolicy: tp_pb.DiscardPolicy_Red,
1075 DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
1076 RedDiscardConfig: &tp_pb.RedDiscardConfig{
1077 MinThreshold: defaultMinThreshold,
1078 MaxThreshold: defaultMaxThreshold,
1079 MaxProbability: defaultMaxProbability,
1080 },
1081 },
1082 },
1083 DiscardConfig: &tp_pb.RedDiscardConfig{
1084 MinThreshold: defaultMinThreshold,
1085 MaxThreshold: defaultMaxThreshold,
1086 MaxProbability: defaultMaxProbability,
1087 }})
1088 dsQueueAttributeList = append(dsQueueAttributeList,
1089 &tp_pb.EPONQueueAttributes{
1090 MaxQSize: defaultMaxQueueSize,
1091 PbitMap: pbit,
1092 AesEncryption: defaultAESEncryption,
1093 SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
1094 PriorityQ: defaultPriorityQueue,
1095 Weight: defaultQueueWeight,
1096 DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
1097 DiscardConfigV2: &tp_pb.DiscardConfig{
1098 DiscardPolicy: tp_pb.DiscardPolicy_Red,
1099 DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
1100 RedDiscardConfig: &tp_pb.RedDiscardConfig{
1101 MinThreshold: defaultMinThreshold,
1102 MaxThreshold: defaultMaxThreshold,
1103 MaxProbability: defaultMaxProbability,
1104 },
1105 },
1106 },
1107 DiscardConfig: &tp_pb.RedDiscardConfig{
1108 MinThreshold: defaultMinThreshold,
1109 MaxThreshold: defaultMaxThreshold,
1110 MaxProbability: defaultMaxProbability,
1111 }})
1112 }
1113 return &tp_pb.EponTechProfile{
1114 Name: t.config.DefaultTPName,
1115 ProfileType: t.resourceMgr.GetTechnology(),
1116 Version: t.config.TPVersion,
1117 NumGemPorts: uint32(len(usQueueAttributeList)),
1118 InstanceControl: &tp_pb.InstanceControl{
1119 Onu: defaultOnuInstance,
1120 Uni: defaultUniInstance,
1121 MaxGemPayloadSize: defaultGemPayloadSize},
1122 PackageType: defaultPakageType,
1123 UpstreamQueueAttributeList: usQueueAttributeList,
1124 DownstreamQueueAttributeList: dsQueueAttributeList}
1125}
1126
1127//isMulticastGem returns true if isMulticast attribute value of a GEM port is true; false otherwise
1128func isMulticastGem(isMulticastAttrValue string) bool {
1129 return isMulticastAttrValue != "" &&
1130 (isMulticastAttrValue == "True" || isMulticastAttrValue == "true" || isMulticastAttrValue == "TRUE")
1131}
1132
1133func (t *TechProfileMgr) addResourceInstanceToKVStore(ctx context.Context, tpID uint32, uniPortName string, resInst tp_pb.ResourceInstance) error {
1134 logger.Debugw(ctx, "adding-resource-instance-to-kv-store", log.Fields{"tpID": tpID, "uniPortName": uniPortName, "resInst": resInst})
1135 val, err := proto.Marshal(&resInst)
1136 if err != nil {
1137 logger.Errorw(ctx, "failed-to-marshall-resource-instance", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName, "resInst": resInst})
1138 return err
1139 }
1140 err = t.config.ResourceInstanceKVBacked.Put(ctx, fmt.Sprintf("%s/%d/%s", t.resourceMgr.GetTechnology(), tpID, uniPortName), val)
1141 return err
1142}
1143
1144func (t *TechProfileMgr) removeResourceInstanceFromKVStore(ctx context.Context, tpID uint32, uniPortName string) error {
1145 logger.Debugw(ctx, "removing-resource-instance-to-kv-store", log.Fields{"tpID": tpID, "uniPortName": uniPortName})
1146 if err := t.config.ResourceInstanceKVBacked.Delete(ctx, fmt.Sprintf("%s/%d/%s", t.resourceMgr.GetTechnology(), tpID, uniPortName)); err != nil {
1147 logger.Errorw(ctx, "error-removing-resource-instance-to-kv-store", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
1148 return err
1149 }
1150 return nil
1151}
1152
1153func (t *TechProfileMgr) getTPFromKVStore(ctx context.Context, tpID uint32) *tp_pb.TechProfile {
1154 var tp *tp_pb.TechProfile
1155 t.tpMapLock.RLock()
1156 tp, ok := t.tpMap[tpID]
1157 t.tpMapLock.RUnlock()
1158 if ok {
1159 logger.Debugw(ctx, "found-tp-in-cache", log.Fields{"tpID": tpID})
1160 return tp
1161 }
1162 key := fmt.Sprintf(t.config.TPFileKVPath, t.resourceMgr.GetTechnology(), tpID)
1163 logger.Debugw(ctx, "getting-tp-from-kv-store", log.Fields{"tpID": tpID, "Key": key})
1164 kvresult, err := t.config.DefaultTpKVBackend.Get(ctx, key)
1165 if err != nil {
1166 logger.Errorw(ctx, "error-fetching-from-kv-store", log.Fields{"err": err, "key": key})
1167 return nil
1168 }
1169 if kvresult != nil {
1170 /* Backend will return Value in string format,needs to be converted to []byte before unmarshal*/
1171 if value, err := kvstore.ToByte(kvresult.Value); err == nil {
1172 lTp := &tp_pb.TechProfile{}
1173 reader := bytes.NewReader(value)
1174 if err = jsonpb.Unmarshal(reader, lTp); err != nil {
1175 logger.Errorw(ctx, "error-unmarshalling-tp-from-kv-store", log.Fields{"err": err, "tpID": tpID, "error": err})
1176 return nil
1177 }
1178
1179 logger.Debugw(ctx, "success-fetched-tp-from-kv-store", log.Fields{"tpID": tpID, "value": *lTp})
1180 return lTp
1181 } else {
1182 logger.Errorw(ctx, "error-decoding-tp", log.Fields{"err": err, "tpID": tpID})
1183 // We we create a default profile in this case.
1184 }
1185 }
1186
1187 return nil
1188}
1189
1190func (t *TechProfileMgr) getEponTPFromKVStore(ctx context.Context, tpID uint32) *tp_pb.EponTechProfile {
1191 var eponTp *tp_pb.EponTechProfile
1192 t.eponTpMapLock.RLock()
1193 eponTp, ok := t.eponTpMap[tpID]
1194 t.eponTpMapLock.RUnlock()
1195 if ok {
1196 logger.Debugw(ctx, "found-tp-in-cache", log.Fields{"tpID": tpID})
1197 return eponTp
1198 }
1199 key := fmt.Sprintf(t.config.TPFileKVPath, t.resourceMgr.GetTechnology(), tpID)
1200 logger.Debugw(ctx, "getting-epon-tp-from-kv-store", log.Fields{"tpID": tpID, "Key": key})
1201 kvresult, err := t.config.DefaultTpKVBackend.Get(ctx, key)
1202 if err != nil {
1203 logger.Errorw(ctx, "error-fetching-from-kv-store", log.Fields{"err": err, "key": key})
1204 return nil
1205 }
1206 if kvresult != nil {
1207 /* Backend will return Value in string format,needs to be converted to []byte before unmarshal*/
1208 if value, err := kvstore.ToByte(kvresult.Value); err == nil {
1209 lEponTp := &tp_pb.EponTechProfile{}
1210 reader := bytes.NewReader(value)
1211 if err = jsonpb.Unmarshal(reader, lEponTp); err != nil {
1212 logger.Errorw(ctx, "error-unmarshalling-epon-tp-from-kv-store", log.Fields{"err": err, "tpID": tpID, "error": err})
1213 return nil
1214 }
1215
1216 logger.Debugw(ctx, "success-fetching-epon-tp-from-kv-store", log.Fields{"tpID": tpID, "value": *lEponTp})
1217 return lEponTp
1218 }
1219 }
1220 return nil
1221}
1222
1223func newKVClient(ctx context.Context, storeType string, address string, timeout time.Duration) (kvstore.Client, error) {
1224
1225 logger.Infow(ctx, "kv-store", log.Fields{"storeType": storeType, "address": address})
1226 switch storeType {
1227 case "etcd":
1228 return kvstore.NewEtcdClient(ctx, address, timeout, log.WarnLevel)
Joey Armstronga6af1522023-01-17 16:06:16 -05001229 case "redis":
1230 return kvstore.NewRedisClient(address, timeout, false)
1231 case "redis-sentinel":
1232 return kvstore.NewRedisClient(address, timeout, true)
khenaidoo106c61a2021-08-11 18:05:46 -04001233 }
Joey Armstronga6af1522023-01-17 16:06:16 -05001234
khenaidoo106c61a2021-08-11 18:05:46 -04001235 return nil, errors.New("unsupported-kv-store")
1236}
1237
1238// buildTpInstanceFromResourceInstance for GPON, XGPON and XGS-PON technology - build TpInstance from TechProfile template and ResourceInstance
1239func (t *TechProfileMgr) buildTpInstanceFromResourceInstance(ctx context.Context, tp *tp_pb.TechProfile, resInst *tp_pb.ResourceInstance) *tp_pb.TechProfileInstance {
1240
1241 var usGemPortAttributeList []*tp_pb.GemPortAttributes
1242 var dsGemPortAttributeList []*tp_pb.GemPortAttributes
1243 var dsMulticastGemAttributeList []*tp_pb.GemPortAttributes
1244 var dsUnicastGemAttributeList []*tp_pb.GemPortAttributes
1245
1246 if len(resInst.GemportIds) != int(tp.NumGemPorts) {
1247 logger.Errorw(ctx, "mismatch-in-number-of-gemports-between-template-and-resource-instance",
1248 log.Fields{"tpID": resInst.TpId, "totalResInstGemPortIDs": len(resInst.GemportIds), "totalTpTemplateGemPorts": tp.NumGemPorts})
1249 return nil
1250 }
1251 for index := 0; index < int(tp.NumGemPorts); index++ {
1252 usGemPortAttributeList = append(usGemPortAttributeList,
1253 &tp_pb.GemPortAttributes{GemportId: resInst.GemportIds[index],
1254 MaxQSize: tp.UpstreamGemPortAttributeList[index].MaxQSize,
1255 PbitMap: tp.UpstreamGemPortAttributeList[index].PbitMap,
1256 AesEncryption: tp.UpstreamGemPortAttributeList[index].AesEncryption,
1257 SchedulingPolicy: tp.UpstreamGemPortAttributeList[index].SchedulingPolicy,
1258 PriorityQ: tp.UpstreamGemPortAttributeList[index].PriorityQ,
1259 Weight: tp.UpstreamGemPortAttributeList[index].Weight,
1260 DiscardPolicy: tp.UpstreamGemPortAttributeList[index].DiscardPolicy,
1261 DiscardConfig: tp.UpstreamGemPortAttributeList[index].DiscardConfig})
1262 }
1263
1264 //put multicast and unicast downstream GEM port attributes in different lists first
1265 for index := 0; index < len(tp.DownstreamGemPortAttributeList); index++ {
1266 if isMulticastGem(tp.DownstreamGemPortAttributeList[index].IsMulticast) {
1267 dsMulticastGemAttributeList = append(dsMulticastGemAttributeList,
1268 &tp_pb.GemPortAttributes{
1269 MulticastGemId: tp.DownstreamGemPortAttributeList[index].MulticastGemId,
1270 MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
1271 PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
1272 AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
1273 SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
1274 PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
1275 Weight: tp.DownstreamGemPortAttributeList[index].Weight,
1276 DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
1277 DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig,
1278 IsMulticast: tp.DownstreamGemPortAttributeList[index].IsMulticast,
1279 DynamicAccessControlList: tp.DownstreamGemPortAttributeList[index].DynamicAccessControlList,
1280 StaticAccessControlList: tp.DownstreamGemPortAttributeList[index].StaticAccessControlList})
1281 } else {
1282 dsUnicastGemAttributeList = append(dsUnicastGemAttributeList,
1283 &tp_pb.GemPortAttributes{
1284 MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
1285 PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
1286 AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
1287 SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
1288 PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
1289 Weight: tp.DownstreamGemPortAttributeList[index].Weight,
1290 DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
1291 DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig})
1292 }
1293 }
1294 //add unicast downstream GEM ports to dsGemPortAttributeList
1295 if dsUnicastGemAttributeList != nil {
1296 for index := 0; index < int(tp.NumGemPorts); index++ {
1297 dsGemPortAttributeList = append(dsGemPortAttributeList,
1298 &tp_pb.GemPortAttributes{GemportId: resInst.GemportIds[index],
1299 MaxQSize: dsUnicastGemAttributeList[index].MaxQSize,
1300 PbitMap: dsUnicastGemAttributeList[index].PbitMap,
1301 AesEncryption: dsUnicastGemAttributeList[index].AesEncryption,
1302 SchedulingPolicy: dsUnicastGemAttributeList[index].SchedulingPolicy,
1303 PriorityQ: dsUnicastGemAttributeList[index].PriorityQ,
1304 Weight: dsUnicastGemAttributeList[index].Weight,
1305 DiscardPolicy: dsUnicastGemAttributeList[index].DiscardPolicy,
1306 DiscardConfig: dsUnicastGemAttributeList[index].DiscardConfig})
1307 }
1308 }
1309 //add multicast GEM ports to dsGemPortAttributeList afterwards
1310 for k := range dsMulticastGemAttributeList {
1311 dsGemPortAttributeList = append(dsGemPortAttributeList, dsMulticastGemAttributeList[k])
1312 }
1313
1314 return &tp_pb.TechProfileInstance{
1315 SubscriberIdentifier: resInst.SubscriberIdentifier,
1316 Name: tp.Name,
1317 ProfileType: tp.ProfileType,
1318 Version: tp.Version,
1319 NumGemPorts: tp.NumGemPorts,
1320 InstanceControl: tp.InstanceControl,
1321 UsScheduler: &tp_pb.SchedulerAttributes{
1322 AllocId: resInst.AllocId,
1323 Direction: tp.UsScheduler.Direction,
1324 AdditionalBw: tp.UsScheduler.AdditionalBw,
1325 Priority: tp.UsScheduler.Priority,
1326 Weight: tp.UsScheduler.Weight,
1327 QSchedPolicy: tp.UsScheduler.QSchedPolicy},
1328 DsScheduler: &tp_pb.SchedulerAttributes{
1329 AllocId: resInst.AllocId,
1330 Direction: tp.DsScheduler.Direction,
1331 AdditionalBw: tp.DsScheduler.AdditionalBw,
1332 Priority: tp.DsScheduler.Priority,
1333 Weight: tp.DsScheduler.Weight,
1334 QSchedPolicy: tp.DsScheduler.QSchedPolicy},
1335 UpstreamGemPortAttributeList: usGemPortAttributeList,
1336 DownstreamGemPortAttributeList: dsGemPortAttributeList}
1337}
1338
1339// buildEponTpInstanceFromResourceInstance for EPON technology - build EponTpInstance from EponTechProfile template and ResourceInstance
1340func (t *TechProfileMgr) buildEponTpInstanceFromResourceInstance(ctx context.Context, tp *tp_pb.EponTechProfile, resInst *tp_pb.ResourceInstance) *tp_pb.EponTechProfileInstance {
1341
1342 var usQueueAttributeList []*tp_pb.EPONQueueAttributes
1343 var dsQueueAttributeList []*tp_pb.EPONQueueAttributes
1344
1345 if len(resInst.GemportIds) != int(tp.NumGemPorts) {
1346 logger.Errorw(ctx, "mismatch-in-number-of-gemports-between-epon-tp-template-and-resource-instance",
1347 log.Fields{"tpID": resInst.TpId, "totalResInstGemPortIDs": len(resInst.GemportIds), "totalTpTemplateGemPorts": tp.NumGemPorts})
1348 return nil
1349 }
1350
1351 for index := 0; index < int(tp.NumGemPorts); index++ {
1352 usQueueAttributeList = append(usQueueAttributeList,
1353 &tp_pb.EPONQueueAttributes{GemportId: resInst.GemportIds[index],
1354 MaxQSize: tp.UpstreamQueueAttributeList[index].MaxQSize,
1355 PbitMap: tp.UpstreamQueueAttributeList[index].PbitMap,
1356 AesEncryption: tp.UpstreamQueueAttributeList[index].AesEncryption,
1357 TrafficType: tp.UpstreamQueueAttributeList[index].TrafficType,
1358 UnsolicitedGrantSize: tp.UpstreamQueueAttributeList[index].UnsolicitedGrantSize,
1359 NominalInterval: tp.UpstreamQueueAttributeList[index].NominalInterval,
1360 ToleratedPollJitter: tp.UpstreamQueueAttributeList[index].ToleratedPollJitter,
1361 RequestTransmissionPolicy: tp.UpstreamQueueAttributeList[index].RequestTransmissionPolicy,
1362 NumQSets: tp.UpstreamQueueAttributeList[index].NumQSets,
1363 QThresholds: tp.UpstreamQueueAttributeList[index].QThresholds,
1364 SchedulingPolicy: tp.UpstreamQueueAttributeList[index].SchedulingPolicy,
1365 PriorityQ: tp.UpstreamQueueAttributeList[index].PriorityQ,
1366 Weight: tp.UpstreamQueueAttributeList[index].Weight,
1367 DiscardPolicy: tp.UpstreamQueueAttributeList[index].DiscardPolicy,
1368 DiscardConfig: tp.UpstreamQueueAttributeList[index].DiscardConfig})
1369 }
1370
1371 for index := 0; index < int(tp.NumGemPorts); index++ {
1372 dsQueueAttributeList = append(dsQueueAttributeList,
1373 &tp_pb.EPONQueueAttributes{GemportId: resInst.GemportIds[index],
1374 MaxQSize: tp.DownstreamQueueAttributeList[index].MaxQSize,
1375 PbitMap: tp.DownstreamQueueAttributeList[index].PbitMap,
1376 AesEncryption: tp.DownstreamQueueAttributeList[index].AesEncryption,
1377 SchedulingPolicy: tp.DownstreamQueueAttributeList[index].SchedulingPolicy,
1378 PriorityQ: tp.DownstreamQueueAttributeList[index].PriorityQ,
1379 Weight: tp.DownstreamQueueAttributeList[index].Weight,
1380 DiscardPolicy: tp.DownstreamQueueAttributeList[index].DiscardPolicy,
1381 DiscardConfig: tp.DownstreamQueueAttributeList[index].DiscardConfig})
1382 }
1383
1384 return &tp_pb.EponTechProfileInstance{
1385 SubscriberIdentifier: resInst.SubscriberIdentifier,
1386 Name: tp.Name,
1387 ProfileType: tp.ProfileType,
1388 Version: tp.Version,
1389 NumGemPorts: tp.NumGemPorts,
1390 InstanceControl: tp.InstanceControl,
1391 PackageType: tp.PackageType,
1392 AllocId: resInst.AllocId,
1393 UpstreamQueueAttributeList: usQueueAttributeList,
1394 DownstreamQueueAttributeList: dsQueueAttributeList}
1395}
1396
1397func (t *TechProfileMgr) getTpInstanceFromResourceInstance(ctx context.Context, resInst *tp_pb.ResourceInstance) *tp_pb.TechProfileInstance {
1398 if resInst == nil {
1399 logger.Error(ctx, "resource-instance-nil")
1400 return nil
1401 }
1402 tp := t.getTPFromKVStore(ctx, resInst.TpId)
1403 if tp == nil {
1404 logger.Warnw(ctx, "tp-not-found-on-kv--creating-default-tp", log.Fields{"tpID": resInst.TpId})
1405 tp = t.getDefaultTechProfile(ctx)
1406 }
1407 return t.buildTpInstanceFromResourceInstance(ctx, tp, resInst)
1408}
1409
1410func (t *TechProfileMgr) getEponTpInstanceFromResourceInstance(ctx context.Context, resInst *tp_pb.ResourceInstance) *tp_pb.EponTechProfileInstance {
1411 if resInst == nil {
1412 logger.Error(ctx, "resource-instance-nil")
1413 return nil
1414 }
1415 eponTp := t.getEponTPFromKVStore(ctx, resInst.TpId)
1416 if eponTp == nil {
1417 logger.Warnw(ctx, "tp-not-found-on-kv--creating-default-tp", log.Fields{"tpID": resInst.TpId})
1418 eponTp = t.getDefaultEponProfile(ctx)
1419 }
1420 return t.buildEponTpInstanceFromResourceInstance(ctx, eponTp, resInst)
1421}
1422
1423func (t *TechProfileMgr) reconcileTpInstancesToCache(ctx context.Context) error {
1424
1425 tech := t.resourceMgr.GetTechnology()
1426 newCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
1427 defer cancel()
1428 kvPairs, _ := t.config.ResourceInstanceKVBacked.List(newCtx, tech)
1429
1430 if tech == xgspon || tech == xgpon || tech == gpon {
1431 for keyPath, kvPair := range kvPairs {
1432 logger.Debugw(ctx, "attempting-to-reconcile-tp-instance-from-resource-instance", log.Fields{"resourceInstPath": keyPath})
1433 if value, err := kvstore.ToByte(kvPair.Value); err == nil {
1434 var resInst tp_pb.ResourceInstance
1435 if err = proto.Unmarshal(value, &resInst); err != nil {
1436 logger.Errorw(ctx, "error-unmarshal-kv-pair", log.Fields{"err": err, "keyPath": keyPath, "value": value})
1437 continue
1438 } else {
1439 if tpInst := t.getTpInstanceFromResourceInstance(ctx, &resInst); tpInst != nil {
1440 // Trim the kv path by removing the default prefix part and get only the suffix part to reference the internal cache
1441 keySuffixSlice := regexp.MustCompile(t.config.ResourceInstanceKVPathPrefix+"/").Split(keyPath, 2)
1442 if len(keySuffixSlice) == 2 {
1443 keySuffixFormatRegexp := regexp.MustCompile(`^[a-zA-Z\-]+/[0-9]+/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
1444 // Make sure the keySuffixSlice is as per format [a-zA-Z-+]/[\d+]/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
1445 if !keySuffixFormatRegexp.Match([]byte(keySuffixSlice[1])) {
1446 logger.Errorw(ctx, "kv-path-not-confirming-to-format", log.Fields{"kvPath": keySuffixSlice[1]})
1447 continue
1448 }
1449 } else {
1450 logger.Errorw(ctx, "kv-instance-key-path-not-in-the-expected-format", log.Fields{"kvPath": keyPath})
1451 continue
1452 }
1453 t.tpInstanceMapLock.Lock()
1454 t.tpInstanceMap[keySuffixSlice[1]] = tpInst
1455 t.tpInstanceMapLock.Unlock()
1456 logger.Debugw(ctx, "reconciled-tp-success", log.Fields{"keyPath": keyPath})
1457 }
1458 }
1459 } else {
1460 logger.Errorw(ctx, "error-converting-kv-pair-value-to-byte", log.Fields{"err": err})
1461 }
1462 }
1463 } else if tech == epon {
1464 for keyPath, kvPair := range kvPairs {
1465 logger.Debugw(ctx, "attempting-to-reconcile-epon-tp-instance", log.Fields{"keyPath": keyPath})
1466 if value, err := kvstore.ToByte(kvPair.Value); err == nil {
1467 var resInst tp_pb.ResourceInstance
1468 if err = proto.Unmarshal(value, &resInst); err != nil {
1469 logger.Errorw(ctx, "error-unmarshal-kv-pair", log.Fields{"keyPath": keyPath, "value": value})
1470 continue
1471 } else {
1472 if eponTpInst := t.getEponTpInstanceFromResourceInstance(ctx, &resInst); eponTpInst != nil {
1473 // Trim the kv path by removing the default prefix part and get only the suffix part to reference the internal cache
1474 keySuffixSlice := regexp.MustCompile(t.config.ResourceInstanceKVPathPrefix+"/").Split(keyPath, 2)
1475 if len(keySuffixSlice) == 2 {
1476 keySuffixFormatRegexp := regexp.MustCompile(`^[a-zA-Z\-]+/[0-9]+/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
1477 // Make sure the keySuffixSlice is as per format [a-zA-Z-+]/[\d+]/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
1478 if !keySuffixFormatRegexp.Match([]byte(keySuffixSlice[1])) {
1479 logger.Errorw(ctx, "kv-path-not-confirming-to-format", log.Fields{"kvPath": keySuffixSlice[1]})
1480 continue
1481 }
1482 } else {
1483 logger.Errorw(ctx, "kv-instance-key-path-not-in-the-expected-format", log.Fields{"kvPath": keyPath})
1484 continue
1485 }
1486 t.epontpInstanceMapLock.Lock()
1487 t.eponTpInstanceMap[keySuffixSlice[1]] = eponTpInst
1488 t.epontpInstanceMapLock.Unlock()
1489 logger.Debugw(ctx, "reconciled-epon-tp-success", log.Fields{"keyPath": keyPath})
1490 }
1491 }
1492 } else {
1493 logger.Errorw(ctx, "error-converting-kv-pair-value-to-byte", log.Fields{"err": err})
1494 }
1495 }
1496 } else {
1497 logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": tech})
1498 return fmt.Errorf("unknown-tech-%v", tech)
1499 }
1500
1501 return nil
1502}