/*
 * Copyright 2019-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package techprofile
18
19import (
20 "bytes"
21 "context"
22 "errors"
23 "fmt"
24 "regexp"
25 "strconv"
26 "sync"
27 "time"
28
29 "github.com/golang/protobuf/jsonpb"
30 "github.com/golang/protobuf/proto"
31 "github.com/opencord/voltha-lib-go/v7/pkg/db"
32 "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
33 "github.com/opencord/voltha-lib-go/v7/pkg/log"
34 "github.com/opencord/voltha-protos/v5/go/openolt"
35 tp_pb "github.com/opencord/voltha-protos/v5/go/tech_profile"
36)
37
// iPonResourceMgr is the slice of the PON resource manager API that the tech
// profile manager depends on.
type iPonResourceMgr interface {
	// Resource ID allocation and release for a PON interface. The resourceType
	// argument is expected to be one of the strings returned by the
	// GetResourceType* methods below.
	GetResourceID(ctx context.Context, intfID uint32, resourceType string, numIDs uint32) ([]uint32, error)
	FreeResourceID(ctx context.Context, intfID uint32, resourceType string, ReleaseContent []uint32) error

	// Names of the resource types understood by GetResourceID/FreeResourceID.
	GetResourceTypeAllocID() string
	GetResourceTypeGemPortID() string
	GetResourceTypeOnuID() string

	// GetTechnology reports the PON technology name (compared against
	// "XGS-PON", "XGPON", "GPON" and "EPON" by this package).
	GetTechnology() string
}
47
// SchedulingPolicy identifies a scheduling policy by its numeric code.
type SchedulingPolicy int32

// Known scheduling policies; the numeric values are 0, 1 and 2 in this order.
const (
	SchedulingPolicy_WRR SchedulingPolicy = iota
	SchedulingPolicy_StrictPriority
	SchedulingPolicy_Hybrid
)
55
// AdditionalBW identifies an additional-bandwidth eligibility setting by its
// numeric code.
type AdditionalBW int32

// Known additional-bandwidth settings; the numeric values are 0 through 3 in
// this order.
const (
	AdditionalBW_AdditionalBW_None AdditionalBW = iota
	AdditionalBW_AdditionalBW_NA
	AdditionalBW_AdditionalBW_BestEffort
	AdditionalBW_AdditionalBW_Auto
)
64
// DiscardPolicy identifies a queue discard policy by its numeric code.
type DiscardPolicy int32

// Known discard policies; the numeric values are 0 through 3 in this order.
const (
	DiscardPolicy_TailDrop DiscardPolicy = iota
	DiscardPolicy_WTailDrop
	DiscardPolicy_Red
	DiscardPolicy_WRed
)
73
// uniPortNameFormat is the layout every uniPortName must follow:
// olt-{<device id>}/pon-{<n>}/onu-{<n>}/uni-{<n>}, with literal curly braces.
const uniPortNameFormat = `^olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`

// uniPortNameFormatRegexp is compiled once at package initialisation and is
// used to validate uniPortName arguments.
var uniPortNameFormatRegexp = regexp.MustCompile(uniPortNameFormat)
76
// Instance-control defaults.

// defaultOnuInstance is the default ONU instance-control mode.
const defaultOnuInstance = "multi-instance"

// defaultUniInstance is the default UNI instance-control mode.
const defaultUniInstance = "single-instance"

// defaultGemPayloadSize is the default GEM payload size setting.
const defaultGemPayloadSize = "auto"
83
// Default discard-config and scheduler constants. All of them are untyped
// zero values.
const (
	// Discard configuration.
	defaultMinThreshold   = 0
	defaultMaxThreshold   = 0
	defaultMaxProbability = 0

	// Scheduler.
	defaultPriority = 0
	defaultWeight   = 0
)
96
// Default GEM port attribute values.
const (
	// Boolean-like attributes are carried as the strings "True"/"False".
	defaultAESEncryption = "True"
	defaultIsMulticast   = "False"

	defaultPriorityQueue = 0
	defaultQueueWeight   = 0
	defaultMaxQueueSize  = "auto"

	// Multicast defaults: address range for the access control list and the
	// multicast GEM ID.
	defaultAccessControlList = "224.0.0.0-239.255.255.255"
	defaultMcastGemID        = 4069
)
107
// Default EPON constants.
const (
	// Package type. The identifier's spelling is kept as-is because other code
	// refers to it.
	defaultPakageType = "B"

	// Upstream traffic defaults.
	defaultTrafficType               = "BE"
	defaultUnsolicitedGrantSize      = 0
	defaultNominalInterval           = 0
	defaultToleratedPollJitter       = 0
	defaultRequestTransmissionPolicy = 0
	defaultNumQueueSet               = 2

	// Queue thresholds: only the first one is non-zero by default.
	defaultQThreshold1 = 5500
	defaultQThreshold2 = 0
	defaultQThreshold3 = 0
	defaultQThreshold4 = 0
	defaultQThreshold5 = 0
	defaultQThreshold6 = 0
	defaultQThreshold7 = 0
)
129
// PON technology names, as compared against the value returned by the resource
// manager's GetTechnology().
const xgspon = "XGS-PON"
const xgpon = "XGPON"
const gpon = "GPON"
const epon = "EPON"
136
// MaxUniPortPerOnu is the number of UNI port indices (0..MaxUniPortPerOnu-1)
// considered per ONU when looking up TP instances.
//
// TODO: Adapter uses its own constant for MaxUniPort. How to synchronize this
// and have a single source of truth?
const MaxUniPortPerOnu = 16
140
141type TechProfileMgr struct {
142 config *TechProfileFlags
143 resourceMgr iPonResourceMgr
144 OnuIDMgmtLock sync.RWMutex
145 GemPortIDMgmtLock sync.RWMutex
146 AllocIDMgmtLock sync.RWMutex
147 tpInstanceMap map[string]*tp_pb.TechProfileInstance // Map of tp path to tp instance
148 tpInstanceMapLock sync.RWMutex
149 eponTpInstanceMap map[string]*tp_pb.EponTechProfileInstance // Map of tp path to epon tp instance
150 epontpInstanceMapLock sync.RWMutex
151 tpMap map[uint32]*tp_pb.TechProfile // Map of tp id to tp
152 tpMapLock sync.RWMutex
153 eponTpMap map[uint32]*tp_pb.EponTechProfile // map of tp id to epon tp
154 eponTpMapLock sync.RWMutex
155}
156
157func (t *TechProfileMgr) SetKVClient(ctx context.Context, pathPrefix string) *db.Backend {
158 kvClient, err := newKVClient(ctx, t.config.KVStoreType, t.config.KVStoreAddress, t.config.KVStoreTimeout)
159 if err != nil {
160 logger.Errorw(ctx, "failed-to-create-kv-client",
161 log.Fields{
162 "type": t.config.KVStoreType, "address": t.config.KVStoreAddress,
163 "timeout": t.config.KVStoreTimeout, "prefix": pathPrefix,
164 "error": err.Error(),
165 })
166 return nil
167 }
168 return &db.Backend{
169 Client: kvClient,
170 StoreType: t.config.KVStoreType,
171 Address: t.config.KVStoreAddress,
172 Timeout: t.config.KVStoreTimeout,
173 PathPrefix: pathPrefix}
174
175 /* TODO : Make sure direct call to NewBackend is working fine with backend , currently there is some
176 issue between kv store and backend , core is not calling NewBackend directly
177 kv := model.NewBackend(t.config.kvStoreType, t.config.KVStoreHost, t.config.KVStorePort,
178 t.config.KVStoreTimeout, kvStoreTechProfilePathPrefix)
179 */
180}
181
182func NewTechProfile(ctx context.Context, resourceMgr iPonResourceMgr, kvStoreType string, kvStoreAddress string, basePathKvStore string) (*TechProfileMgr, error) {
183 var techprofileObj TechProfileMgr
184 logger.Debug(ctx, "initializing-techprofile-mananger")
185 techprofileObj.config = NewTechProfileFlags(kvStoreType, kvStoreAddress, basePathKvStore)
186 techprofileObj.config.KVBackend = techprofileObj.SetKVClient(ctx, techprofileObj.config.TPKVPathPrefix)
187 techprofileObj.config.DefaultTpKVBackend = techprofileObj.SetKVClient(ctx, techprofileObj.config.defaultTpKvPathPrefix)
188 if techprofileObj.config.KVBackend == nil {
189 logger.Error(ctx, "failed-to-initialize-backend")
190 return nil, errors.New("kv-backend-init-failed")
191 }
192 techprofileObj.config.ResourceInstanceKVBacked = techprofileObj.SetKVClient(ctx, techprofileObj.config.ResourceInstanceKVPathPrefix)
193 if techprofileObj.config.ResourceInstanceKVBacked == nil {
194 logger.Error(ctx, "failed-to-initialize-resource-instance-kv-backend")
195 return nil, errors.New("resource-instance-kv-backend-init-failed")
196 }
197 techprofileObj.resourceMgr = resourceMgr
198 techprofileObj.tpInstanceMap = make(map[string]*tp_pb.TechProfileInstance)
199 techprofileObj.eponTpInstanceMap = make(map[string]*tp_pb.EponTechProfileInstance)
200 techprofileObj.tpMap = make(map[uint32]*tp_pb.TechProfile)
201 techprofileObj.eponTpMap = make(map[uint32]*tp_pb.EponTechProfile)
202 logger.Debug(ctx, "reconcile-tp-instance-cache-start")
203 if err := techprofileObj.reconcileTpInstancesToCache(ctx); err != nil {
204 logger.Errorw(ctx, "failed-to-reconcile-tp-instances", log.Fields{"err": err})
205 return nil, err
206 }
207 logger.Debug(ctx, "reconcile-tp-instance-cache-end")
208 logger.Debug(ctx, "initializing-tech-profile-manager-object-success")
209 return &techprofileObj, nil
210}
211
212// GetTechProfileInstanceKey returns the tp instance key that is used to reference TP Instance Map
213func (t *TechProfileMgr) GetTechProfileInstanceKey(ctx context.Context, tpID uint32, uniPortName string) string {
214 logger.Debugw(ctx, "get-tp-instance-kv-key", log.Fields{
215 "uniPortName": uniPortName,
216 "tpId": tpID,
217 })
218 // Make sure the uniPortName is as per format olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
219 if !uniPortNameFormatRegexp.Match([]byte(uniPortName)) {
220 logger.Warnw(ctx, "uni-port-name-not-confirming-to-format", log.Fields{"uniPortName": uniPortName})
221 }
222 // The key path prefix (like service/voltha/technology_profiles or service/voltha_voltha/technology_profiles)
223 // is expected to be attached by the components that use this path as part of the KVBackend configuration.
224 resourceInstanceKvPathSuffix := "%s/%d/%s" // <technology>/<tpID>/<uni-port-name>
225 // <uni-port-name> must be of the format pon-{\d+}/onu-{\d+}/uni-{\d+}
226 return fmt.Sprintf(resourceInstanceKvPathSuffix, t.resourceMgr.GetTechnology(), tpID, uniPortName)
227}
228
229// GetTPInstance gets TP instance from cache if found
230func (t *TechProfileMgr) GetTPInstance(ctx context.Context, path string) (interface{}, error) {
231 tech := t.resourceMgr.GetTechnology()
232 switch tech {
233 case xgspon, xgpon, gpon:
234 t.tpInstanceMapLock.RLock()
235 defer t.tpInstanceMapLock.RUnlock()
236 tpInst, ok := t.tpInstanceMap[path]
237 if !ok {
238 return nil, fmt.Errorf("tp-instance-not-found-tp-path-%v", path)
239 }
240 return tpInst, nil
241 case epon:
242 t.epontpInstanceMapLock.RLock()
243 defer t.epontpInstanceMapLock.RUnlock()
244 tpInst, ok := t.eponTpInstanceMap[path]
245 if !ok {
246 return nil, fmt.Errorf("tp-instance-not-found-tp-path-%v", path)
247 }
248 return tpInst, nil
249 default:
250 logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": tech})
251 return nil, fmt.Errorf("unknown-tech-%s-tp-path-%v", tech, path)
252 }
253}
254
255// CreateTechProfileInstance creates a new TP instance.
256func (t *TechProfileMgr) CreateTechProfileInstance(ctx context.Context, tpID uint32, uniPortName string, intfID uint32) (interface{}, error) {
257 var tpInstance *tp_pb.TechProfileInstance
258 var eponTpInstance *tp_pb.EponTechProfileInstance
259
260 logger.Infow(ctx, "creating-tp-instance", log.Fields{"tpID": tpID, "uni": uniPortName, "intId": intfID})
261
262 // Make sure the uniPortName is as per format olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
263 if !uniPortNameFormatRegexp.Match([]byte(uniPortName)) {
264 logger.Errorw(ctx, "uni-port-name-not-confirming-to-format", log.Fields{"uniPortName": uniPortName})
265 return nil, fmt.Errorf("uni-port-name-not-confirming-to-format-%s", uniPortName)
266 }
267 tpInstancePathSuffix := t.GetTechProfileInstanceKey(ctx, tpID, uniPortName)
268
269 if t.resourceMgr.GetTechnology() == epon {
270 tp := t.getEponTPFromKVStore(ctx, tpID)
271 if tp != nil {
272 if err := t.validateInstanceControlAttr(ctx, *tp.InstanceControl); err != nil {
273 logger.Error(ctx, "invalid-instance-ctrl-attr-using-default-tp")
274 tp = t.getDefaultEponProfile(ctx)
275 } else {
276 logger.Infow(ctx, "using-specified-tp-from-kv-store", log.Fields{"tpID": tpID})
277 }
278 } else {
279 logger.Info(ctx, "tp-not-found-on-kv--creating-default-tp")
280 tp = t.getDefaultEponProfile(ctx)
281 }
282 // Store TP in cache
283 t.eponTpMapLock.Lock()
284 t.eponTpMap[tpID] = tp
285 t.eponTpMapLock.Unlock()
286
287 if eponTpInstance = t.allocateEponTPInstance(ctx, uniPortName, tp, intfID, tpInstancePathSuffix); eponTpInstance == nil {
288 logger.Error(ctx, "tp-instance-allocation-failed")
289 return nil, errors.New("tp-instance-allocation-failed")
290 }
291 t.epontpInstanceMapLock.Lock()
292 t.eponTpInstanceMap[tpInstancePathSuffix] = eponTpInstance
293 t.epontpInstanceMapLock.Unlock()
294 resInst := tp_pb.ResourceInstance{
295 TpId: tpID,
296 ProfileType: eponTpInstance.ProfileType,
297 SubscriberIdentifier: eponTpInstance.SubscriberIdentifier,
298 AllocId: eponTpInstance.AllocId,
299 }
300 for _, usQAttr := range eponTpInstance.UpstreamQueueAttributeList {
301 resInst.GemportIds = append(resInst.GemportIds, usQAttr.GemportId)
302 }
303
304 logger.Infow(ctx, "epon-tp-instance-created-successfully",
305 log.Fields{"tpID": tpID, "uni": uniPortName, "intfID": intfID})
306 if err := t.addResourceInstanceToKVStore(ctx, tpID, uniPortName, resInst); err != nil {
307 logger.Errorw(ctx, "failed-to-update-resource-instance-to-kv-store--freeing-up-resources", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
308 allocIDs := make([]uint32, 0)
309 allocIDs = append(allocIDs, resInst.AllocId)
310 errList := make([]error, 0)
311 errList = append(errList, t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), allocIDs))
312 errList = append(errList, t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), resInst.GemportIds))
313 if len(errList) > 0 {
314 logger.Errorw(ctx, "failed-to-free-up-resources-on-kv-store--system-behavior-has-become-erratic", log.Fields{"tpID": tpID, "uniPortName": uniPortName, "errList": errList})
315 }
316 return nil, err
317 }
318 return eponTpInstance, nil
319 } else {
320 tp := t.getTPFromKVStore(ctx, tpID)
321 if tp != nil {
322 if err := t.validateInstanceControlAttr(ctx, *tp.InstanceControl); err != nil {
323 logger.Error(ctx, "invalid-instance-ctrl-attr--using-default-tp")
324 tp = t.getDefaultTechProfile(ctx)
325 } else {
326 logger.Infow(ctx, "using-specified-tp-from-kv-store", log.Fields{"tpID": tpID})
327 }
328 } else {
329 logger.Info(ctx, "tp-not-found-on-kv--creating-default-tp")
330 tp = t.getDefaultTechProfile(ctx)
331 }
332 // Store TP in cache
333 t.tpMapLock.Lock()
334 t.tpMap[tpID] = tp
335 t.tpMapLock.Unlock()
336
337 if tpInstance = t.allocateTPInstance(ctx, uniPortName, tp, intfID, tpInstancePathSuffix); tpInstance == nil {
338 logger.Error(ctx, "tp-instance-allocation-failed")
339 return nil, errors.New("tp-instance-allocation-failed")
340 }
341 t.tpInstanceMapLock.Lock()
342 t.tpInstanceMap[tpInstancePathSuffix] = tpInstance
343 t.tpInstanceMapLock.Unlock()
344
345 resInst := tp_pb.ResourceInstance{
346 TpId: tpID,
347 ProfileType: tpInstance.ProfileType,
348 SubscriberIdentifier: tpInstance.SubscriberIdentifier,
349 AllocId: tpInstance.UsScheduler.AllocId,
350 }
351 for _, usQAttr := range tpInstance.UpstreamGemPortAttributeList {
352 resInst.GemportIds = append(resInst.GemportIds, usQAttr.GemportId)
353 }
354
355 logger.Infow(ctx, "tp-instance-created-successfully",
356 log.Fields{"tpID": tpID, "uni": uniPortName, "intfID": intfID})
357 if err := t.addResourceInstanceToKVStore(ctx, tpID, uniPortName, resInst); err != nil {
358 logger.Errorw(ctx, "failed-to-update-resource-instance-to-kv-store--freeing-up-resources", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
359 allocIDs := make([]uint32, 0)
360 allocIDs = append(allocIDs, resInst.AllocId)
361 errList := make([]error, 0)
362 errList = append(errList, t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), allocIDs))
363 errList = append(errList, t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), resInst.GemportIds))
364 if len(errList) > 0 {
365 logger.Fatalw(ctx, "failed-to-free-up-resources-on-kv-store--system-behavior-has-become-erratic", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
366 }
367 return nil, err
368 }
369
370 logger.Infow(ctx, "resource-instance-added-to-kv-store-successfully",
371 log.Fields{"tpID": tpID, "uni": uniPortName, "intfID": intfID})
372 return tpInstance, nil
373 }
374}
375
376// DeleteTechProfileInstance deletes the TP instance from the local cache as well as deletes the corresponding
377// resource instance from the KV store.
378func (t *TechProfileMgr) DeleteTechProfileInstance(ctx context.Context, tpID uint32, uniPortName string) error {
379 // Make sure the uniPortName is as per format olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
380 if !uniPortNameFormatRegexp.Match([]byte(uniPortName)) {
381 logger.Errorw(ctx, "uni-port-name-not-confirming-to-format", log.Fields{"uniPortName": uniPortName})
382 return fmt.Errorf("uni-port-name-not-confirming-to-format--%s", uniPortName)
383 }
384 path := t.GetTechProfileInstanceKey(ctx, tpID, uniPortName)
385 logger.Infow(ctx, "delete-tp-instance-from-cache", log.Fields{"key": path})
386 t.tpInstanceMapLock.Lock()
387 delete(t.tpInstanceMap, path)
388 t.tpInstanceMapLock.Unlock()
389 if err := t.removeResourceInstanceFromKVStore(ctx, tpID, uniPortName); err != nil {
390 return err
391 }
392 return nil
393}
394
395func (t *TechProfileMgr) GetMulticastTrafficQueues(ctx context.Context, tp *tp_pb.TechProfileInstance) []*tp_pb.TrafficQueue {
396 var encryp bool
397 NumGemPorts := len(tp.DownstreamGemPortAttributeList)
398 mcastTrafficQueues := make([]*tp_pb.TrafficQueue, 0)
399 for Count := 0; Count < NumGemPorts; Count++ {
400 if !isMulticastGem(tp.DownstreamGemPortAttributeList[Count].IsMulticast) {
401 continue
402 }
403 if tp.DownstreamGemPortAttributeList[Count].AesEncryption == "True" {
404 encryp = true
405 } else {
406 encryp = false
407 }
408 mcastTrafficQueues = append(mcastTrafficQueues, &tp_pb.TrafficQueue{
409 Direction: tp_pb.Direction_DOWNSTREAM,
410 GemportId: tp.DownstreamGemPortAttributeList[Count].MulticastGemId,
411 PbitMap: tp.DownstreamGemPortAttributeList[Count].PbitMap,
412 AesEncryption: encryp,
413 SchedPolicy: tp.DownstreamGemPortAttributeList[Count].SchedulingPolicy,
414 Priority: tp.DownstreamGemPortAttributeList[Count].PriorityQ,
415 Weight: tp.DownstreamGemPortAttributeList[Count].Weight,
416 DiscardPolicy: tp.DownstreamGemPortAttributeList[Count].DiscardPolicy,
417 })
418 }
419 logger.Debugw(ctx, "Downstream Multicast Traffic queue list ", log.Fields{"queuelist": mcastTrafficQueues})
420 return mcastTrafficQueues
421}
422
423func (t *TechProfileMgr) GetGemportForPbit(ctx context.Context, tp interface{}, dir tp_pb.Direction, pbit uint32) interface{} {
424 /*
425 Function to get the Gemport mapped to a pbit.
426 */
427 switch tp := tp.(type) {
428 case *tp_pb.TechProfileInstance:
429 if dir == tp_pb.Direction_UPSTREAM {
430 // upstream GEM ports
431 numGemPorts := len(tp.UpstreamGemPortAttributeList)
432 for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
433 lenOfPbitMap := len(tp.UpstreamGemPortAttributeList[gemCnt].PbitMap)
434 for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
435 // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
436 // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
437 if p, err := strconv.Atoi(string(tp.UpstreamGemPortAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
438 if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
439 logger.Debugw(ctx, "Found-US-GEMport-for-Pcp", log.Fields{"pbit": pbit, "GEMport": tp.UpstreamGemPortAttributeList[gemCnt].GemportId})
440 return tp.UpstreamGemPortAttributeList[gemCnt]
441 }
442 }
443 }
444 }
445 } else if dir == tp_pb.Direction_DOWNSTREAM {
446 //downstream GEM ports
447 numGemPorts := len(tp.DownstreamGemPortAttributeList)
448 for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
449 lenOfPbitMap := len(tp.DownstreamGemPortAttributeList[gemCnt].PbitMap)
450 for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
451 // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
452 // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
453 if p, err := strconv.Atoi(string(tp.DownstreamGemPortAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
454 if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
455 logger.Debugw(ctx, "Found-DS-GEMport-for-Pcp", log.Fields{"pbit": pbit, "GEMport": tp.DownstreamGemPortAttributeList[gemCnt].GemportId})
456 return tp.DownstreamGemPortAttributeList[gemCnt]
457 }
458 }
459 }
460 }
461 }
462 logger.Errorw(ctx, "No-GemportId-Found-For-Pcp", log.Fields{"pcpVlan": pbit})
463 case *openolt.EponTechProfileInstance:
464 if dir == tp_pb.Direction_UPSTREAM {
465 // upstream GEM ports
466 numGemPorts := len(tp.UpstreamQueueAttributeList)
467 for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
468 lenOfPbitMap := len(tp.UpstreamQueueAttributeList[gemCnt].PbitMap)
469 for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
470 // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
471 // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
472 if p, err := strconv.Atoi(string(tp.UpstreamQueueAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
473 if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
474 logger.Debugw(ctx, "Found-US-Queue-for-Pcp", log.Fields{"pbit": pbit, "Queue": tp.UpstreamQueueAttributeList[gemCnt].GemportId})
475 return tp.UpstreamQueueAttributeList[gemCnt]
476 }
477 }
478 }
479 }
480 } else if dir == tp_pb.Direction_DOWNSTREAM {
481 //downstream GEM ports
482 numGemPorts := len(tp.DownstreamQueueAttributeList)
483 for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
484 lenOfPbitMap := len(tp.DownstreamQueueAttributeList[gemCnt].PbitMap)
485 for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
486 // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
487 // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
488 if p, err := strconv.Atoi(string(tp.DownstreamQueueAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
489 if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
490 logger.Debugw(ctx, "Found-DS-Queue-for-Pcp", log.Fields{"pbit": pbit, "Queue": tp.DownstreamQueueAttributeList[gemCnt].GemportId})
491 return tp.DownstreamQueueAttributeList[gemCnt]
492 }
493 }
494 }
495 }
496 }
497 logger.Errorw(ctx, "No-QueueId-Found-For-Pcp", log.Fields{"pcpVlan": pbit})
498 default:
499 logger.Errorw(ctx, "unknown-tech", log.Fields{"tp": tp})
500 }
501 return nil
502}
503
504// FindAllTpInstances returns all TechProfile instances for a given TechProfile table-id, pon interface ID and onu ID.
505func (t *TechProfileMgr) FindAllTpInstances(ctx context.Context, oltDeviceID string, tpID uint32, intfID uint32, onuID uint32) interface{} {
506 onuTpInstancePathSuffix := fmt.Sprintf("%s/%d/olt-{%s}/pon-{%d}/onu-{%d}", t.resourceMgr.GetTechnology(), tpID, oltDeviceID, intfID, onuID)
507 tech := t.resourceMgr.GetTechnology()
508 if tech == xgspon || tech == xgpon || tech == gpon {
509 t.tpInstanceMapLock.RLock()
510 defer t.tpInstanceMapLock.RUnlock()
511 tpInstancesTech := make([]tp_pb.TechProfileInstance, 0)
512 for i := 0; i < MaxUniPortPerOnu; i++ {
513 key := onuTpInstancePathSuffix + fmt.Sprintf("/uni-{%d}", i)
514 if tpInst, ok := t.tpInstanceMap[key]; ok {
515 tpInstancesTech = append(tpInstancesTech, *tpInst)
516 }
517 }
518 return tpInstancesTech
519 } else if tech == epon {
520 t.epontpInstanceMapLock.RLock()
521 defer t.epontpInstanceMapLock.RUnlock()
522 tpInstancesTech := make([]tp_pb.EponTechProfileInstance, 0)
523 for i := 0; i < MaxUniPortPerOnu; i++ {
524 key := onuTpInstancePathSuffix + fmt.Sprintf("/uni-{%d}", i)
525 if tpInst, ok := t.eponTpInstanceMap[key]; ok {
526 tpInstancesTech = append(tpInstancesTech, *tpInst)
527 }
528 }
529 return tpInstancesTech
530 } else {
531 logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": tech, "tpID": tpID, "onuID": onuID, "intfID": intfID})
532 }
533 return nil
534}
535
536func (t *TechProfileMgr) GetResourceID(ctx context.Context, intfID uint32, resourceType string, numIDs uint32) ([]uint32, error) {
537 logger.Debugw(ctx, "getting-resource-id", log.Fields{
538 "intf-id": intfID,
539 "resource-type": resourceType,
540 "num": numIDs,
541 })
542 var err error
543 var ids []uint32
544 switch resourceType {
545 case t.resourceMgr.GetResourceTypeAllocID():
546 t.AllocIDMgmtLock.Lock()
547 ids, err = t.resourceMgr.GetResourceID(ctx, intfID, resourceType, numIDs)
548 t.AllocIDMgmtLock.Unlock()
549 case t.resourceMgr.GetResourceTypeGemPortID():
550 t.GemPortIDMgmtLock.Lock()
551 ids, err = t.resourceMgr.GetResourceID(ctx, intfID, resourceType, numIDs)
552 t.GemPortIDMgmtLock.Unlock()
553 case t.resourceMgr.GetResourceTypeOnuID():
554 t.OnuIDMgmtLock.Lock()
555 ids, err = t.resourceMgr.GetResourceID(ctx, intfID, resourceType, numIDs)
556 t.OnuIDMgmtLock.Unlock()
557 default:
558 return nil, fmt.Errorf("resourceType %s not supported", resourceType)
559 }
560 if err != nil {
561 return nil, err
562 }
563 return ids, nil
564}
565
566func (t *TechProfileMgr) FreeResourceID(ctx context.Context, intfID uint32, resourceType string, ReleaseContent []uint32) error {
567 logger.Debugw(ctx, "freeing-resource-id", log.Fields{
568 "intf-id": intfID,
569 "resource-type": resourceType,
570 "release-content": ReleaseContent,
571 })
572 var err error
573 switch resourceType {
574 case t.resourceMgr.GetResourceTypeAllocID():
575 t.AllocIDMgmtLock.Lock()
576 err = t.resourceMgr.FreeResourceID(ctx, intfID, resourceType, ReleaseContent)
577 t.AllocIDMgmtLock.Unlock()
578 case t.resourceMgr.GetResourceTypeGemPortID():
579 t.GemPortIDMgmtLock.Lock()
580 err = t.resourceMgr.FreeResourceID(ctx, intfID, resourceType, ReleaseContent)
581 t.GemPortIDMgmtLock.Unlock()
582 case t.resourceMgr.GetResourceTypeOnuID():
583 t.OnuIDMgmtLock.Lock()
584 err = t.resourceMgr.FreeResourceID(ctx, intfID, resourceType, ReleaseContent)
585 t.OnuIDMgmtLock.Unlock()
586 default:
587 return fmt.Errorf("resourceType %s not supported", resourceType)
588 }
589 if err != nil {
590 return err
591 }
592 return nil
593}
594
595func (t *TechProfileMgr) GetUsScheduler(tpInstance *tp_pb.TechProfileInstance) *tp_pb.SchedulerConfig {
596 return &tp_pb.SchedulerConfig{
597 Direction: tpInstance.UsScheduler.Direction,
598 AdditionalBw: tpInstance.UsScheduler.AdditionalBw,
599 Priority: tpInstance.UsScheduler.Priority,
600 Weight: tpInstance.UsScheduler.Weight,
601 SchedPolicy: tpInstance.UsScheduler.QSchedPolicy}
602}
603
604func (t *TechProfileMgr) GetDsScheduler(tpInstance *tp_pb.TechProfileInstance) *tp_pb.SchedulerConfig {
605 return &tp_pb.SchedulerConfig{
606 Direction: tpInstance.DsScheduler.Direction,
607 AdditionalBw: tpInstance.DsScheduler.AdditionalBw,
608 Priority: tpInstance.DsScheduler.Priority,
609 Weight: tpInstance.DsScheduler.Weight,
610 SchedPolicy: tpInstance.DsScheduler.QSchedPolicy}
611}
612
613func (t *TechProfileMgr) GetTrafficScheduler(tpInstance *tp_pb.TechProfileInstance, SchedCfg *tp_pb.SchedulerConfig,
614 ShapingCfg *tp_pb.TrafficShapingInfo) *tp_pb.TrafficScheduler {
615
616 tSched := &tp_pb.TrafficScheduler{
617 Direction: SchedCfg.Direction,
618 AllocId: tpInstance.UsScheduler.AllocId,
619 TrafficShapingInfo: ShapingCfg,
620 Scheduler: SchedCfg}
621
622 return tSched
623}
624
625func (t *TechProfileMgr) GetTrafficQueues(ctx context.Context, tp *tp_pb.TechProfileInstance, direction tp_pb.Direction) ([]*tp_pb.TrafficQueue, error) {
626
627 var encryp bool
628 if direction == tp_pb.Direction_UPSTREAM {
629 // upstream GEM ports
630 NumGemPorts := len(tp.UpstreamGemPortAttributeList)
631 GemPorts := make([]*tp_pb.TrafficQueue, 0)
632 for Count := 0; Count < NumGemPorts; Count++ {
633 if tp.UpstreamGemPortAttributeList[Count].AesEncryption == "True" {
634 encryp = true
635 } else {
636 encryp = false
637 }
638
639 GemPorts = append(GemPorts, &tp_pb.TrafficQueue{
640 Direction: direction,
641 GemportId: tp.UpstreamGemPortAttributeList[Count].GemportId,
642 PbitMap: tp.UpstreamGemPortAttributeList[Count].PbitMap,
643 AesEncryption: encryp,
644 SchedPolicy: tp.UpstreamGemPortAttributeList[Count].SchedulingPolicy,
645 Priority: tp.UpstreamGemPortAttributeList[Count].PriorityQ,
646 Weight: tp.UpstreamGemPortAttributeList[Count].Weight,
647 DiscardPolicy: tp.UpstreamGemPortAttributeList[Count].DiscardPolicy,
648 })
649 }
650 logger.Debugw(ctx, "Upstream Traffic queue list ", log.Fields{"queuelist": GemPorts})
651 return GemPorts, nil
652 } else if direction == tp_pb.Direction_DOWNSTREAM {
653 //downstream GEM ports
654 NumGemPorts := len(tp.DownstreamGemPortAttributeList)
655 GemPorts := make([]*tp_pb.TrafficQueue, 0)
656 for Count := 0; Count < NumGemPorts; Count++ {
657 if isMulticastGem(tp.DownstreamGemPortAttributeList[Count].IsMulticast) {
658 //do not take multicast GEM ports. They are handled separately.
659 continue
660 }
661 if tp.DownstreamGemPortAttributeList[Count].AesEncryption == "True" {
662 encryp = true
663 } else {
664 encryp = false
665 }
666
667 GemPorts = append(GemPorts, &tp_pb.TrafficQueue{
668 Direction: direction,
669 GemportId: tp.DownstreamGemPortAttributeList[Count].GemportId,
670 PbitMap: tp.DownstreamGemPortAttributeList[Count].PbitMap,
671 AesEncryption: encryp,
672 SchedPolicy: tp.DownstreamGemPortAttributeList[Count].SchedulingPolicy,
673 Priority: tp.DownstreamGemPortAttributeList[Count].PriorityQ,
674 Weight: tp.DownstreamGemPortAttributeList[Count].Weight,
675 DiscardPolicy: tp.DownstreamGemPortAttributeList[Count].DiscardPolicy,
676 })
677 }
678 logger.Debugw(ctx, "Downstream Traffic queue list ", log.Fields{"queuelist": GemPorts})
679 return GemPorts, nil
680 }
681
682 logger.Errorf(ctx, "Unsupported direction %s used for generating Traffic Queue list", direction)
683 return nil, fmt.Errorf("downstream gem port traffic queue creation failed due to unsupported direction %s", direction)
684}
685
686func (t *TechProfileMgr) validateInstanceControlAttr(ctx context.Context, instCtl tp_pb.InstanceControl) error {
687 if instCtl.Onu != "single-instance" && instCtl.Onu != "multi-instance" {
688 logger.Errorw(ctx, "invalid-onu-instance-control-attribute", log.Fields{"onu-inst": instCtl.Onu})
689 return errors.New("invalid-onu-instance-ctl-attr")
690 }
691
692 if instCtl.Uni != "single-instance" && instCtl.Uni != "multi-instance" {
693 logger.Errorw(ctx, "invalid-uni-instance-control-attribute", log.Fields{"uni-inst": instCtl.Uni})
694 return errors.New("invalid-uni-instance-ctl-attr")
695 }
696
697 if instCtl.Uni == "multi-instance" {
698 logger.Error(ctx, "uni-multi-instance-tp-not-supported")
699 return errors.New("uni-multi-instance-tp-not-supported")
700 }
701
702 return nil
703}
704
705// allocateTPInstance for GPON, XGPON and XGS-PON technology
706func (t *TechProfileMgr) allocateTPInstance(ctx context.Context, uniPortName string, tp *tp_pb.TechProfile, intfID uint32, tpInstPathSuffix string) *tp_pb.TechProfileInstance {
707
708 var usGemPortAttributeList []*tp_pb.GemPortAttributes
709 var dsGemPortAttributeList []*tp_pb.GemPortAttributes
710 var dsMulticastGemAttributeList []*tp_pb.GemPortAttributes
711 var dsUnicastGemAttributeList []*tp_pb.GemPortAttributes
712 var tcontIDs []uint32
713 var gemPorts []uint32
714 var err error
715
716 logger.Infow(ctx, "Allocating TechProfileMgr instance from techprofile template", log.Fields{"uniPortName": uniPortName, "intfID": intfID, "numGem": tp.NumGemPorts})
717
718 if tp.InstanceControl.Onu == "multi-instance" {
719 tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1)
720 if err != nil {
721 logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"err": err, "intfID": intfID})
722 return nil
723 }
724 } else { // "single-instance"
725 if tpInst := t.getSingleInstanceTp(ctx, tpInstPathSuffix); tpInst == nil {
726 // No "single-instance" tp found on one any uni port for the given TP ID
727 // Allocate a new TcontID or AllocID
728 tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1)
729 if err != nil {
730 logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"err": err, "intfID": intfID})
731 return nil
732 }
733 } else {
734 // Use the alloc-id from the existing TpInstance
735 tcontIDs = append(tcontIDs, tpInst.UsScheduler.AllocId)
736 }
737 }
738 logger.Debugw(ctx, "Num GEM ports in TP:", log.Fields{"NumGemPorts": tp.NumGemPorts})
739 gemPorts, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), tp.NumGemPorts)
740 if err != nil {
741 logger.Errorw(ctx, "Error getting gemport ids from rsrcrMgr", log.Fields{"err": err, "intfID": intfID, "numGemports": tp.NumGemPorts})
742 return nil
743 }
744 logger.Infow(ctx, "Allocated tconts and GEM ports successfully", log.Fields{"tconts": tcontIDs, "gemports": gemPorts})
745 for index := 0; index < int(tp.NumGemPorts); index++ {
746 usGemPortAttributeList = append(usGemPortAttributeList,
747 &tp_pb.GemPortAttributes{GemportId: gemPorts[index],
748 MaxQSize: tp.UpstreamGemPortAttributeList[index].MaxQSize,
749 PbitMap: tp.UpstreamGemPortAttributeList[index].PbitMap,
750 AesEncryption: tp.UpstreamGemPortAttributeList[index].AesEncryption,
751 SchedulingPolicy: tp.UpstreamGemPortAttributeList[index].SchedulingPolicy,
752 PriorityQ: tp.UpstreamGemPortAttributeList[index].PriorityQ,
753 Weight: tp.UpstreamGemPortAttributeList[index].Weight,
754 DiscardPolicy: tp.UpstreamGemPortAttributeList[index].DiscardPolicy,
755 DiscardConfig: tp.UpstreamGemPortAttributeList[index].DiscardConfig})
756 }
757
758 logger.Info(ctx, "length of DownstreamGemPortAttributeList", len(tp.DownstreamGemPortAttributeList))
759 //put multicast and unicast downstream GEM port attributes in different lists first
760 for index := 0; index < len(tp.DownstreamGemPortAttributeList); index++ {
761 if isMulticastGem(tp.DownstreamGemPortAttributeList[index].IsMulticast) {
762 dsMulticastGemAttributeList = append(dsMulticastGemAttributeList,
763 &tp_pb.GemPortAttributes{
764 MulticastGemId: tp.DownstreamGemPortAttributeList[index].MulticastGemId,
765 MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
766 PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
767 AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
768 SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
769 PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
770 Weight: tp.DownstreamGemPortAttributeList[index].Weight,
771 DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
772 DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig,
773 IsMulticast: tp.DownstreamGemPortAttributeList[index].IsMulticast,
774 DynamicAccessControlList: tp.DownstreamGemPortAttributeList[index].DynamicAccessControlList,
775 StaticAccessControlList: tp.DownstreamGemPortAttributeList[index].StaticAccessControlList})
776 } else {
777 dsUnicastGemAttributeList = append(dsUnicastGemAttributeList,
778 &tp_pb.GemPortAttributes{
779 MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
780 PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
781 AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
782 SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
783 PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
784 Weight: tp.DownstreamGemPortAttributeList[index].Weight,
785 DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
786 DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig})
787 }
788 }
789 //add unicast downstream GEM ports to dsGemPortAttributeList
790 if dsUnicastGemAttributeList != nil {
791 for index := 0; index < int(tp.NumGemPorts); index++ {
792 dsGemPortAttributeList = append(dsGemPortAttributeList,
793 &tp_pb.GemPortAttributes{GemportId: gemPorts[index],
794 MaxQSize: dsUnicastGemAttributeList[index].MaxQSize,
795 PbitMap: dsUnicastGemAttributeList[index].PbitMap,
796 AesEncryption: dsUnicastGemAttributeList[index].AesEncryption,
797 SchedulingPolicy: dsUnicastGemAttributeList[index].SchedulingPolicy,
798 PriorityQ: dsUnicastGemAttributeList[index].PriorityQ,
799 Weight: dsUnicastGemAttributeList[index].Weight,
800 DiscardPolicy: dsUnicastGemAttributeList[index].DiscardPolicy,
801 DiscardConfig: dsUnicastGemAttributeList[index].DiscardConfig})
802 }
803 }
804 //add multicast GEM ports to dsGemPortAttributeList afterwards
805 for k := range dsMulticastGemAttributeList {
806 dsGemPortAttributeList = append(dsGemPortAttributeList, dsMulticastGemAttributeList[k])
807 }
808
809 return &tp_pb.TechProfileInstance{
810 SubscriberIdentifier: uniPortName,
811 Name: tp.Name,
812 ProfileType: tp.ProfileType,
813 Version: tp.Version,
814 NumGemPorts: tp.NumGemPorts,
815 InstanceControl: tp.InstanceControl,
816 UsScheduler: &tp_pb.SchedulerAttributes{
817 AllocId: tcontIDs[0],
818 Direction: tp.UsScheduler.Direction,
819 AdditionalBw: tp.UsScheduler.AdditionalBw,
820 Priority: tp.UsScheduler.Priority,
821 Weight: tp.UsScheduler.Weight,
822 QSchedPolicy: tp.UsScheduler.QSchedPolicy},
823 DsScheduler: &tp_pb.SchedulerAttributes{
824 AllocId: tcontIDs[0],
825 Direction: tp.DsScheduler.Direction,
826 AdditionalBw: tp.DsScheduler.AdditionalBw,
827 Priority: tp.DsScheduler.Priority,
828 Weight: tp.DsScheduler.Weight,
829 QSchedPolicy: tp.DsScheduler.QSchedPolicy},
830 UpstreamGemPortAttributeList: usGemPortAttributeList,
831 DownstreamGemPortAttributeList: dsGemPortAttributeList}
832}
833
834// allocateTPInstance function for EPON
835func (t *TechProfileMgr) allocateEponTPInstance(ctx context.Context, uniPortName string, tp *tp_pb.EponTechProfile, intfID uint32, tpInstPath string) *tp_pb.EponTechProfileInstance {
836
837 var usQueueAttributeList []*tp_pb.EPONQueueAttributes
838 var dsQueueAttributeList []*tp_pb.EPONQueueAttributes
839 var tcontIDs []uint32
840 var gemPorts []uint32
841 var err error
842
843 logger.Infow(ctx, "allocating-tp-instance-from-tp-template", log.Fields{"uniPortName": uniPortName, "intfID": intfID, "numGem": tp.NumGemPorts})
844
845 if tp.InstanceControl.Onu == "multi-instance" {
846 if tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1); err != nil {
847 logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"err": err, "intfID": intfID})
848 return nil
849 }
850 } else { // "single-instance"
851 if tpInst := t.getSingleInstanceEponTp(ctx, tpInstPath); tpInst == nil {
852 // No "single-instance" tp found on one any uni port for the given TP ID
853 // Allocate a new TcontID or AllocID
854 if tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1); err != nil {
855 logger.Errorw(ctx, "error-getting-alloc-id-from-resource-mgr", log.Fields{"err": err, "intfID": intfID})
856 return nil
857 }
858 } else {
859 // Use the alloc-id from the existing TpInstance
860 tcontIDs = append(tcontIDs, tpInst.AllocId)
861 }
862 }
863 logger.Debugw(ctx, "Num GEM ports in TP:", log.Fields{"NumGemPorts": tp.NumGemPorts})
864 if gemPorts, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), tp.NumGemPorts); err != nil {
865 logger.Errorw(ctx, "error-getting-gemport-id-from-resource-mgr", log.Fields{"err": err, "intfID": intfID, "numGemports": tp.NumGemPorts})
866 return nil
867 }
868 logger.Infow(ctx, "allocated-alloc-id-and-gemport-successfully", log.Fields{"tconts": tcontIDs, "gemports": gemPorts})
869 for index := 0; index < int(tp.NumGemPorts); index++ {
870 usQueueAttributeList = append(usQueueAttributeList,
871 &tp_pb.EPONQueueAttributes{GemportId: gemPorts[index],
872 MaxQSize: tp.UpstreamQueueAttributeList[index].MaxQSize,
873 PbitMap: tp.UpstreamQueueAttributeList[index].PbitMap,
874 AesEncryption: tp.UpstreamQueueAttributeList[index].AesEncryption,
875 TrafficType: tp.UpstreamQueueAttributeList[index].TrafficType,
876 UnsolicitedGrantSize: tp.UpstreamQueueAttributeList[index].UnsolicitedGrantSize,
877 NominalInterval: tp.UpstreamQueueAttributeList[index].NominalInterval,
878 ToleratedPollJitter: tp.UpstreamQueueAttributeList[index].ToleratedPollJitter,
879 RequestTransmissionPolicy: tp.UpstreamQueueAttributeList[index].RequestTransmissionPolicy,
880 NumQSets: tp.UpstreamQueueAttributeList[index].NumQSets,
881 QThresholds: tp.UpstreamQueueAttributeList[index].QThresholds,
882 SchedulingPolicy: tp.UpstreamQueueAttributeList[index].SchedulingPolicy,
883 PriorityQ: tp.UpstreamQueueAttributeList[index].PriorityQ,
884 Weight: tp.UpstreamQueueAttributeList[index].Weight,
885 DiscardPolicy: tp.UpstreamQueueAttributeList[index].DiscardPolicy,
886 DiscardConfig: tp.UpstreamQueueAttributeList[index].DiscardConfig})
887 }
888
889 logger.Info(ctx, "length-of-downstream-gemport-attribute-list", len(tp.DownstreamQueueAttributeList))
890 for index := 0; index < int(tp.NumGemPorts); index++ {
891 dsQueueAttributeList = append(dsQueueAttributeList,
892 &tp_pb.EPONQueueAttributes{GemportId: gemPorts[index],
893 MaxQSize: tp.DownstreamQueueAttributeList[index].MaxQSize,
894 PbitMap: tp.DownstreamQueueAttributeList[index].PbitMap,
895 AesEncryption: tp.DownstreamQueueAttributeList[index].AesEncryption,
896 SchedulingPolicy: tp.DownstreamQueueAttributeList[index].SchedulingPolicy,
897 PriorityQ: tp.DownstreamQueueAttributeList[index].PriorityQ,
898 Weight: tp.DownstreamQueueAttributeList[index].Weight,
899 DiscardPolicy: tp.DownstreamQueueAttributeList[index].DiscardPolicy,
900 DiscardConfig: tp.DownstreamQueueAttributeList[index].DiscardConfig})
901 }
902
903 return &tp_pb.EponTechProfileInstance{
904 SubscriberIdentifier: uniPortName,
905 Name: tp.Name,
906 ProfileType: tp.ProfileType,
907 Version: tp.Version,
908 NumGemPorts: tp.NumGemPorts,
909 InstanceControl: tp.InstanceControl,
910 PackageType: tp.PackageType,
911 AllocId: tcontIDs[0],
912 UpstreamQueueAttributeList: usQueueAttributeList,
913 DownstreamQueueAttributeList: dsQueueAttributeList}
914}
915
916// getSingleInstanceTp returns another TpInstance (GPON, XGPON, XGS-PON) for an ONU on a different
917// uni port for the same TP ID, if it finds one, else nil.
918func (t *TechProfileMgr) getSingleInstanceTp(ctx context.Context, tpPathSuffix string) *tp_pb.TechProfileInstance {
919
920 // For example:
921 // tpPathSuffix like "XGS-PON/64/olt-{1234}/pon-{0}/onu-{1}/uni-{1}"
922 // is broken into ["XGS-PON/64/olt-{1234}/pon-{0}/onu-{1}" ""]
923 uniPathSlice := regexp.MustCompile(`/uni-{[0-9]+}$`).Split(tpPathSuffix, 2)
924
925 t.tpInstanceMapLock.RLock()
926 defer t.tpInstanceMapLock.RUnlock()
927 for i := 0; i < MaxUniPortPerOnu; i++ {
928 key := fmt.Sprintf(uniPathSlice[0]+"/uni-{%d}", i)
929 if tpInst, ok := t.tpInstanceMap[key]; ok {
930 logger.Debugw(ctx, "found-single-instance-tp", log.Fields{"key": key})
931 return tpInst
932 }
933 }
934 return nil
935}
936
937// getSingleInstanceTp returns another TpInstance (EPON) for an ONU on a different
938// uni port for the same TP ID, if it finds one, else nil.
939func (t *TechProfileMgr) getSingleInstanceEponTp(ctx context.Context, tpPathSuffix string) *tp_pb.EponTechProfileInstance {
940 // For example:
941 // tpPathSuffix like "EPON/64/olt-{1234}/pon-{0}/onu-{1}/uni-{1}"
942 // is broken into ["EPON/64/-{1234}/pon-{0}/onu-{1}" ""]
943 uniPathSlice := regexp.MustCompile(`/uni-{[0-9]+}$`).Split(tpPathSuffix, 2)
944
945 t.epontpInstanceMapLock.RLock()
946 defer t.epontpInstanceMapLock.RUnlock()
947 for i := 0; i < MaxUniPortPerOnu; i++ {
948 key := fmt.Sprintf(uniPathSlice[0]+"/uni-{%d}", i)
949 if tpInst, ok := t.eponTpInstanceMap[key]; ok {
950 logger.Debugw(ctx, "found-single-instance-tp", log.Fields{"key": key})
951 return tpInst
952 }
953 }
954 return nil
955}
956
957// getDefaultTechProfile returns a default TechProfile for GPON, XGPON, XGS-PON
958func (t *TechProfileMgr) getDefaultTechProfile(ctx context.Context) *tp_pb.TechProfile {
959 var usGemPortAttributeList []*tp_pb.GemPortAttributes
960 var dsGemPortAttributeList []*tp_pb.GemPortAttributes
961
962 for _, pbit := range t.config.DefaultPbits {
963 logger.Debugw(ctx, "creating-gem-port-profile-profile", log.Fields{"pbit": pbit})
964 usGemPortAttributeList = append(usGemPortAttributeList,
965 &tp_pb.GemPortAttributes{
966 MaxQSize: defaultMaxQueueSize,
967 PbitMap: pbit,
968 AesEncryption: defaultAESEncryption,
969 SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
970 PriorityQ: defaultPriorityQueue,
971 Weight: defaultQueueWeight,
972 DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
973 DiscardConfigV2: &tp_pb.DiscardConfig{
974 DiscardPolicy: tp_pb.DiscardPolicy_Red,
975 DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
976 RedDiscardConfig: &tp_pb.RedDiscardConfig{
977 MinThreshold: defaultMinThreshold,
978 MaxThreshold: defaultMaxThreshold,
979 MaxProbability: defaultMaxProbability,
980 },
981 },
982 },
983 DiscardConfig: &tp_pb.RedDiscardConfig{
984 MinThreshold: defaultMinThreshold,
985 MaxThreshold: defaultMaxThreshold,
986 MaxProbability: defaultMaxProbability,
987 },
988 })
989 dsGemPortAttributeList = append(dsGemPortAttributeList,
990 &tp_pb.GemPortAttributes{
991 MaxQSize: defaultMaxQueueSize,
992 PbitMap: pbit,
993 AesEncryption: defaultAESEncryption,
994 SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
995 PriorityQ: defaultPriorityQueue,
996 Weight: defaultQueueWeight,
997 DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
998 DiscardConfigV2: &tp_pb.DiscardConfig{
999 DiscardPolicy: tp_pb.DiscardPolicy_Red,
1000 DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
1001 RedDiscardConfig: &tp_pb.RedDiscardConfig{
1002 MinThreshold: defaultMinThreshold,
1003 MaxThreshold: defaultMaxThreshold,
1004 MaxProbability: defaultMaxProbability,
1005 },
1006 },
1007 },
1008 DiscardConfig: &tp_pb.RedDiscardConfig{
1009 MinThreshold: defaultMinThreshold,
1010 MaxThreshold: defaultMaxThreshold,
1011 MaxProbability: defaultMaxProbability,
1012 },
1013 IsMulticast: defaultIsMulticast,
1014 DynamicAccessControlList: defaultAccessControlList,
1015 StaticAccessControlList: defaultAccessControlList,
1016 MulticastGemId: defaultMcastGemID})
1017 }
1018 return &tp_pb.TechProfile{
1019 Name: t.config.DefaultTPName,
1020 ProfileType: t.resourceMgr.GetTechnology(),
1021 Version: t.config.TPVersion,
1022 NumGemPorts: uint32(len(usGemPortAttributeList)),
1023 InstanceControl: &tp_pb.InstanceControl{
1024 Onu: defaultOnuInstance,
1025 Uni: defaultUniInstance,
1026 MaxGemPayloadSize: defaultGemPayloadSize},
1027 UsScheduler: &tp_pb.SchedulerAttributes{
1028 Direction: tp_pb.Direction_UPSTREAM,
1029 AdditionalBw: tp_pb.AdditionalBW_AdditionalBW_BestEffort,
1030 Priority: defaultPriority,
1031 Weight: defaultWeight,
1032 QSchedPolicy: tp_pb.SchedulingPolicy_Hybrid},
1033 DsScheduler: &tp_pb.SchedulerAttributes{
1034 Direction: tp_pb.Direction_DOWNSTREAM,
1035 AdditionalBw: tp_pb.AdditionalBW_AdditionalBW_BestEffort,
1036 Priority: defaultPriority,
1037 Weight: defaultWeight,
1038 QSchedPolicy: tp_pb.SchedulingPolicy_Hybrid},
1039 UpstreamGemPortAttributeList: usGemPortAttributeList,
1040 DownstreamGemPortAttributeList: dsGemPortAttributeList}
1041}
1042
1043// getDefaultEponProfile returns a default TechProfile for EPON
1044func (t *TechProfileMgr) getDefaultEponProfile(ctx context.Context) *tp_pb.EponTechProfile {
1045
1046 var usQueueAttributeList []*tp_pb.EPONQueueAttributes
1047 var dsQueueAttributeList []*tp_pb.EPONQueueAttributes
1048
1049 for _, pbit := range t.config.DefaultPbits {
1050 logger.Debugw(ctx, "Creating Queue", log.Fields{"pbit": pbit})
1051 usQueueAttributeList = append(usQueueAttributeList,
1052 &tp_pb.EPONQueueAttributes{
1053 MaxQSize: defaultMaxQueueSize,
1054 PbitMap: pbit,
1055 AesEncryption: defaultAESEncryption,
1056 TrafficType: defaultTrafficType,
1057 UnsolicitedGrantSize: defaultUnsolicitedGrantSize,
1058 NominalInterval: defaultNominalInterval,
1059 ToleratedPollJitter: defaultToleratedPollJitter,
1060 RequestTransmissionPolicy: defaultRequestTransmissionPolicy,
1061 NumQSets: defaultNumQueueSet,
1062 QThresholds: &tp_pb.QThresholds{
1063 QThreshold1: defaultQThreshold1,
1064 QThreshold2: defaultQThreshold2,
1065 QThreshold3: defaultQThreshold3,
1066 QThreshold4: defaultQThreshold4,
1067 QThreshold5: defaultQThreshold5,
1068 QThreshold6: defaultQThreshold6,
1069 QThreshold7: defaultQThreshold7},
1070 SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
1071 PriorityQ: defaultPriorityQueue,
1072 Weight: defaultQueueWeight,
1073 DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
1074 DiscardConfigV2: &tp_pb.DiscardConfig{
1075 DiscardPolicy: tp_pb.DiscardPolicy_Red,
1076 DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
1077 RedDiscardConfig: &tp_pb.RedDiscardConfig{
1078 MinThreshold: defaultMinThreshold,
1079 MaxThreshold: defaultMaxThreshold,
1080 MaxProbability: defaultMaxProbability,
1081 },
1082 },
1083 },
1084 DiscardConfig: &tp_pb.RedDiscardConfig{
1085 MinThreshold: defaultMinThreshold,
1086 MaxThreshold: defaultMaxThreshold,
1087 MaxProbability: defaultMaxProbability,
1088 }})
1089 dsQueueAttributeList = append(dsQueueAttributeList,
1090 &tp_pb.EPONQueueAttributes{
1091 MaxQSize: defaultMaxQueueSize,
1092 PbitMap: pbit,
1093 AesEncryption: defaultAESEncryption,
1094 SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
1095 PriorityQ: defaultPriorityQueue,
1096 Weight: defaultQueueWeight,
1097 DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
1098 DiscardConfigV2: &tp_pb.DiscardConfig{
1099 DiscardPolicy: tp_pb.DiscardPolicy_Red,
1100 DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
1101 RedDiscardConfig: &tp_pb.RedDiscardConfig{
1102 MinThreshold: defaultMinThreshold,
1103 MaxThreshold: defaultMaxThreshold,
1104 MaxProbability: defaultMaxProbability,
1105 },
1106 },
1107 },
1108 DiscardConfig: &tp_pb.RedDiscardConfig{
1109 MinThreshold: defaultMinThreshold,
1110 MaxThreshold: defaultMaxThreshold,
1111 MaxProbability: defaultMaxProbability,
1112 }})
1113 }
1114 return &tp_pb.EponTechProfile{
1115 Name: t.config.DefaultTPName,
1116 ProfileType: t.resourceMgr.GetTechnology(),
1117 Version: t.config.TPVersion,
1118 NumGemPorts: uint32(len(usQueueAttributeList)),
1119 InstanceControl: &tp_pb.InstanceControl{
1120 Onu: defaultOnuInstance,
1121 Uni: defaultUniInstance,
1122 MaxGemPayloadSize: defaultGemPayloadSize},
1123 PackageType: defaultPakageType,
1124 UpstreamQueueAttributeList: usQueueAttributeList,
1125 DownstreamQueueAttributeList: dsQueueAttributeList}
1126}
1127
// isMulticastGem reports whether the isMulticast attribute value of a GEM port
// marks it as multicast. Only the exact spellings "True", "true" and "TRUE"
// count; every other value, including the empty string, yields false.
func isMulticastGem(isMulticastAttrValue string) bool {
	switch isMulticastAttrValue {
	case "True", "true", "TRUE":
		return true
	default:
		return false
	}
}
1133
1134func (t *TechProfileMgr) addResourceInstanceToKVStore(ctx context.Context, tpID uint32, uniPortName string, resInst tp_pb.ResourceInstance) error {
1135 logger.Debugw(ctx, "adding-resource-instance-to-kv-store", log.Fields{"tpID": tpID, "uniPortName": uniPortName, "resInst": resInst})
1136 val, err := proto.Marshal(&resInst)
1137 if err != nil {
1138 logger.Errorw(ctx, "failed-to-marshall-resource-instance", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName, "resInst": resInst})
1139 return err
1140 }
1141 err = t.config.ResourceInstanceKVBacked.Put(ctx, fmt.Sprintf("%s/%d/%s", t.resourceMgr.GetTechnology(), tpID, uniPortName), val)
1142 return err
1143}
1144
1145func (t *TechProfileMgr) removeResourceInstanceFromKVStore(ctx context.Context, tpID uint32, uniPortName string) error {
1146 logger.Debugw(ctx, "removing-resource-instance-to-kv-store", log.Fields{"tpID": tpID, "uniPortName": uniPortName})
1147 if err := t.config.ResourceInstanceKVBacked.Delete(ctx, fmt.Sprintf("%s/%d/%s", t.resourceMgr.GetTechnology(), tpID, uniPortName)); err != nil {
1148 logger.Errorw(ctx, "error-removing-resource-instance-to-kv-store", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
1149 return err
1150 }
1151 return nil
1152}
1153
1154func (t *TechProfileMgr) getTPFromKVStore(ctx context.Context, tpID uint32) *tp_pb.TechProfile {
1155 var tp *tp_pb.TechProfile
1156 t.tpMapLock.RLock()
1157 tp, ok := t.tpMap[tpID]
1158 t.tpMapLock.RUnlock()
1159 if ok {
1160 logger.Debugw(ctx, "found-tp-in-cache", log.Fields{"tpID": tpID})
1161 return tp
1162 }
1163 key := fmt.Sprintf(t.config.TPFileKVPath, t.resourceMgr.GetTechnology(), tpID)
1164 logger.Debugw(ctx, "getting-tp-from-kv-store", log.Fields{"tpID": tpID, "Key": key})
1165 kvresult, err := t.config.DefaultTpKVBackend.Get(ctx, key)
1166 if err != nil {
1167 logger.Errorw(ctx, "error-fetching-from-kv-store", log.Fields{"err": err, "key": key})
1168 return nil
1169 }
1170 if kvresult != nil {
1171 /* Backend will return Value in string format,needs to be converted to []byte before unmarshal*/
1172 if value, err := kvstore.ToByte(kvresult.Value); err == nil {
1173 lTp := &tp_pb.TechProfile{}
1174 reader := bytes.NewReader(value)
1175 if err = jsonpb.Unmarshal(reader, lTp); err != nil {
1176 logger.Errorw(ctx, "error-unmarshalling-tp-from-kv-store", log.Fields{"err": err, "tpID": tpID, "error": err})
1177 return nil
1178 }
1179
1180 logger.Debugw(ctx, "success-fetched-tp-from-kv-store", log.Fields{"tpID": tpID, "value": *lTp})
1181 return lTp
1182 } else {
1183 logger.Errorw(ctx, "error-decoding-tp", log.Fields{"err": err, "tpID": tpID})
1184 // We we create a default profile in this case.
1185 }
1186 }
1187
1188 return nil
1189}
1190
1191func (t *TechProfileMgr) getEponTPFromKVStore(ctx context.Context, tpID uint32) *tp_pb.EponTechProfile {
1192 var eponTp *tp_pb.EponTechProfile
1193 t.eponTpMapLock.RLock()
1194 eponTp, ok := t.eponTpMap[tpID]
1195 t.eponTpMapLock.RUnlock()
1196 if ok {
1197 logger.Debugw(ctx, "found-tp-in-cache", log.Fields{"tpID": tpID})
1198 return eponTp
1199 }
1200 key := fmt.Sprintf(t.config.TPFileKVPath, t.resourceMgr.GetTechnology(), tpID)
1201 logger.Debugw(ctx, "getting-epon-tp-from-kv-store", log.Fields{"tpID": tpID, "Key": key})
1202 kvresult, err := t.config.DefaultTpKVBackend.Get(ctx, key)
1203 if err != nil {
1204 logger.Errorw(ctx, "error-fetching-from-kv-store", log.Fields{"err": err, "key": key})
1205 return nil
1206 }
1207 if kvresult != nil {
1208 /* Backend will return Value in string format,needs to be converted to []byte before unmarshal*/
1209 if value, err := kvstore.ToByte(kvresult.Value); err == nil {
1210 lEponTp := &tp_pb.EponTechProfile{}
1211 reader := bytes.NewReader(value)
1212 if err = jsonpb.Unmarshal(reader, lEponTp); err != nil {
1213 logger.Errorw(ctx, "error-unmarshalling-epon-tp-from-kv-store", log.Fields{"err": err, "tpID": tpID, "error": err})
1214 return nil
1215 }
1216
1217 logger.Debugw(ctx, "success-fetching-epon-tp-from-kv-store", log.Fields{"tpID": tpID, "value": *lEponTp})
1218 return lEponTp
1219 }
1220 }
1221 return nil
1222}
1223
1224func newKVClient(ctx context.Context, storeType string, address string, timeout time.Duration) (kvstore.Client, error) {
1225
1226 logger.Infow(ctx, "kv-store", log.Fields{"storeType": storeType, "address": address})
1227 switch storeType {
1228 case "etcd":
1229 return kvstore.NewEtcdClient(ctx, address, timeout, log.WarnLevel)
1230 }
1231 return nil, errors.New("unsupported-kv-store")
1232}
1233
1234// buildTpInstanceFromResourceInstance for GPON, XGPON and XGS-PON technology - build TpInstance from TechProfile template and ResourceInstance
1235func (t *TechProfileMgr) buildTpInstanceFromResourceInstance(ctx context.Context, tp *tp_pb.TechProfile, resInst *tp_pb.ResourceInstance) *tp_pb.TechProfileInstance {
1236
1237 var usGemPortAttributeList []*tp_pb.GemPortAttributes
1238 var dsGemPortAttributeList []*tp_pb.GemPortAttributes
1239 var dsMulticastGemAttributeList []*tp_pb.GemPortAttributes
1240 var dsUnicastGemAttributeList []*tp_pb.GemPortAttributes
1241
1242 if len(resInst.GemportIds) != int(tp.NumGemPorts) {
1243 logger.Errorw(ctx, "mismatch-in-number-of-gemports-between-template-and-resource-instance",
1244 log.Fields{"tpID": resInst.TpId, "totalResInstGemPortIDs": len(resInst.GemportIds), "totalTpTemplateGemPorts": tp.NumGemPorts})
1245 return nil
1246 }
1247 for index := 0; index < int(tp.NumGemPorts); index++ {
1248 usGemPortAttributeList = append(usGemPortAttributeList,
1249 &tp_pb.GemPortAttributes{GemportId: resInst.GemportIds[index],
1250 MaxQSize: tp.UpstreamGemPortAttributeList[index].MaxQSize,
1251 PbitMap: tp.UpstreamGemPortAttributeList[index].PbitMap,
1252 AesEncryption: tp.UpstreamGemPortAttributeList[index].AesEncryption,
1253 SchedulingPolicy: tp.UpstreamGemPortAttributeList[index].SchedulingPolicy,
1254 PriorityQ: tp.UpstreamGemPortAttributeList[index].PriorityQ,
1255 Weight: tp.UpstreamGemPortAttributeList[index].Weight,
1256 DiscardPolicy: tp.UpstreamGemPortAttributeList[index].DiscardPolicy,
1257 DiscardConfig: tp.UpstreamGemPortAttributeList[index].DiscardConfig})
1258 }
1259
1260 //put multicast and unicast downstream GEM port attributes in different lists first
1261 for index := 0; index < len(tp.DownstreamGemPortAttributeList); index++ {
1262 if isMulticastGem(tp.DownstreamGemPortAttributeList[index].IsMulticast) {
1263 dsMulticastGemAttributeList = append(dsMulticastGemAttributeList,
1264 &tp_pb.GemPortAttributes{
1265 MulticastGemId: tp.DownstreamGemPortAttributeList[index].MulticastGemId,
1266 MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
1267 PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
1268 AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
1269 SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
1270 PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
1271 Weight: tp.DownstreamGemPortAttributeList[index].Weight,
1272 DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
1273 DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig,
1274 IsMulticast: tp.DownstreamGemPortAttributeList[index].IsMulticast,
1275 DynamicAccessControlList: tp.DownstreamGemPortAttributeList[index].DynamicAccessControlList,
1276 StaticAccessControlList: tp.DownstreamGemPortAttributeList[index].StaticAccessControlList})
1277 } else {
1278 dsUnicastGemAttributeList = append(dsUnicastGemAttributeList,
1279 &tp_pb.GemPortAttributes{
1280 MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
1281 PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
1282 AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
1283 SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
1284 PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
1285 Weight: tp.DownstreamGemPortAttributeList[index].Weight,
1286 DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
1287 DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig})
1288 }
1289 }
1290 //add unicast downstream GEM ports to dsGemPortAttributeList
1291 if dsUnicastGemAttributeList != nil {
1292 for index := 0; index < int(tp.NumGemPorts); index++ {
1293 dsGemPortAttributeList = append(dsGemPortAttributeList,
1294 &tp_pb.GemPortAttributes{GemportId: resInst.GemportIds[index],
1295 MaxQSize: dsUnicastGemAttributeList[index].MaxQSize,
1296 PbitMap: dsUnicastGemAttributeList[index].PbitMap,
1297 AesEncryption: dsUnicastGemAttributeList[index].AesEncryption,
1298 SchedulingPolicy: dsUnicastGemAttributeList[index].SchedulingPolicy,
1299 PriorityQ: dsUnicastGemAttributeList[index].PriorityQ,
1300 Weight: dsUnicastGemAttributeList[index].Weight,
1301 DiscardPolicy: dsUnicastGemAttributeList[index].DiscardPolicy,
1302 DiscardConfig: dsUnicastGemAttributeList[index].DiscardConfig})
1303 }
1304 }
1305 //add multicast GEM ports to dsGemPortAttributeList afterwards
1306 for k := range dsMulticastGemAttributeList {
1307 dsGemPortAttributeList = append(dsGemPortAttributeList, dsMulticastGemAttributeList[k])
1308 }
1309
1310 return &tp_pb.TechProfileInstance{
1311 SubscriberIdentifier: resInst.SubscriberIdentifier,
1312 Name: tp.Name,
1313 ProfileType: tp.ProfileType,
1314 Version: tp.Version,
1315 NumGemPorts: tp.NumGemPorts,
1316 InstanceControl: tp.InstanceControl,
1317 UsScheduler: &tp_pb.SchedulerAttributes{
1318 AllocId: resInst.AllocId,
1319 Direction: tp.UsScheduler.Direction,
1320 AdditionalBw: tp.UsScheduler.AdditionalBw,
1321 Priority: tp.UsScheduler.Priority,
1322 Weight: tp.UsScheduler.Weight,
1323 QSchedPolicy: tp.UsScheduler.QSchedPolicy},
1324 DsScheduler: &tp_pb.SchedulerAttributes{
1325 AllocId: resInst.AllocId,
1326 Direction: tp.DsScheduler.Direction,
1327 AdditionalBw: tp.DsScheduler.AdditionalBw,
1328 Priority: tp.DsScheduler.Priority,
1329 Weight: tp.DsScheduler.Weight,
1330 QSchedPolicy: tp.DsScheduler.QSchedPolicy},
1331 UpstreamGemPortAttributeList: usGemPortAttributeList,
1332 DownstreamGemPortAttributeList: dsGemPortAttributeList}
1333}
1334
1335// buildEponTpInstanceFromResourceInstance for EPON technology - build EponTpInstance from EponTechProfile template and ResourceInstance
1336func (t *TechProfileMgr) buildEponTpInstanceFromResourceInstance(ctx context.Context, tp *tp_pb.EponTechProfile, resInst *tp_pb.ResourceInstance) *tp_pb.EponTechProfileInstance {
1337
1338 var usQueueAttributeList []*tp_pb.EPONQueueAttributes
1339 var dsQueueAttributeList []*tp_pb.EPONQueueAttributes
1340
1341 if len(resInst.GemportIds) != int(tp.NumGemPorts) {
1342 logger.Errorw(ctx, "mismatch-in-number-of-gemports-between-epon-tp-template-and-resource-instance",
1343 log.Fields{"tpID": resInst.TpId, "totalResInstGemPortIDs": len(resInst.GemportIds), "totalTpTemplateGemPorts": tp.NumGemPorts})
1344 return nil
1345 }
1346
1347 for index := 0; index < int(tp.NumGemPorts); index++ {
1348 usQueueAttributeList = append(usQueueAttributeList,
1349 &tp_pb.EPONQueueAttributes{GemportId: resInst.GemportIds[index],
1350 MaxQSize: tp.UpstreamQueueAttributeList[index].MaxQSize,
1351 PbitMap: tp.UpstreamQueueAttributeList[index].PbitMap,
1352 AesEncryption: tp.UpstreamQueueAttributeList[index].AesEncryption,
1353 TrafficType: tp.UpstreamQueueAttributeList[index].TrafficType,
1354 UnsolicitedGrantSize: tp.UpstreamQueueAttributeList[index].UnsolicitedGrantSize,
1355 NominalInterval: tp.UpstreamQueueAttributeList[index].NominalInterval,
1356 ToleratedPollJitter: tp.UpstreamQueueAttributeList[index].ToleratedPollJitter,
1357 RequestTransmissionPolicy: tp.UpstreamQueueAttributeList[index].RequestTransmissionPolicy,
1358 NumQSets: tp.UpstreamQueueAttributeList[index].NumQSets,
1359 QThresholds: tp.UpstreamQueueAttributeList[index].QThresholds,
1360 SchedulingPolicy: tp.UpstreamQueueAttributeList[index].SchedulingPolicy,
1361 PriorityQ: tp.UpstreamQueueAttributeList[index].PriorityQ,
1362 Weight: tp.UpstreamQueueAttributeList[index].Weight,
1363 DiscardPolicy: tp.UpstreamQueueAttributeList[index].DiscardPolicy,
1364 DiscardConfig: tp.UpstreamQueueAttributeList[index].DiscardConfig})
1365 }
1366
1367 for index := 0; index < int(tp.NumGemPorts); index++ {
1368 dsQueueAttributeList = append(dsQueueAttributeList,
1369 &tp_pb.EPONQueueAttributes{GemportId: resInst.GemportIds[index],
1370 MaxQSize: tp.DownstreamQueueAttributeList[index].MaxQSize,
1371 PbitMap: tp.DownstreamQueueAttributeList[index].PbitMap,
1372 AesEncryption: tp.DownstreamQueueAttributeList[index].AesEncryption,
1373 SchedulingPolicy: tp.DownstreamQueueAttributeList[index].SchedulingPolicy,
1374 PriorityQ: tp.DownstreamQueueAttributeList[index].PriorityQ,
1375 Weight: tp.DownstreamQueueAttributeList[index].Weight,
1376 DiscardPolicy: tp.DownstreamQueueAttributeList[index].DiscardPolicy,
1377 DiscardConfig: tp.DownstreamQueueAttributeList[index].DiscardConfig})
1378 }
1379
1380 return &tp_pb.EponTechProfileInstance{
1381 SubscriberIdentifier: resInst.SubscriberIdentifier,
1382 Name: tp.Name,
1383 ProfileType: tp.ProfileType,
1384 Version: tp.Version,
1385 NumGemPorts: tp.NumGemPorts,
1386 InstanceControl: tp.InstanceControl,
1387 PackageType: tp.PackageType,
1388 AllocId: resInst.AllocId,
1389 UpstreamQueueAttributeList: usQueueAttributeList,
1390 DownstreamQueueAttributeList: dsQueueAttributeList}
1391}
1392
1393func (t *TechProfileMgr) getTpInstanceFromResourceInstance(ctx context.Context, resInst *tp_pb.ResourceInstance) *tp_pb.TechProfileInstance {
1394 if resInst == nil {
1395 logger.Error(ctx, "resource-instance-nil")
1396 return nil
1397 }
1398 tp := t.getTPFromKVStore(ctx, resInst.TpId)
1399 if tp == nil {
1400 logger.Warnw(ctx, "tp-not-found-on-kv--creating-default-tp", log.Fields{"tpID": resInst.TpId})
1401 tp = t.getDefaultTechProfile(ctx)
1402 }
1403 return t.buildTpInstanceFromResourceInstance(ctx, tp, resInst)
1404}
1405
1406func (t *TechProfileMgr) getEponTpInstanceFromResourceInstance(ctx context.Context, resInst *tp_pb.ResourceInstance) *tp_pb.EponTechProfileInstance {
1407 if resInst == nil {
1408 logger.Error(ctx, "resource-instance-nil")
1409 return nil
1410 }
1411 eponTp := t.getEponTPFromKVStore(ctx, resInst.TpId)
1412 if eponTp == nil {
1413 logger.Warnw(ctx, "tp-not-found-on-kv--creating-default-tp", log.Fields{"tpID": resInst.TpId})
1414 eponTp = t.getDefaultEponProfile(ctx)
1415 }
1416 return t.buildEponTpInstanceFromResourceInstance(ctx, eponTp, resInst)
1417}
1418
1419func (t *TechProfileMgr) reconcileTpInstancesToCache(ctx context.Context) error {
1420
1421 tech := t.resourceMgr.GetTechnology()
1422 newCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
1423 defer cancel()
1424 kvPairs, _ := t.config.ResourceInstanceKVBacked.List(newCtx, tech)
1425
1426 if tech == xgspon || tech == xgpon || tech == gpon {
1427 for keyPath, kvPair := range kvPairs {
1428 logger.Debugw(ctx, "attempting-to-reconcile-tp-instance-from-resource-instance", log.Fields{"resourceInstPath": keyPath})
1429 if value, err := kvstore.ToByte(kvPair.Value); err == nil {
1430 var resInst tp_pb.ResourceInstance
1431 if err = proto.Unmarshal(value, &resInst); err != nil {
1432 logger.Errorw(ctx, "error-unmarshal-kv-pair", log.Fields{"err": err, "keyPath": keyPath, "value": value})
1433 continue
1434 } else {
1435 if tpInst := t.getTpInstanceFromResourceInstance(ctx, &resInst); tpInst != nil {
1436 // Trim the kv path by removing the default prefix part and get only the suffix part to reference the internal cache
1437 keySuffixSlice := regexp.MustCompile(t.config.ResourceInstanceKVPathPrefix+"/").Split(keyPath, 2)
1438 if len(keySuffixSlice) == 2 {
1439 keySuffixFormatRegexp := regexp.MustCompile(`^[a-zA-Z\-]+/[0-9]+/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
1440 // Make sure the keySuffixSlice is as per format [a-zA-Z-+]/[\d+]/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
1441 if !keySuffixFormatRegexp.Match([]byte(keySuffixSlice[1])) {
1442 logger.Errorw(ctx, "kv-path-not-confirming-to-format", log.Fields{"kvPath": keySuffixSlice[1]})
1443 continue
1444 }
1445 } else {
1446 logger.Errorw(ctx, "kv-instance-key-path-not-in-the-expected-format", log.Fields{"kvPath": keyPath})
1447 continue
1448 }
1449 t.tpInstanceMapLock.Lock()
1450 t.tpInstanceMap[keySuffixSlice[1]] = tpInst
1451 t.tpInstanceMapLock.Unlock()
1452 logger.Debugw(ctx, "reconciled-tp-success", log.Fields{"keyPath": keyPath})
1453 }
1454 }
1455 } else {
1456 logger.Errorw(ctx, "error-converting-kv-pair-value-to-byte", log.Fields{"err": err})
1457 }
1458 }
1459 } else if tech == epon {
1460 for keyPath, kvPair := range kvPairs {
1461 logger.Debugw(ctx, "attempting-to-reconcile-epon-tp-instance", log.Fields{"keyPath": keyPath})
1462 if value, err := kvstore.ToByte(kvPair.Value); err == nil {
1463 var resInst tp_pb.ResourceInstance
1464 if err = proto.Unmarshal(value, &resInst); err != nil {
1465 logger.Errorw(ctx, "error-unmarshal-kv-pair", log.Fields{"keyPath": keyPath, "value": value})
1466 continue
1467 } else {
1468 if eponTpInst := t.getEponTpInstanceFromResourceInstance(ctx, &resInst); eponTpInst != nil {
1469 // Trim the kv path by removing the default prefix part and get only the suffix part to reference the internal cache
1470 keySuffixSlice := regexp.MustCompile(t.config.ResourceInstanceKVPathPrefix+"/").Split(keyPath, 2)
1471 if len(keySuffixSlice) == 2 {
1472 keySuffixFormatRegexp := regexp.MustCompile(`^[a-zA-Z\-]+/[0-9]+/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
1473 // Make sure the keySuffixSlice is as per format [a-zA-Z-+]/[\d+]/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
1474 if !keySuffixFormatRegexp.Match([]byte(keySuffixSlice[1])) {
1475 logger.Errorw(ctx, "kv-path-not-confirming-to-format", log.Fields{"kvPath": keySuffixSlice[1]})
1476 continue
1477 }
1478 } else {
1479 logger.Errorw(ctx, "kv-instance-key-path-not-in-the-expected-format", log.Fields{"kvPath": keyPath})
1480 continue
1481 }
1482 t.epontpInstanceMapLock.Lock()
1483 t.eponTpInstanceMap[keySuffixSlice[1]] = eponTpInst
1484 t.epontpInstanceMapLock.Unlock()
1485 logger.Debugw(ctx, "reconciled-epon-tp-success", log.Fields{"keyPath": keyPath})
1486 }
1487 }
1488 } else {
1489 logger.Errorw(ctx, "error-converting-kv-pair-value-to-byte", log.Fields{"err": err})
1490 }
1491 }
1492 } else {
1493 logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": tech})
1494 return fmt.Errorf("unknown-tech-%v", tech)
1495 }
1496
1497 return nil
1498}