blob: 50b368861236726e439963d7920ff179a3fce2ac [file] [log] [blame]
Girish Gowdra8a0bdcd2021-05-13 12:31:04 -07001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package techprofile
18
19import (
20 "bytes"
21 "context"
22 "errors"
23 "fmt"
24 "github.com/gogo/protobuf/proto"
25 "github.com/golang/protobuf/jsonpb"
26 "github.com/opencord/voltha-protos/v4/go/openolt"
27 "regexp"
28 "strconv"
29 "sync"
30 "time"
31
Girish Gowdra4c3d4602021-07-22 16:33:37 -070032 "github.com/opencord/voltha-lib-go/v6/pkg/db"
Girish Gowdra8a0bdcd2021-05-13 12:31:04 -070033
Girish Gowdra4c3d4602021-07-22 16:33:37 -070034 "github.com/opencord/voltha-lib-go/v6/pkg/db/kvstore"
35 "github.com/opencord/voltha-lib-go/v6/pkg/log"
Girish Gowdra8a0bdcd2021-05-13 12:31:04 -070036 tp_pb "github.com/opencord/voltha-protos/v4/go/tech_profile"
37)
38
// iPonResourceMgr is the set of PON resource manager APIs that the tech
// profile manager calls to reserve and release identifiers and to learn which
// PON technology is in use.
type iPonResourceMgr interface {
	// GetResourceID requests numIDs identifiers of the given resourceType on
	// PON interface intfID.
	GetResourceID(ctx context.Context, intfID uint32, resourceType string, numIDs uint32) ([]uint32, error)

	// FreeResourceID hands back the identifiers listed in ReleaseContent for
	// the given resourceType on PON interface intfID.
	FreeResourceID(ctx context.Context, intfID uint32, resourceType string, ReleaseContent []uint32) error

	// The GetResourceType* methods return the resourceType strings accepted by
	// GetResourceID and FreeResourceID for alloc IDs, GEM port IDs and ONU IDs.
	GetResourceTypeAllocID() string
	GetResourceTypeGemPortID() string
	GetResourceTypeOnuID() string

	// GetTechnology returns the PON technology name; callers in this file
	// compare it against "XGS-PON", "XGPON", "GPON" and "EPON".
	GetTechnology() string
}
48
// SchedulingPolicy enumerates the scheduling policies known to this package.
type SchedulingPolicy int32

// SchedulingPolicy values; the numeric values are 0, 1 and 2 in declaration order.
const (
	SchedulingPolicy_WRR SchedulingPolicy = iota
	SchedulingPolicy_StrictPriority
	SchedulingPolicy_Hybrid
)
56
// AdditionalBW enumerates the additional-bandwidth eligibility settings known
// to this package.
type AdditionalBW int32

// AdditionalBW values; the numeric values are 0 through 3 in declaration order.
const (
	AdditionalBW_AdditionalBW_None AdditionalBW = iota
	AdditionalBW_AdditionalBW_NA
	AdditionalBW_AdditionalBW_BestEffort
	AdditionalBW_AdditionalBW_Auto
)
65
// DiscardPolicy enumerates the queue discard policies known to this package.
type DiscardPolicy int32

// DiscardPolicy values; the numeric values are 0 through 3 in declaration order.
const (
	DiscardPolicy_TailDrop DiscardPolicy = iota
	DiscardPolicy_WTailDrop
	DiscardPolicy_Red
	DiscardPolicy_WRed
)
74
// uniPortNameFormatRegexp describes the UNI port name layout required by this
// package: olt-{<id>}/pon-{<n>}/onu-{<n>}/uni-{<n>}, where <id> consists of
// lower-case letters, digits and dashes and each <n> is a decimal number.
var uniPortNameFormatRegexp = regexp.MustCompile(
	`^olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`,
)
77
// Package defaults for instance control, discard configuration, scheduler and
// GEM port attributes. All constants are untyped so they adopt the type of the
// field they are assigned to.
const (
	// Instance control defaults.
	defaultOnuInstance    = "multi-instance"
	defaultUniInstance    = "single-instance"
	defaultGemPayloadSize = "auto"

	// Default discard config constants.
	defaultMinThreshold   = 0
	defaultMaxThreshold   = 0
	defaultMaxProbability = 0

	// Default scheduler constants.
	defaultPriority = 0
	defaultWeight   = 0

	// Default GEM attribute constants.
	defaultAESEncryption     = "True"
	defaultPriorityQueue     = 0
	defaultQueueWeight       = 0
	defaultMaxQueueSize      = "auto"
	defaultIsMulticast       = "False"
	defaultAccessControlList = "224.0.0.0-239.255.255.255"
	defaultMcastGemID        = 4069
)
108
// Default EPON constants. All constants are untyped so they adopt the type of
// the field they are assigned to.
const (
	// Package type.
	defaultPakageType = "B"

	// Upstream traffic attributes.
	defaultTrafficType               = "BE"
	defaultUnsolicitedGrantSize      = 0
	defaultNominalInterval           = 0
	defaultToleratedPollJitter       = 0
	defaultRequestTransmissionPolicy = 0
	defaultNumQueueSet               = 2

	// Queue thresholds 1 through 7.
	defaultQThreshold1 = 5500
	defaultQThreshold2 = 0
	defaultQThreshold3 = 0
	defaultQThreshold4 = 0
	defaultQThreshold5 = 0
	defaultQThreshold6 = 0
	defaultQThreshold7 = 0
)
130
// PON technology names, as compared against iPonResourceMgr.GetTechnology().
const (
	xgspon = "XGS-PON"
	xgpon  = "XGPON"
	gpon   = "GPON"
	epon   = "EPON"
)

// MaxUniPortPerOnu is the number of UNI port indexes (0..MaxUniPortPerOnu-1)
// probed per ONU when searching the instance caches.
// TODO: Adapter uses its own constant for MaxUniPort. How to synchronize this and have a single source of truth?
const MaxUniPortPerOnu = 16
141
142type TechProfileMgr struct {
143 config *TechProfileFlags
144 resourceMgr iPonResourceMgr
145 OnuIDMgmtLock sync.RWMutex
146 GemPortIDMgmtLock sync.RWMutex
147 AllocIDMgmtLock sync.RWMutex
148 tpInstanceMap map[string]*tp_pb.TechProfileInstance // Map of tp path to tp instance
149 tpInstanceMapLock sync.RWMutex
150 eponTpInstanceMap map[string]*tp_pb.EponTechProfileInstance // Map of tp path to epon tp instance
151 epontpInstanceMapLock sync.RWMutex
152 tpMap map[uint32]*tp_pb.TechProfile // Map of tp id to tp
153 tpMapLock sync.RWMutex
154 eponTpMap map[uint32]*tp_pb.EponTechProfile // map of tp id to epon tp
155 eponTpMapLock sync.RWMutex
156}
157
158func (t *TechProfileMgr) SetKVClient(ctx context.Context, pathPrefix string) *db.Backend {
159 kvClient, err := newKVClient(ctx, t.config.KVStoreType, t.config.KVStoreAddress, t.config.KVStoreTimeout)
160 if err != nil {
161 logger.Errorw(ctx, "failed-to-create-kv-client",
162 log.Fields{
163 "type": t.config.KVStoreType, "address": t.config.KVStoreAddress,
164 "timeout": t.config.KVStoreTimeout, "prefix": pathPrefix,
165 "error": err.Error(),
166 })
167 return nil
168 }
169 return &db.Backend{
170 Client: kvClient,
171 StoreType: t.config.KVStoreType,
172 Address: t.config.KVStoreAddress,
173 Timeout: t.config.KVStoreTimeout,
174 PathPrefix: pathPrefix}
175
176 /* TODO : Make sure direct call to NewBackend is working fine with backend , currently there is some
177 issue between kv store and backend , core is not calling NewBackend directly
178 kv := model.NewBackend(t.config.kvStoreType, t.config.KVStoreHost, t.config.KVStorePort,
179 t.config.KVStoreTimeout, kvStoreTechProfilePathPrefix)
180 */
181}
182
183func NewTechProfile(ctx context.Context, resourceMgr iPonResourceMgr, kvStoreType string, kvStoreAddress string, basePathKvStore string) (*TechProfileMgr, error) {
184 var techprofileObj TechProfileMgr
185 logger.Debug(ctx, "initializing-techprofile-mananger")
186 techprofileObj.config = NewTechProfileFlags(kvStoreType, kvStoreAddress, basePathKvStore)
187 techprofileObj.config.KVBackend = techprofileObj.SetKVClient(ctx, techprofileObj.config.TPKVPathPrefix)
188 techprofileObj.config.DefaultTpKVBackend = techprofileObj.SetKVClient(ctx, techprofileObj.config.defaultTpKvPathPrefix)
189 if techprofileObj.config.KVBackend == nil {
190 logger.Error(ctx, "failed-to-initialize-backend")
191 return nil, errors.New("kv-backend-init-failed")
192 }
193 techprofileObj.config.ResourceInstanceKVBacked = techprofileObj.SetKVClient(ctx, techprofileObj.config.ResourceInstanceKVPathPrefix)
194 if techprofileObj.config.ResourceInstanceKVBacked == nil {
195 logger.Error(ctx, "failed-to-initialize-resource-instance-kv-backend")
196 return nil, errors.New("resource-instance-kv-backend-init-failed")
197 }
198 techprofileObj.resourceMgr = resourceMgr
199 techprofileObj.tpInstanceMap = make(map[string]*tp_pb.TechProfileInstance)
200 techprofileObj.eponTpInstanceMap = make(map[string]*tp_pb.EponTechProfileInstance)
201 techprofileObj.tpMap = make(map[uint32]*tp_pb.TechProfile)
202 techprofileObj.eponTpMap = make(map[uint32]*tp_pb.EponTechProfile)
203 logger.Debug(ctx, "reconcile-tp-instance-cache-start")
204 if err := techprofileObj.reconcileTpInstancesToCache(ctx); err != nil {
205 logger.Errorw(ctx, "failed-to-reconcile-tp-instances", log.Fields{"err": err})
206 return nil, err
207 }
208 logger.Debug(ctx, "reconcile-tp-instance-cache-end")
209 logger.Debug(ctx, "initializing-tech-profile-manager-object-success")
210 return &techprofileObj, nil
211}
212
213// GetTechProfileInstanceKey returns the tp instance key that is used to reference TP Instance Map
214func (t *TechProfileMgr) GetTechProfileInstanceKey(ctx context.Context, tpID uint32, uniPortName string) string {
215 logger.Debugw(ctx, "get-tp-instance-kv-key", log.Fields{
216 "uniPortName": uniPortName,
217 "tpId": tpID,
218 })
219 // Make sure the uniPortName is as per format olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
220 if !uniPortNameFormatRegexp.Match([]byte(uniPortName)) {
221 logger.Warnw(ctx, "uni-port-name-not-confirming-to-format", log.Fields{"uniPortName": uniPortName})
222 }
223 // The key path prefix (like service/voltha/technology_profiles or service/voltha_voltha/technology_profiles)
224 // is expected to be attached by the components that use this path as part of the KVBackend configuration.
225 resourceInstanceKvPathSuffix := "%s/%d/%s" // <technology>/<tpID>/<uni-port-name>
226 // <uni-port-name> must be of the format pon-{\d+}/onu-{\d+}/uni-{\d+}
227 return fmt.Sprintf(resourceInstanceKvPathSuffix, t.resourceMgr.GetTechnology(), tpID, uniPortName)
228}
229
230// GetTPInstance gets TP instance from cache if found
231func (t *TechProfileMgr) GetTPInstance(ctx context.Context, path string) (interface{}, error) {
232 tech := t.resourceMgr.GetTechnology()
233 switch tech {
234 case xgspon, xgpon, gpon:
235 t.tpInstanceMapLock.RLock()
236 defer t.tpInstanceMapLock.RUnlock()
237 tpInst, ok := t.tpInstanceMap[path]
238 if !ok {
239 return nil, fmt.Errorf("tp-instance-not-found-tp-path-%v", path)
240 }
241 return tpInst, nil
242 case epon:
243 t.epontpInstanceMapLock.RLock()
244 defer t.epontpInstanceMapLock.RUnlock()
245 tpInst, ok := t.eponTpInstanceMap[path]
246 if !ok {
247 return nil, fmt.Errorf("tp-instance-not-found-tp-path-%v", path)
248 }
249 return tpInst, nil
250 default:
251 logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": tech})
252 return nil, fmt.Errorf("unknown-tech-%s-tp-path-%v", tech, path)
253 }
254}
255
256// CreateTechProfileInstance creates a new TP instance.
257func (t *TechProfileMgr) CreateTechProfileInstance(ctx context.Context, tpID uint32, uniPortName string, intfID uint32) (interface{}, error) {
258 var tpInstance *tp_pb.TechProfileInstance
259 var eponTpInstance *tp_pb.EponTechProfileInstance
260
261 logger.Infow(ctx, "creating-tp-instance", log.Fields{"tpID": tpID, "uni": uniPortName, "intId": intfID})
262
263 // Make sure the uniPortName is as per format olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
264 if !uniPortNameFormatRegexp.Match([]byte(uniPortName)) {
265 logger.Errorw(ctx, "uni-port-name-not-confirming-to-format", log.Fields{"uniPortName": uniPortName})
266 return nil, fmt.Errorf("uni-port-name-not-confirming-to-format-%s", uniPortName)
267 }
268 tpInstancePathSuffix := t.GetTechProfileInstanceKey(ctx, tpID, uniPortName)
269
270 if t.resourceMgr.GetTechnology() == epon {
271 tp := t.getEponTPFromKVStore(ctx, tpID)
272 if tp != nil {
273 if err := t.validateInstanceControlAttr(ctx, *tp.InstanceControl); err != nil {
274 logger.Error(ctx, "invalid-instance-ctrl-attr-using-default-tp")
275 tp = t.getDefaultEponProfile(ctx)
276 } else {
277 logger.Infow(ctx, "using-specified-tp-from-kv-store", log.Fields{"tpID": tpID})
278 }
279 } else {
280 logger.Info(ctx, "tp-not-found-on-kv--creating-default-tp")
281 tp = t.getDefaultEponProfile(ctx)
282 }
283 // Store TP in cache
284 t.eponTpMapLock.Lock()
285 t.eponTpMap[tpID] = tp
286 t.eponTpMapLock.Unlock()
287
288 if eponTpInstance = t.allocateEponTPInstance(ctx, uniPortName, tp, intfID, tpInstancePathSuffix); eponTpInstance == nil {
289 logger.Error(ctx, "tp-instance-allocation-failed")
290 return nil, errors.New("tp-instance-allocation-failed")
291 }
292 t.epontpInstanceMapLock.Lock()
293 t.eponTpInstanceMap[tpInstancePathSuffix] = eponTpInstance
294 t.epontpInstanceMapLock.Unlock()
295 resInst := tp_pb.ResourceInstance{
296 TpId: tpID,
297 ProfileType: eponTpInstance.ProfileType,
298 SubscriberIdentifier: eponTpInstance.SubscriberIdentifier,
299 AllocId: eponTpInstance.AllocId,
300 }
301 for _, usQAttr := range eponTpInstance.UpstreamQueueAttributeList {
302 resInst.GemportIds = append(resInst.GemportIds, usQAttr.GemportId)
303 }
304
305 logger.Infow(ctx, "epon-tp-instance-created-successfully",
306 log.Fields{"tpID": tpID, "uni": uniPortName, "intfID": intfID})
307 if err := t.addResourceInstanceToKVStore(ctx, tpID, uniPortName, resInst); err != nil {
308 logger.Errorw(ctx, "failed-to-update-resource-instance-to-kv-store--freeing-up-resources", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
309 allocIDs := make([]uint32, 0)
310 allocIDs = append(allocIDs, resInst.AllocId)
311 errList := make([]error, 0)
312 errList = append(errList, t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), allocIDs))
313 errList = append(errList, t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), resInst.GemportIds))
314 if len(errList) > 0 {
315 logger.Errorw(ctx, "failed-to-free-up-resources-on-kv-store--system-behavior-has-become-erratic", log.Fields{"tpID": tpID, "uniPortName": uniPortName, "errList": errList})
316 }
317 return nil, err
318 }
319 return eponTpInstance, nil
320 } else {
321 tp := t.getTPFromKVStore(ctx, tpID)
322 if tp != nil {
323 if err := t.validateInstanceControlAttr(ctx, *tp.InstanceControl); err != nil {
324 logger.Error(ctx, "invalid-instance-ctrl-attr--using-default-tp")
325 tp = t.getDefaultTechProfile(ctx)
326 } else {
327 logger.Infow(ctx, "using-specified-tp-from-kv-store", log.Fields{"tpID": tpID})
328 }
329 } else {
330 logger.Info(ctx, "tp-not-found-on-kv--creating-default-tp")
331 tp = t.getDefaultTechProfile(ctx)
332 }
333 // Store TP in cache
334 t.tpMapLock.Lock()
335 t.tpMap[tpID] = tp
336 t.tpMapLock.Unlock()
337
338 if tpInstance = t.allocateTPInstance(ctx, uniPortName, tp, intfID, tpInstancePathSuffix); tpInstance == nil {
339 logger.Error(ctx, "tp-instance-allocation-failed")
340 return nil, errors.New("tp-instance-allocation-failed")
341 }
342 t.tpInstanceMapLock.Lock()
343 t.tpInstanceMap[tpInstancePathSuffix] = tpInstance
344 t.tpInstanceMapLock.Unlock()
345
346 resInst := tp_pb.ResourceInstance{
347 TpId: tpID,
348 ProfileType: tpInstance.ProfileType,
349 SubscriberIdentifier: tpInstance.SubscriberIdentifier,
350 AllocId: tpInstance.UsScheduler.AllocId,
351 }
352 for _, usQAttr := range tpInstance.UpstreamGemPortAttributeList {
353 resInst.GemportIds = append(resInst.GemportIds, usQAttr.GemportId)
354 }
355
356 logger.Infow(ctx, "tp-instance-created-successfully",
357 log.Fields{"tpID": tpID, "uni": uniPortName, "intfID": intfID})
358 if err := t.addResourceInstanceToKVStore(ctx, tpID, uniPortName, resInst); err != nil {
359 logger.Errorw(ctx, "failed-to-update-resource-instance-to-kv-store--freeing-up-resources", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
360 allocIDs := make([]uint32, 0)
361 allocIDs = append(allocIDs, resInst.AllocId)
362 errList := make([]error, 0)
363 errList = append(errList, t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), allocIDs))
364 errList = append(errList, t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), resInst.GemportIds))
365 if len(errList) > 0 {
366 logger.Fatalw(ctx, "failed-to-free-up-resources-on-kv-store--system-behavior-has-become-erratic", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
367 }
368 return nil, err
369 }
370
371 logger.Infow(ctx, "resource-instance-added-to-kv-store-successfully",
372 log.Fields{"tpID": tpID, "uni": uniPortName, "intfID": intfID})
373 return tpInstance, nil
374 }
375}
376
377// DeleteTechProfileInstance deletes the TP instance from the local cache as well as deletes the corresponding
378// resource instance from the KV store.
379func (t *TechProfileMgr) DeleteTechProfileInstance(ctx context.Context, tpID uint32, uniPortName string) error {
380 // Make sure the uniPortName is as per format olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
381 if !uniPortNameFormatRegexp.Match([]byte(uniPortName)) {
382 logger.Errorw(ctx, "uni-port-name-not-confirming-to-format", log.Fields{"uniPortName": uniPortName})
383 return fmt.Errorf("uni-port-name-not-confirming-to-format--%s", uniPortName)
384 }
385 path := t.GetTechProfileInstanceKey(ctx, tpID, uniPortName)
386 logger.Infow(ctx, "delete-tp-instance-from-cache", log.Fields{"key": path})
387 t.tpInstanceMapLock.Lock()
388 delete(t.tpInstanceMap, path)
389 t.tpInstanceMapLock.Unlock()
390 if err := t.removeResourceInstanceFromKVStore(ctx, tpID, uniPortName); err != nil {
391 return err
392 }
393 return nil
394}
395
396func (t *TechProfileMgr) GetMulticastTrafficQueues(ctx context.Context, tp *tp_pb.TechProfileInstance) []*tp_pb.TrafficQueue {
397 var encryp bool
398 NumGemPorts := len(tp.DownstreamGemPortAttributeList)
399 mcastTrafficQueues := make([]*tp_pb.TrafficQueue, 0)
400 for Count := 0; Count < NumGemPorts; Count++ {
401 if !isMulticastGem(tp.DownstreamGemPortAttributeList[Count].IsMulticast) {
402 continue
403 }
404 if tp.DownstreamGemPortAttributeList[Count].AesEncryption == "True" {
405 encryp = true
406 } else {
407 encryp = false
408 }
409 mcastTrafficQueues = append(mcastTrafficQueues, &tp_pb.TrafficQueue{
410 Direction: tp_pb.Direction_DOWNSTREAM,
411 GemportId: tp.DownstreamGemPortAttributeList[Count].MulticastGemId,
412 PbitMap: tp.DownstreamGemPortAttributeList[Count].PbitMap,
413 AesEncryption: encryp,
414 SchedPolicy: tp.DownstreamGemPortAttributeList[Count].SchedulingPolicy,
415 Priority: tp.DownstreamGemPortAttributeList[Count].PriorityQ,
416 Weight: tp.DownstreamGemPortAttributeList[Count].Weight,
417 DiscardPolicy: tp.DownstreamGemPortAttributeList[Count].DiscardPolicy,
418 })
419 }
420 logger.Debugw(ctx, "Downstream Multicast Traffic queue list ", log.Fields{"queuelist": mcastTrafficQueues})
421 return mcastTrafficQueues
422}
423
424func (t *TechProfileMgr) GetGemportForPbit(ctx context.Context, tp interface{}, dir tp_pb.Direction, pbit uint32) interface{} {
425 /*
426 Function to get the Gemport mapped to a pbit.
427 */
428 switch tp := tp.(type) {
429 case *tp_pb.TechProfileInstance:
430 if dir == tp_pb.Direction_UPSTREAM {
431 // upstream GEM ports
432 numGemPorts := len(tp.UpstreamGemPortAttributeList)
433 for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
434 lenOfPbitMap := len(tp.UpstreamGemPortAttributeList[gemCnt].PbitMap)
435 for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
436 // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
437 // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
438 if p, err := strconv.Atoi(string(tp.UpstreamGemPortAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
439 if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
440 logger.Debugw(ctx, "Found-US-GEMport-for-Pcp", log.Fields{"pbit": pbit, "GEMport": tp.UpstreamGemPortAttributeList[gemCnt].GemportId})
441 return tp.UpstreamGemPortAttributeList[gemCnt]
442 }
443 }
444 }
445 }
446 } else if dir == tp_pb.Direction_DOWNSTREAM {
447 //downstream GEM ports
448 numGemPorts := len(tp.DownstreamGemPortAttributeList)
449 for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
450 lenOfPbitMap := len(tp.DownstreamGemPortAttributeList[gemCnt].PbitMap)
451 for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
452 // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
453 // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
454 if p, err := strconv.Atoi(string(tp.DownstreamGemPortAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
455 if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
456 logger.Debugw(ctx, "Found-DS-GEMport-for-Pcp", log.Fields{"pbit": pbit, "GEMport": tp.DownstreamGemPortAttributeList[gemCnt].GemportId})
457 return tp.DownstreamGemPortAttributeList[gemCnt]
458 }
459 }
460 }
461 }
462 }
463 logger.Errorw(ctx, "No-GemportId-Found-For-Pcp", log.Fields{"pcpVlan": pbit})
464 case *openolt.EponTechProfileInstance:
465 if dir == tp_pb.Direction_UPSTREAM {
466 // upstream GEM ports
467 numGemPorts := len(tp.UpstreamQueueAttributeList)
468 for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
469 lenOfPbitMap := len(tp.UpstreamQueueAttributeList[gemCnt].PbitMap)
470 for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
471 // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
472 // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
473 if p, err := strconv.Atoi(string(tp.UpstreamQueueAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
474 if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
475 logger.Debugw(ctx, "Found-US-Queue-for-Pcp", log.Fields{"pbit": pbit, "Queue": tp.UpstreamQueueAttributeList[gemCnt].GemportId})
476 return tp.UpstreamQueueAttributeList[gemCnt]
477 }
478 }
479 }
480 }
481 } else if dir == tp_pb.Direction_DOWNSTREAM {
482 //downstream GEM ports
483 numGemPorts := len(tp.DownstreamQueueAttributeList)
484 for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
485 lenOfPbitMap := len(tp.DownstreamQueueAttributeList[gemCnt].PbitMap)
486 for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
487 // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
488 // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
489 if p, err := strconv.Atoi(string(tp.DownstreamQueueAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
490 if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
491 logger.Debugw(ctx, "Found-DS-Queue-for-Pcp", log.Fields{"pbit": pbit, "Queue": tp.DownstreamQueueAttributeList[gemCnt].GemportId})
492 return tp.DownstreamQueueAttributeList[gemCnt]
493 }
494 }
495 }
496 }
497 }
498 logger.Errorw(ctx, "No-QueueId-Found-For-Pcp", log.Fields{"pcpVlan": pbit})
499 default:
500 logger.Errorw(ctx, "unknown-tech", log.Fields{"tp": tp})
501 }
502 return nil
503}
504
505// FindAllTpInstances returns all TechProfile instances for a given TechProfile table-id, pon interface ID and onu ID.
506func (t *TechProfileMgr) FindAllTpInstances(ctx context.Context, oltDeviceID string, tpID uint32, intfID uint32, onuID uint32) interface{} {
507 onuTpInstancePathSuffix := fmt.Sprintf("%s/%d/olt-{%s}/pon-{%d}/onu-{%d}", t.resourceMgr.GetTechnology(), tpID, oltDeviceID, intfID, onuID)
508 tech := t.resourceMgr.GetTechnology()
509 if tech == xgspon || tech == xgpon || tech == gpon {
510 t.tpInstanceMapLock.RLock()
511 defer t.tpInstanceMapLock.RUnlock()
512 tpInstancesTech := make([]tp_pb.TechProfileInstance, 0)
513 for i := 0; i < MaxUniPortPerOnu; i++ {
514 key := onuTpInstancePathSuffix + fmt.Sprintf("/uni-{%d}", i)
515 if tpInst, ok := t.tpInstanceMap[key]; ok {
516 tpInstancesTech = append(tpInstancesTech, *tpInst)
517 }
518 }
519 return tpInstancesTech
520 } else if tech == epon {
521 t.epontpInstanceMapLock.RLock()
522 defer t.epontpInstanceMapLock.RUnlock()
523 tpInstancesTech := make([]tp_pb.EponTechProfileInstance, 0)
524 for i := 0; i < MaxUniPortPerOnu; i++ {
525 key := onuTpInstancePathSuffix + fmt.Sprintf("/uni-{%d}", i)
526 if tpInst, ok := t.eponTpInstanceMap[key]; ok {
527 tpInstancesTech = append(tpInstancesTech, *tpInst)
528 }
529 }
530 return tpInstancesTech
531 } else {
532 logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": tech, "tpID": tpID, "onuID": onuID, "intfID": intfID})
533 }
534 return nil
535}
536
537func (t *TechProfileMgr) GetResourceID(ctx context.Context, intfID uint32, resourceType string, numIDs uint32) ([]uint32, error) {
538 logger.Debugw(ctx, "getting-resource-id", log.Fields{
539 "intf-id": intfID,
540 "resource-type": resourceType,
541 "num": numIDs,
542 })
543 var err error
544 var ids []uint32
545 switch resourceType {
546 case t.resourceMgr.GetResourceTypeAllocID():
547 t.AllocIDMgmtLock.Lock()
548 ids, err = t.resourceMgr.GetResourceID(ctx, intfID, resourceType, numIDs)
549 t.AllocIDMgmtLock.Unlock()
550 case t.resourceMgr.GetResourceTypeGemPortID():
551 t.GemPortIDMgmtLock.Lock()
552 ids, err = t.resourceMgr.GetResourceID(ctx, intfID, resourceType, numIDs)
553 t.GemPortIDMgmtLock.Unlock()
554 case t.resourceMgr.GetResourceTypeOnuID():
555 t.OnuIDMgmtLock.Lock()
556 ids, err = t.resourceMgr.GetResourceID(ctx, intfID, resourceType, numIDs)
557 t.OnuIDMgmtLock.Unlock()
558 default:
559 return nil, fmt.Errorf("resourceType %s not supported", resourceType)
560 }
561 if err != nil {
562 return nil, err
563 }
564 return ids, nil
565}
566
567func (t *TechProfileMgr) FreeResourceID(ctx context.Context, intfID uint32, resourceType string, ReleaseContent []uint32) error {
568 logger.Debugw(ctx, "freeing-resource-id", log.Fields{
569 "intf-id": intfID,
570 "resource-type": resourceType,
571 "release-content": ReleaseContent,
572 })
573 var err error
574 switch resourceType {
575 case t.resourceMgr.GetResourceTypeAllocID():
576 t.AllocIDMgmtLock.Lock()
577 err = t.resourceMgr.FreeResourceID(ctx, intfID, resourceType, ReleaseContent)
578 t.AllocIDMgmtLock.Unlock()
579 case t.resourceMgr.GetResourceTypeGemPortID():
580 t.GemPortIDMgmtLock.Lock()
581 err = t.resourceMgr.FreeResourceID(ctx, intfID, resourceType, ReleaseContent)
582 t.GemPortIDMgmtLock.Unlock()
583 case t.resourceMgr.GetResourceTypeOnuID():
584 t.OnuIDMgmtLock.Lock()
585 err = t.resourceMgr.FreeResourceID(ctx, intfID, resourceType, ReleaseContent)
586 t.OnuIDMgmtLock.Unlock()
587 default:
588 return fmt.Errorf("resourceType %s not supported", resourceType)
589 }
590 if err != nil {
591 return err
592 }
593 return nil
594}
595
596func (t *TechProfileMgr) GetUsScheduler(tpInstance *tp_pb.TechProfileInstance) *tp_pb.SchedulerConfig {
597 return &tp_pb.SchedulerConfig{
598 Direction: tpInstance.UsScheduler.Direction,
599 AdditionalBw: tpInstance.UsScheduler.AdditionalBw,
600 Priority: tpInstance.UsScheduler.Priority,
601 Weight: tpInstance.UsScheduler.Weight,
602 SchedPolicy: tpInstance.UsScheduler.QSchedPolicy}
603}
604
605func (t *TechProfileMgr) GetDsScheduler(tpInstance *tp_pb.TechProfileInstance) *tp_pb.SchedulerConfig {
606 return &tp_pb.SchedulerConfig{
607 Direction: tpInstance.DsScheduler.Direction,
608 AdditionalBw: tpInstance.DsScheduler.AdditionalBw,
609 Priority: tpInstance.DsScheduler.Priority,
610 Weight: tpInstance.DsScheduler.Weight,
611 SchedPolicy: tpInstance.DsScheduler.QSchedPolicy}
612}
613
614func (t *TechProfileMgr) GetTrafficScheduler(tpInstance *tp_pb.TechProfileInstance, SchedCfg *tp_pb.SchedulerConfig,
615 ShapingCfg *tp_pb.TrafficShapingInfo) *tp_pb.TrafficScheduler {
616
617 tSched := &tp_pb.TrafficScheduler{
618 Direction: SchedCfg.Direction,
619 AllocId: tpInstance.UsScheduler.AllocId,
620 TrafficShapingInfo: ShapingCfg,
621 Scheduler: SchedCfg}
622
623 return tSched
624}
625
626func (t *TechProfileMgr) GetTrafficQueues(ctx context.Context, tp *tp_pb.TechProfileInstance, direction tp_pb.Direction) ([]*tp_pb.TrafficQueue, error) {
627
628 var encryp bool
629 if direction == tp_pb.Direction_UPSTREAM {
630 // upstream GEM ports
631 NumGemPorts := len(tp.UpstreamGemPortAttributeList)
632 GemPorts := make([]*tp_pb.TrafficQueue, 0)
633 for Count := 0; Count < NumGemPorts; Count++ {
634 if tp.UpstreamGemPortAttributeList[Count].AesEncryption == "True" {
635 encryp = true
636 } else {
637 encryp = false
638 }
639
640 GemPorts = append(GemPorts, &tp_pb.TrafficQueue{
641 Direction: direction,
642 GemportId: tp.UpstreamGemPortAttributeList[Count].GemportId,
643 PbitMap: tp.UpstreamGemPortAttributeList[Count].PbitMap,
644 AesEncryption: encryp,
645 SchedPolicy: tp.UpstreamGemPortAttributeList[Count].SchedulingPolicy,
646 Priority: tp.UpstreamGemPortAttributeList[Count].PriorityQ,
647 Weight: tp.UpstreamGemPortAttributeList[Count].Weight,
648 DiscardPolicy: tp.UpstreamGemPortAttributeList[Count].DiscardPolicy,
649 })
650 }
651 logger.Debugw(ctx, "Upstream Traffic queue list ", log.Fields{"queuelist": GemPorts})
652 return GemPorts, nil
653 } else if direction == tp_pb.Direction_DOWNSTREAM {
654 //downstream GEM ports
655 NumGemPorts := len(tp.DownstreamGemPortAttributeList)
656 GemPorts := make([]*tp_pb.TrafficQueue, 0)
657 for Count := 0; Count < NumGemPorts; Count++ {
658 if isMulticastGem(tp.DownstreamGemPortAttributeList[Count].IsMulticast) {
659 //do not take multicast GEM ports. They are handled separately.
660 continue
661 }
662 if tp.DownstreamGemPortAttributeList[Count].AesEncryption == "True" {
663 encryp = true
664 } else {
665 encryp = false
666 }
667
668 GemPorts = append(GemPorts, &tp_pb.TrafficQueue{
669 Direction: direction,
670 GemportId: tp.DownstreamGemPortAttributeList[Count].GemportId,
671 PbitMap: tp.DownstreamGemPortAttributeList[Count].PbitMap,
672 AesEncryption: encryp,
673 SchedPolicy: tp.DownstreamGemPortAttributeList[Count].SchedulingPolicy,
674 Priority: tp.DownstreamGemPortAttributeList[Count].PriorityQ,
675 Weight: tp.DownstreamGemPortAttributeList[Count].Weight,
676 DiscardPolicy: tp.DownstreamGemPortAttributeList[Count].DiscardPolicy,
677 })
678 }
679 logger.Debugw(ctx, "Downstream Traffic queue list ", log.Fields{"queuelist": GemPorts})
680 return GemPorts, nil
681 }
682
683 logger.Errorf(ctx, "Unsupported direction %s used for generating Traffic Queue list", direction)
684 return nil, fmt.Errorf("downstream gem port traffic queue creation failed due to unsupported direction %s", direction)
685}
686
687func (t *TechProfileMgr) validateInstanceControlAttr(ctx context.Context, instCtl tp_pb.InstanceControl) error {
688 if instCtl.Onu != "single-instance" && instCtl.Onu != "multi-instance" {
689 logger.Errorw(ctx, "invalid-onu-instance-control-attribute", log.Fields{"onu-inst": instCtl.Onu})
690 return errors.New("invalid-onu-instance-ctl-attr")
691 }
692
693 if instCtl.Uni != "single-instance" && instCtl.Uni != "multi-instance" {
694 logger.Errorw(ctx, "invalid-uni-instance-control-attribute", log.Fields{"uni-inst": instCtl.Uni})
695 return errors.New("invalid-uni-instance-ctl-attr")
696 }
697
698 if instCtl.Uni == "multi-instance" {
699 logger.Error(ctx, "uni-multi-instance-tp-not-supported")
700 return errors.New("uni-multi-instance-tp-not-supported")
701 }
702
703 return nil
704}
705
706// allocateTPInstance for GPON, XGPON and XGS-PON technology
707func (t *TechProfileMgr) allocateTPInstance(ctx context.Context, uniPortName string, tp *tp_pb.TechProfile, intfID uint32, tpInstPathSuffix string) *tp_pb.TechProfileInstance {
708
709 var usGemPortAttributeList []*tp_pb.GemPortAttributes
710 var dsGemPortAttributeList []*tp_pb.GemPortAttributes
711 var dsMulticastGemAttributeList []*tp_pb.GemPortAttributes
712 var dsUnicastGemAttributeList []*tp_pb.GemPortAttributes
713 var tcontIDs []uint32
714 var gemPorts []uint32
715 var err error
716
717 logger.Infow(ctx, "Allocating TechProfileMgr instance from techprofile template", log.Fields{"uniPortName": uniPortName, "intfID": intfID, "numGem": tp.NumGemPorts})
718
719 if tp.InstanceControl.Onu == "multi-instance" {
720 tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1)
721 if err != nil {
722 logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"err": err, "intfID": intfID})
723 return nil
724 }
725 } else { // "single-instance"
726 if tpInst := t.getSingleInstanceTp(ctx, tpInstPathSuffix); tpInst == nil {
727 // No "single-instance" tp found on one any uni port for the given TP ID
728 // Allocate a new TcontID or AllocID
729 tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1)
730 if err != nil {
731 logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"err": err, "intfID": intfID})
732 return nil
733 }
734 } else {
735 // Use the alloc-id from the existing TpInstance
736 tcontIDs = append(tcontIDs, tpInst.UsScheduler.AllocId)
737 }
738 }
739 logger.Debugw(ctx, "Num GEM ports in TP:", log.Fields{"NumGemPorts": tp.NumGemPorts})
740 gemPorts, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), tp.NumGemPorts)
741 if err != nil {
742 logger.Errorw(ctx, "Error getting gemport ids from rsrcrMgr", log.Fields{"err": err, "intfID": intfID, "numGemports": tp.NumGemPorts})
743 return nil
744 }
745 logger.Infow(ctx, "Allocated tconts and GEM ports successfully", log.Fields{"tconts": tcontIDs, "gemports": gemPorts})
746 for index := 0; index < int(tp.NumGemPorts); index++ {
747 usGemPortAttributeList = append(usGemPortAttributeList,
748 &tp_pb.GemPortAttributes{GemportId: gemPorts[index],
749 MaxQSize: tp.UpstreamGemPortAttributeList[index].MaxQSize,
750 PbitMap: tp.UpstreamGemPortAttributeList[index].PbitMap,
751 AesEncryption: tp.UpstreamGemPortAttributeList[index].AesEncryption,
752 SchedulingPolicy: tp.UpstreamGemPortAttributeList[index].SchedulingPolicy,
753 PriorityQ: tp.UpstreamGemPortAttributeList[index].PriorityQ,
754 Weight: tp.UpstreamGemPortAttributeList[index].Weight,
755 DiscardPolicy: tp.UpstreamGemPortAttributeList[index].DiscardPolicy,
756 DiscardConfig: tp.UpstreamGemPortAttributeList[index].DiscardConfig})
757 }
758
759 logger.Info(ctx, "length of DownstreamGemPortAttributeList", len(tp.DownstreamGemPortAttributeList))
760 //put multicast and unicast downstream GEM port attributes in different lists first
761 for index := 0; index < len(tp.DownstreamGemPortAttributeList); index++ {
762 if isMulticastGem(tp.DownstreamGemPortAttributeList[index].IsMulticast) {
763 dsMulticastGemAttributeList = append(dsMulticastGemAttributeList,
764 &tp_pb.GemPortAttributes{
765 MulticastGemId: tp.DownstreamGemPortAttributeList[index].MulticastGemId,
766 MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
767 PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
768 AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
769 SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
770 PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
771 Weight: tp.DownstreamGemPortAttributeList[index].Weight,
772 DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
773 DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig,
774 IsMulticast: tp.DownstreamGemPortAttributeList[index].IsMulticast,
775 DynamicAccessControlList: tp.DownstreamGemPortAttributeList[index].DynamicAccessControlList,
776 StaticAccessControlList: tp.DownstreamGemPortAttributeList[index].StaticAccessControlList})
777 } else {
778 dsUnicastGemAttributeList = append(dsUnicastGemAttributeList,
779 &tp_pb.GemPortAttributes{
780 MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
781 PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
782 AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
783 SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
784 PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
785 Weight: tp.DownstreamGemPortAttributeList[index].Weight,
786 DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
787 DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig})
788 }
789 }
790 //add unicast downstream GEM ports to dsGemPortAttributeList
791 if dsUnicastGemAttributeList != nil {
792 for index := 0; index < int(tp.NumGemPorts); index++ {
793 dsGemPortAttributeList = append(dsGemPortAttributeList,
794 &tp_pb.GemPortAttributes{GemportId: gemPorts[index],
795 MaxQSize: dsUnicastGemAttributeList[index].MaxQSize,
796 PbitMap: dsUnicastGemAttributeList[index].PbitMap,
797 AesEncryption: dsUnicastGemAttributeList[index].AesEncryption,
798 SchedulingPolicy: dsUnicastGemAttributeList[index].SchedulingPolicy,
799 PriorityQ: dsUnicastGemAttributeList[index].PriorityQ,
800 Weight: dsUnicastGemAttributeList[index].Weight,
801 DiscardPolicy: dsUnicastGemAttributeList[index].DiscardPolicy,
802 DiscardConfig: dsUnicastGemAttributeList[index].DiscardConfig})
803 }
804 }
805 //add multicast GEM ports to dsGemPortAttributeList afterwards
806 for k := range dsMulticastGemAttributeList {
807 dsGemPortAttributeList = append(dsGemPortAttributeList, dsMulticastGemAttributeList[k])
808 }
809
810 return &tp_pb.TechProfileInstance{
811 SubscriberIdentifier: uniPortName,
812 Name: tp.Name,
813 ProfileType: tp.ProfileType,
814 Version: tp.Version,
815 NumGemPorts: tp.NumGemPorts,
816 InstanceControl: tp.InstanceControl,
817 UsScheduler: &tp_pb.SchedulerAttributes{
818 AllocId: tcontIDs[0],
819 Direction: tp.UsScheduler.Direction,
820 AdditionalBw: tp.UsScheduler.AdditionalBw,
821 Priority: tp.UsScheduler.Priority,
822 Weight: tp.UsScheduler.Weight,
823 QSchedPolicy: tp.UsScheduler.QSchedPolicy},
824 DsScheduler: &tp_pb.SchedulerAttributes{
825 AllocId: tcontIDs[0],
826 Direction: tp.DsScheduler.Direction,
827 AdditionalBw: tp.DsScheduler.AdditionalBw,
828 Priority: tp.DsScheduler.Priority,
829 Weight: tp.DsScheduler.Weight,
830 QSchedPolicy: tp.DsScheduler.QSchedPolicy},
831 UpstreamGemPortAttributeList: usGemPortAttributeList,
832 DownstreamGemPortAttributeList: dsGemPortAttributeList}
833}
834
835// allocateTPInstance function for EPON
836func (t *TechProfileMgr) allocateEponTPInstance(ctx context.Context, uniPortName string, tp *tp_pb.EponTechProfile, intfID uint32, tpInstPath string) *tp_pb.EponTechProfileInstance {
837
838 var usQueueAttributeList []*tp_pb.EPONQueueAttributes
839 var dsQueueAttributeList []*tp_pb.EPONQueueAttributes
840 var tcontIDs []uint32
841 var gemPorts []uint32
842 var err error
843
844 logger.Infow(ctx, "allocating-tp-instance-from-tp-template", log.Fields{"uniPortName": uniPortName, "intfID": intfID, "numGem": tp.NumGemPorts})
845
846 if tp.InstanceControl.Onu == "multi-instance" {
847 if tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1); err != nil {
848 logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"err": err, "intfID": intfID})
849 return nil
850 }
851 } else { // "single-instance"
852 if tpInst := t.getSingleInstanceEponTp(ctx, tpInstPath); tpInst == nil {
853 // No "single-instance" tp found on one any uni port for the given TP ID
854 // Allocate a new TcontID or AllocID
855 if tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1); err != nil {
856 logger.Errorw(ctx, "error-getting-alloc-id-from-resource-mgr", log.Fields{"err": err, "intfID": intfID})
857 return nil
858 }
859 } else {
860 // Use the alloc-id from the existing TpInstance
861 tcontIDs = append(tcontIDs, tpInst.AllocId)
862 }
863 }
864 logger.Debugw(ctx, "Num GEM ports in TP:", log.Fields{"NumGemPorts": tp.NumGemPorts})
865 if gemPorts, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), tp.NumGemPorts); err != nil {
866 logger.Errorw(ctx, "error-getting-gemport-id-from-resource-mgr", log.Fields{"err": err, "intfID": intfID, "numGemports": tp.NumGemPorts})
867 return nil
868 }
869 logger.Infow(ctx, "allocated-alloc-id-and-gemport-successfully", log.Fields{"tconts": tcontIDs, "gemports": gemPorts})
870 for index := 0; index < int(tp.NumGemPorts); index++ {
871 usQueueAttributeList = append(usQueueAttributeList,
872 &tp_pb.EPONQueueAttributes{GemportId: gemPorts[index],
873 MaxQSize: tp.UpstreamQueueAttributeList[index].MaxQSize,
874 PbitMap: tp.UpstreamQueueAttributeList[index].PbitMap,
875 AesEncryption: tp.UpstreamQueueAttributeList[index].AesEncryption,
876 TrafficType: tp.UpstreamQueueAttributeList[index].TrafficType,
877 UnsolicitedGrantSize: tp.UpstreamQueueAttributeList[index].UnsolicitedGrantSize,
878 NominalInterval: tp.UpstreamQueueAttributeList[index].NominalInterval,
879 ToleratedPollJitter: tp.UpstreamQueueAttributeList[index].ToleratedPollJitter,
880 RequestTransmissionPolicy: tp.UpstreamQueueAttributeList[index].RequestTransmissionPolicy,
881 NumQSets: tp.UpstreamQueueAttributeList[index].NumQSets,
882 QThresholds: tp.UpstreamQueueAttributeList[index].QThresholds,
883 SchedulingPolicy: tp.UpstreamQueueAttributeList[index].SchedulingPolicy,
884 PriorityQ: tp.UpstreamQueueAttributeList[index].PriorityQ,
885 Weight: tp.UpstreamQueueAttributeList[index].Weight,
886 DiscardPolicy: tp.UpstreamQueueAttributeList[index].DiscardPolicy,
887 DiscardConfig: tp.UpstreamQueueAttributeList[index].DiscardConfig})
888 }
889
890 logger.Info(ctx, "length-of-downstream-gemport-attribute-list", len(tp.DownstreamQueueAttributeList))
891 for index := 0; index < int(tp.NumGemPorts); index++ {
892 dsQueueAttributeList = append(dsQueueAttributeList,
893 &tp_pb.EPONQueueAttributes{GemportId: gemPorts[index],
894 MaxQSize: tp.DownstreamQueueAttributeList[index].MaxQSize,
895 PbitMap: tp.DownstreamQueueAttributeList[index].PbitMap,
896 AesEncryption: tp.DownstreamQueueAttributeList[index].AesEncryption,
897 SchedulingPolicy: tp.DownstreamQueueAttributeList[index].SchedulingPolicy,
898 PriorityQ: tp.DownstreamQueueAttributeList[index].PriorityQ,
899 Weight: tp.DownstreamQueueAttributeList[index].Weight,
900 DiscardPolicy: tp.DownstreamQueueAttributeList[index].DiscardPolicy,
901 DiscardConfig: tp.DownstreamQueueAttributeList[index].DiscardConfig})
902 }
903
904 return &tp_pb.EponTechProfileInstance{
905 SubscriberIdentifier: uniPortName,
906 Name: tp.Name,
907 ProfileType: tp.ProfileType,
908 Version: tp.Version,
909 NumGemPorts: tp.NumGemPorts,
910 InstanceControl: tp.InstanceControl,
911 PackageType: tp.PackageType,
912 AllocId: tcontIDs[0],
913 UpstreamQueueAttributeList: usQueueAttributeList,
914 DownstreamQueueAttributeList: dsQueueAttributeList}
915}
916
917// getSingleInstanceTp returns another TpInstance (GPON, XGPON, XGS-PON) for an ONU on a different
918// uni port for the same TP ID, if it finds one, else nil.
919func (t *TechProfileMgr) getSingleInstanceTp(ctx context.Context, tpPathSuffix string) *tp_pb.TechProfileInstance {
920
921 // For example:
922 // tpPathSuffix like "XGS-PON/64/olt-{1234}/pon-{0}/onu-{1}/uni-{1}"
923 // is broken into ["XGS-PON/64/olt-{1234}/pon-{0}/onu-{1}" ""]
924 uniPathSlice := regexp.MustCompile(`/uni-{[0-9]+}$`).Split(tpPathSuffix, 2)
925
926 t.tpInstanceMapLock.RLock()
927 defer t.tpInstanceMapLock.RUnlock()
928 for i := 0; i < MaxUniPortPerOnu; i++ {
929 key := fmt.Sprintf(uniPathSlice[0]+"/uni-{%d}", i)
930 if tpInst, ok := t.tpInstanceMap[key]; ok {
931 logger.Debugw(ctx, "found-single-instance-tp", log.Fields{"key": key})
932 return tpInst
933 }
934 }
935 return nil
936}
937
938// getSingleInstanceTp returns another TpInstance (EPON) for an ONU on a different
939// uni port for the same TP ID, if it finds one, else nil.
940func (t *TechProfileMgr) getSingleInstanceEponTp(ctx context.Context, tpPathSuffix string) *tp_pb.EponTechProfileInstance {
941 // For example:
942 // tpPathSuffix like "EPON/64/olt-{1234}/pon-{0}/onu-{1}/uni-{1}"
943 // is broken into ["EPON/64/-{1234}/pon-{0}/onu-{1}" ""]
944 uniPathSlice := regexp.MustCompile(`/uni-{[0-9]+}$`).Split(tpPathSuffix, 2)
945
946 t.epontpInstanceMapLock.RLock()
947 defer t.epontpInstanceMapLock.RUnlock()
948 for i := 0; i < MaxUniPortPerOnu; i++ {
949 key := fmt.Sprintf(uniPathSlice[0]+"/uni-{%d}", i)
950 if tpInst, ok := t.eponTpInstanceMap[key]; ok {
951 logger.Debugw(ctx, "found-single-instance-tp", log.Fields{"key": key})
952 return tpInst
953 }
954 }
955 return nil
956}
957
958// getDefaultTechProfile returns a default TechProfile for GPON, XGPON, XGS-PON
959func (t *TechProfileMgr) getDefaultTechProfile(ctx context.Context) *tp_pb.TechProfile {
960 var usGemPortAttributeList []*tp_pb.GemPortAttributes
961 var dsGemPortAttributeList []*tp_pb.GemPortAttributes
962
963 for _, pbit := range t.config.DefaultPbits {
964 logger.Debugw(ctx, "creating-gem-port-profile-profile", log.Fields{"pbit": pbit})
965 usGemPortAttributeList = append(usGemPortAttributeList,
966 &tp_pb.GemPortAttributes{
967 MaxQSize: defaultMaxQueueSize,
968 PbitMap: pbit,
969 AesEncryption: defaultAESEncryption,
970 SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
971 PriorityQ: defaultPriorityQueue,
972 Weight: defaultQueueWeight,
973 DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
974 DiscardConfigV2: &tp_pb.DiscardConfig{
975 DiscardPolicy: tp_pb.DiscardPolicy_Red,
976 DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
977 RedDiscardConfig: &tp_pb.RedDiscardConfig{
978 MinThreshold: defaultMinThreshold,
979 MaxThreshold: defaultMaxThreshold,
980 MaxProbability: defaultMaxProbability,
981 },
982 },
983 },
984 DiscardConfig: &tp_pb.RedDiscardConfig{
985 MinThreshold: defaultMinThreshold,
986 MaxThreshold: defaultMaxThreshold,
987 MaxProbability: defaultMaxProbability,
988 },
989 })
990 dsGemPortAttributeList = append(dsGemPortAttributeList,
991 &tp_pb.GemPortAttributes{
992 MaxQSize: defaultMaxQueueSize,
993 PbitMap: pbit,
994 AesEncryption: defaultAESEncryption,
995 SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
996 PriorityQ: defaultPriorityQueue,
997 Weight: defaultQueueWeight,
998 DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
999 DiscardConfigV2: &tp_pb.DiscardConfig{
1000 DiscardPolicy: tp_pb.DiscardPolicy_Red,
1001 DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
1002 RedDiscardConfig: &tp_pb.RedDiscardConfig{
1003 MinThreshold: defaultMinThreshold,
1004 MaxThreshold: defaultMaxThreshold,
1005 MaxProbability: defaultMaxProbability,
1006 },
1007 },
1008 },
1009 DiscardConfig: &tp_pb.RedDiscardConfig{
1010 MinThreshold: defaultMinThreshold,
1011 MaxThreshold: defaultMaxThreshold,
1012 MaxProbability: defaultMaxProbability,
1013 },
1014 IsMulticast: defaultIsMulticast,
1015 DynamicAccessControlList: defaultAccessControlList,
1016 StaticAccessControlList: defaultAccessControlList,
1017 MulticastGemId: defaultMcastGemID})
1018 }
1019 return &tp_pb.TechProfile{
1020 Name: t.config.DefaultTPName,
1021 ProfileType: t.resourceMgr.GetTechnology(),
1022 Version: t.config.TPVersion,
1023 NumGemPorts: uint32(len(usGemPortAttributeList)),
1024 InstanceControl: &tp_pb.InstanceControl{
1025 Onu: defaultOnuInstance,
1026 Uni: defaultUniInstance,
1027 MaxGemPayloadSize: defaultGemPayloadSize},
1028 UsScheduler: &tp_pb.SchedulerAttributes{
1029 Direction: tp_pb.Direction_UPSTREAM,
1030 AdditionalBw: tp_pb.AdditionalBW_AdditionalBW_BestEffort,
1031 Priority: defaultPriority,
1032 Weight: defaultWeight,
1033 QSchedPolicy: tp_pb.SchedulingPolicy_Hybrid},
1034 DsScheduler: &tp_pb.SchedulerAttributes{
1035 Direction: tp_pb.Direction_DOWNSTREAM,
1036 AdditionalBw: tp_pb.AdditionalBW_AdditionalBW_BestEffort,
1037 Priority: defaultPriority,
1038 Weight: defaultWeight,
1039 QSchedPolicy: tp_pb.SchedulingPolicy_Hybrid},
1040 UpstreamGemPortAttributeList: usGemPortAttributeList,
1041 DownstreamGemPortAttributeList: dsGemPortAttributeList}
1042}
1043
1044// getDefaultEponProfile returns a default TechProfile for EPON
1045func (t *TechProfileMgr) getDefaultEponProfile(ctx context.Context) *tp_pb.EponTechProfile {
1046
1047 var usQueueAttributeList []*tp_pb.EPONQueueAttributes
1048 var dsQueueAttributeList []*tp_pb.EPONQueueAttributes
1049
1050 for _, pbit := range t.config.DefaultPbits {
1051 logger.Debugw(ctx, "Creating Queue", log.Fields{"pbit": pbit})
1052 usQueueAttributeList = append(usQueueAttributeList,
1053 &tp_pb.EPONQueueAttributes{
1054 MaxQSize: defaultMaxQueueSize,
1055 PbitMap: pbit,
1056 AesEncryption: defaultAESEncryption,
1057 TrafficType: defaultTrafficType,
1058 UnsolicitedGrantSize: defaultUnsolicitedGrantSize,
1059 NominalInterval: defaultNominalInterval,
1060 ToleratedPollJitter: defaultToleratedPollJitter,
1061 RequestTransmissionPolicy: defaultRequestTransmissionPolicy,
1062 NumQSets: defaultNumQueueSet,
1063 QThresholds: &tp_pb.QThresholds{
1064 QThreshold1: defaultQThreshold1,
1065 QThreshold2: defaultQThreshold2,
1066 QThreshold3: defaultQThreshold3,
1067 QThreshold4: defaultQThreshold4,
1068 QThreshold5: defaultQThreshold5,
1069 QThreshold6: defaultQThreshold6,
1070 QThreshold7: defaultQThreshold7},
1071 SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
1072 PriorityQ: defaultPriorityQueue,
1073 Weight: defaultQueueWeight,
1074 DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
1075 DiscardConfigV2: &tp_pb.DiscardConfig{
1076 DiscardPolicy: tp_pb.DiscardPolicy_Red,
1077 DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
1078 RedDiscardConfig: &tp_pb.RedDiscardConfig{
1079 MinThreshold: defaultMinThreshold,
1080 MaxThreshold: defaultMaxThreshold,
1081 MaxProbability: defaultMaxProbability,
1082 },
1083 },
1084 },
1085 DiscardConfig: &tp_pb.RedDiscardConfig{
1086 MinThreshold: defaultMinThreshold,
1087 MaxThreshold: defaultMaxThreshold,
1088 MaxProbability: defaultMaxProbability,
1089 }})
1090 dsQueueAttributeList = append(dsQueueAttributeList,
1091 &tp_pb.EPONQueueAttributes{
1092 MaxQSize: defaultMaxQueueSize,
1093 PbitMap: pbit,
1094 AesEncryption: defaultAESEncryption,
1095 SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
1096 PriorityQ: defaultPriorityQueue,
1097 Weight: defaultQueueWeight,
1098 DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
1099 DiscardConfigV2: &tp_pb.DiscardConfig{
1100 DiscardPolicy: tp_pb.DiscardPolicy_Red,
1101 DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
1102 RedDiscardConfig: &tp_pb.RedDiscardConfig{
1103 MinThreshold: defaultMinThreshold,
1104 MaxThreshold: defaultMaxThreshold,
1105 MaxProbability: defaultMaxProbability,
1106 },
1107 },
1108 },
1109 DiscardConfig: &tp_pb.RedDiscardConfig{
1110 MinThreshold: defaultMinThreshold,
1111 MaxThreshold: defaultMaxThreshold,
1112 MaxProbability: defaultMaxProbability,
1113 }})
1114 }
1115 return &tp_pb.EponTechProfile{
1116 Name: t.config.DefaultTPName,
1117 ProfileType: t.resourceMgr.GetTechnology(),
1118 Version: t.config.TPVersion,
1119 NumGemPorts: uint32(len(usQueueAttributeList)),
1120 InstanceControl: &tp_pb.InstanceControl{
1121 Onu: defaultOnuInstance,
1122 Uni: defaultUniInstance,
1123 MaxGemPayloadSize: defaultGemPayloadSize},
1124 PackageType: defaultPakageType,
1125 UpstreamQueueAttributeList: usQueueAttributeList,
1126 DownstreamQueueAttributeList: dsQueueAttributeList}
1127}
1128
// isMulticastGem reports whether the isMulticast attribute value of a GEM port denotes a
// multicast GEM. Only the exact spellings "True", "true" and "TRUE" are accepted; every
// other value, including the empty string, yields false.
func isMulticastGem(isMulticastAttrValue string) bool {
	switch isMulticastAttrValue {
	case "True", "true", "TRUE":
		return true
	default:
		return false
	}
}
1134
1135func (t *TechProfileMgr) addResourceInstanceToKVStore(ctx context.Context, tpID uint32, uniPortName string, resInst tp_pb.ResourceInstance) error {
1136 logger.Debugw(ctx, "adding-resource-instance-to-kv-store", log.Fields{"tpID": tpID, "uniPortName": uniPortName, "resInst": resInst})
1137 val, err := proto.Marshal(&resInst)
1138 if err != nil {
1139 logger.Errorw(ctx, "failed-to-marshall-resource-instance", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName, "resInst": resInst})
1140 return err
1141 }
1142 err = t.config.ResourceInstanceKVBacked.Put(ctx, fmt.Sprintf("%s/%d/%s", t.resourceMgr.GetTechnology(), tpID, uniPortName), val)
1143 return err
1144}
1145
1146func (t *TechProfileMgr) removeResourceInstanceFromKVStore(ctx context.Context, tpID uint32, uniPortName string) error {
1147 logger.Debugw(ctx, "removing-resource-instance-to-kv-store", log.Fields{"tpID": tpID, "uniPortName": uniPortName})
1148 if err := t.config.ResourceInstanceKVBacked.Delete(ctx, fmt.Sprintf("%s/%d/%s", t.resourceMgr.GetTechnology(), tpID, uniPortName)); err != nil {
1149 logger.Errorw(ctx, "error-removing-resource-instance-to-kv-store", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
1150 return err
1151 }
1152 return nil
1153}
1154
1155func (t *TechProfileMgr) getTPFromKVStore(ctx context.Context, tpID uint32) *tp_pb.TechProfile {
1156 var tp *tp_pb.TechProfile
1157 t.tpMapLock.RLock()
1158 tp, ok := t.tpMap[tpID]
1159 t.tpMapLock.RUnlock()
1160 if ok {
1161 logger.Debugw(ctx, "found-tp-in-cache", log.Fields{"tpID": tpID})
1162 return tp
1163 }
1164 key := fmt.Sprintf(t.config.TPFileKVPath, t.resourceMgr.GetTechnology(), tpID)
1165 logger.Debugw(ctx, "getting-tp-from-kv-store", log.Fields{"tpID": tpID, "Key": key})
1166 kvresult, err := t.config.DefaultTpKVBackend.Get(ctx, key)
1167 if err != nil {
1168 logger.Errorw(ctx, "error-fetching-from-kv-store", log.Fields{"err": err, "key": key})
1169 return nil
1170 }
1171 if kvresult != nil {
1172 /* Backend will return Value in string format,needs to be converted to []byte before unmarshal*/
1173 if value, err := kvstore.ToByte(kvresult.Value); err == nil {
1174 lTp := &tp_pb.TechProfile{}
1175 reader := bytes.NewReader(value)
1176 if err = jsonpb.Unmarshal(reader, lTp); err != nil {
1177 logger.Errorw(ctx, "error-unmarshalling-tp-from-kv-store", log.Fields{"err": err, "tpID": tpID, "error": err})
1178 return nil
1179 }
1180
1181 logger.Debugw(ctx, "success-fetched-tp-from-kv-store", log.Fields{"tpID": tpID, "value": *lTp})
1182 return lTp
1183 } else {
1184 logger.Errorw(ctx, "error-decoding-tp", log.Fields{"err": err, "tpID": tpID})
1185 // We we create a default profile in this case.
1186 }
1187 }
1188
1189 return nil
1190}
1191
1192func (t *TechProfileMgr) getEponTPFromKVStore(ctx context.Context, tpID uint32) *tp_pb.EponTechProfile {
1193 var eponTp *tp_pb.EponTechProfile
1194 t.eponTpMapLock.RLock()
1195 eponTp, ok := t.eponTpMap[tpID]
1196 t.eponTpMapLock.RUnlock()
1197 if ok {
1198 logger.Debugw(ctx, "found-tp-in-cache", log.Fields{"tpID": tpID})
1199 return eponTp
1200 }
1201 key := fmt.Sprintf(t.config.TPFileKVPath, t.resourceMgr.GetTechnology(), tpID)
1202 logger.Debugw(ctx, "getting-epon-tp-from-kv-store", log.Fields{"tpID": tpID, "Key": key})
1203 kvresult, err := t.config.DefaultTpKVBackend.Get(ctx, key)
1204 if err != nil {
1205 logger.Errorw(ctx, "error-fetching-from-kv-store", log.Fields{"err": err, "key": key})
1206 return nil
1207 }
1208 if kvresult != nil {
1209 /* Backend will return Value in string format,needs to be converted to []byte before unmarshal*/
1210 if value, err := kvstore.ToByte(kvresult.Value); err == nil {
1211 lEponTp := &tp_pb.EponTechProfile{}
1212 reader := bytes.NewReader(value)
1213 if err = jsonpb.Unmarshal(reader, lEponTp); err != nil {
1214 logger.Errorw(ctx, "error-unmarshalling-epon-tp-from-kv-store", log.Fields{"err": err, "tpID": tpID, "error": err})
1215 return nil
1216 }
1217
1218 logger.Debugw(ctx, "success-fetching-epon-tp-from-kv-store", log.Fields{"tpID": tpID, "value": *lEponTp})
1219 return lEponTp
1220 }
1221 }
1222 return nil
1223}
1224
1225func newKVClient(ctx context.Context, storeType string, address string, timeout time.Duration) (kvstore.Client, error) {
1226
1227 logger.Infow(ctx, "kv-store", log.Fields{"storeType": storeType, "address": address})
1228 switch storeType {
1229 case "etcd":
1230 return kvstore.NewEtcdClient(ctx, address, timeout, log.WarnLevel)
1231 }
1232 return nil, errors.New("unsupported-kv-store")
1233}
1234
1235// buildTpInstanceFromResourceInstance for GPON, XGPON and XGS-PON technology - build TpInstance from TechProfile template and ResourceInstance
1236func (t *TechProfileMgr) buildTpInstanceFromResourceInstance(ctx context.Context, tp *tp_pb.TechProfile, resInst *tp_pb.ResourceInstance) *tp_pb.TechProfileInstance {
1237
1238 var usGemPortAttributeList []*tp_pb.GemPortAttributes
1239 var dsGemPortAttributeList []*tp_pb.GemPortAttributes
1240 var dsMulticastGemAttributeList []*tp_pb.GemPortAttributes
1241 var dsUnicastGemAttributeList []*tp_pb.GemPortAttributes
1242
1243 if len(resInst.GemportIds) != int(tp.NumGemPorts) {
1244 logger.Errorw(ctx, "mismatch-in-number-of-gemports-between-template-and-resource-instance",
1245 log.Fields{"tpID": resInst.TpId, "totalResInstGemPortIDs": len(resInst.GemportIds), "totalTpTemplateGemPorts": tp.NumGemPorts})
1246 return nil
1247 }
1248 for index := 0; index < int(tp.NumGemPorts); index++ {
1249 usGemPortAttributeList = append(usGemPortAttributeList,
1250 &tp_pb.GemPortAttributes{GemportId: resInst.GemportIds[index],
1251 MaxQSize: tp.UpstreamGemPortAttributeList[index].MaxQSize,
1252 PbitMap: tp.UpstreamGemPortAttributeList[index].PbitMap,
1253 AesEncryption: tp.UpstreamGemPortAttributeList[index].AesEncryption,
1254 SchedulingPolicy: tp.UpstreamGemPortAttributeList[index].SchedulingPolicy,
1255 PriorityQ: tp.UpstreamGemPortAttributeList[index].PriorityQ,
1256 Weight: tp.UpstreamGemPortAttributeList[index].Weight,
1257 DiscardPolicy: tp.UpstreamGemPortAttributeList[index].DiscardPolicy,
1258 DiscardConfig: tp.UpstreamGemPortAttributeList[index].DiscardConfig})
1259 }
1260
1261 //put multicast and unicast downstream GEM port attributes in different lists first
1262 for index := 0; index < len(tp.DownstreamGemPortAttributeList); index++ {
1263 if isMulticastGem(tp.DownstreamGemPortAttributeList[index].IsMulticast) {
1264 dsMulticastGemAttributeList = append(dsMulticastGemAttributeList,
1265 &tp_pb.GemPortAttributes{
1266 MulticastGemId: tp.DownstreamGemPortAttributeList[index].MulticastGemId,
1267 MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
1268 PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
1269 AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
1270 SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
1271 PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
1272 Weight: tp.DownstreamGemPortAttributeList[index].Weight,
1273 DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
1274 DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig,
1275 IsMulticast: tp.DownstreamGemPortAttributeList[index].IsMulticast,
1276 DynamicAccessControlList: tp.DownstreamGemPortAttributeList[index].DynamicAccessControlList,
1277 StaticAccessControlList: tp.DownstreamGemPortAttributeList[index].StaticAccessControlList})
1278 } else {
1279 dsUnicastGemAttributeList = append(dsUnicastGemAttributeList,
1280 &tp_pb.GemPortAttributes{
1281 MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
1282 PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
1283 AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
1284 SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
1285 PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
1286 Weight: tp.DownstreamGemPortAttributeList[index].Weight,
1287 DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
1288 DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig})
1289 }
1290 }
1291 //add unicast downstream GEM ports to dsGemPortAttributeList
1292 if dsUnicastGemAttributeList != nil {
1293 for index := 0; index < int(tp.NumGemPorts); index++ {
1294 dsGemPortAttributeList = append(dsGemPortAttributeList,
1295 &tp_pb.GemPortAttributes{GemportId: resInst.GemportIds[index],
1296 MaxQSize: dsUnicastGemAttributeList[index].MaxQSize,
1297 PbitMap: dsUnicastGemAttributeList[index].PbitMap,
1298 AesEncryption: dsUnicastGemAttributeList[index].AesEncryption,
1299 SchedulingPolicy: dsUnicastGemAttributeList[index].SchedulingPolicy,
1300 PriorityQ: dsUnicastGemAttributeList[index].PriorityQ,
1301 Weight: dsUnicastGemAttributeList[index].Weight,
1302 DiscardPolicy: dsUnicastGemAttributeList[index].DiscardPolicy,
1303 DiscardConfig: dsUnicastGemAttributeList[index].DiscardConfig})
1304 }
1305 }
1306 //add multicast GEM ports to dsGemPortAttributeList afterwards
1307 for k := range dsMulticastGemAttributeList {
1308 dsGemPortAttributeList = append(dsGemPortAttributeList, dsMulticastGemAttributeList[k])
1309 }
1310
1311 return &tp_pb.TechProfileInstance{
1312 SubscriberIdentifier: resInst.SubscriberIdentifier,
1313 Name: tp.Name,
1314 ProfileType: tp.ProfileType,
1315 Version: tp.Version,
1316 NumGemPorts: tp.NumGemPorts,
1317 InstanceControl: tp.InstanceControl,
1318 UsScheduler: &tp_pb.SchedulerAttributes{
1319 AllocId: resInst.AllocId,
1320 Direction: tp.UsScheduler.Direction,
1321 AdditionalBw: tp.UsScheduler.AdditionalBw,
1322 Priority: tp.UsScheduler.Priority,
1323 Weight: tp.UsScheduler.Weight,
1324 QSchedPolicy: tp.UsScheduler.QSchedPolicy},
1325 DsScheduler: &tp_pb.SchedulerAttributes{
1326 AllocId: resInst.AllocId,
1327 Direction: tp.DsScheduler.Direction,
1328 AdditionalBw: tp.DsScheduler.AdditionalBw,
1329 Priority: tp.DsScheduler.Priority,
1330 Weight: tp.DsScheduler.Weight,
1331 QSchedPolicy: tp.DsScheduler.QSchedPolicy},
1332 UpstreamGemPortAttributeList: usGemPortAttributeList,
1333 DownstreamGemPortAttributeList: dsGemPortAttributeList}
1334}
1335
1336// buildEponTpInstanceFromResourceInstance for EPON technology - build EponTpInstance from EponTechProfile template and ResourceInstance
1337func (t *TechProfileMgr) buildEponTpInstanceFromResourceInstance(ctx context.Context, tp *tp_pb.EponTechProfile, resInst *tp_pb.ResourceInstance) *tp_pb.EponTechProfileInstance {
1338
1339 var usQueueAttributeList []*tp_pb.EPONQueueAttributes
1340 var dsQueueAttributeList []*tp_pb.EPONQueueAttributes
1341
1342 if len(resInst.GemportIds) != int(tp.NumGemPorts) {
1343 logger.Errorw(ctx, "mismatch-in-number-of-gemports-between-epon-tp-template-and-resource-instance",
1344 log.Fields{"tpID": resInst.TpId, "totalResInstGemPortIDs": len(resInst.GemportIds), "totalTpTemplateGemPorts": tp.NumGemPorts})
1345 return nil
1346 }
1347
1348 for index := 0; index < int(tp.NumGemPorts); index++ {
1349 usQueueAttributeList = append(usQueueAttributeList,
1350 &tp_pb.EPONQueueAttributes{GemportId: resInst.GemportIds[index],
1351 MaxQSize: tp.UpstreamQueueAttributeList[index].MaxQSize,
1352 PbitMap: tp.UpstreamQueueAttributeList[index].PbitMap,
1353 AesEncryption: tp.UpstreamQueueAttributeList[index].AesEncryption,
1354 TrafficType: tp.UpstreamQueueAttributeList[index].TrafficType,
1355 UnsolicitedGrantSize: tp.UpstreamQueueAttributeList[index].UnsolicitedGrantSize,
1356 NominalInterval: tp.UpstreamQueueAttributeList[index].NominalInterval,
1357 ToleratedPollJitter: tp.UpstreamQueueAttributeList[index].ToleratedPollJitter,
1358 RequestTransmissionPolicy: tp.UpstreamQueueAttributeList[index].RequestTransmissionPolicy,
1359 NumQSets: tp.UpstreamQueueAttributeList[index].NumQSets,
1360 QThresholds: tp.UpstreamQueueAttributeList[index].QThresholds,
1361 SchedulingPolicy: tp.UpstreamQueueAttributeList[index].SchedulingPolicy,
1362 PriorityQ: tp.UpstreamQueueAttributeList[index].PriorityQ,
1363 Weight: tp.UpstreamQueueAttributeList[index].Weight,
1364 DiscardPolicy: tp.UpstreamQueueAttributeList[index].DiscardPolicy,
1365 DiscardConfig: tp.UpstreamQueueAttributeList[index].DiscardConfig})
1366 }
1367
1368 for index := 0; index < int(tp.NumGemPorts); index++ {
1369 dsQueueAttributeList = append(dsQueueAttributeList,
1370 &tp_pb.EPONQueueAttributes{GemportId: resInst.GemportIds[index],
1371 MaxQSize: tp.DownstreamQueueAttributeList[index].MaxQSize,
1372 PbitMap: tp.DownstreamQueueAttributeList[index].PbitMap,
1373 AesEncryption: tp.DownstreamQueueAttributeList[index].AesEncryption,
1374 SchedulingPolicy: tp.DownstreamQueueAttributeList[index].SchedulingPolicy,
1375 PriorityQ: tp.DownstreamQueueAttributeList[index].PriorityQ,
1376 Weight: tp.DownstreamQueueAttributeList[index].Weight,
1377 DiscardPolicy: tp.DownstreamQueueAttributeList[index].DiscardPolicy,
1378 DiscardConfig: tp.DownstreamQueueAttributeList[index].DiscardConfig})
1379 }
1380
1381 return &tp_pb.EponTechProfileInstance{
1382 SubscriberIdentifier: resInst.SubscriberIdentifier,
1383 Name: tp.Name,
1384 ProfileType: tp.ProfileType,
1385 Version: tp.Version,
1386 NumGemPorts: tp.NumGemPorts,
1387 InstanceControl: tp.InstanceControl,
1388 PackageType: tp.PackageType,
1389 AllocId: resInst.AllocId,
1390 UpstreamQueueAttributeList: usQueueAttributeList,
1391 DownstreamQueueAttributeList: dsQueueAttributeList}
1392}
1393
1394func (t *TechProfileMgr) getTpInstanceFromResourceInstance(ctx context.Context, resInst *tp_pb.ResourceInstance) *tp_pb.TechProfileInstance {
1395 if resInst == nil {
1396 logger.Error(ctx, "resource-instance-nil")
1397 return nil
1398 }
1399 tp := t.getTPFromKVStore(ctx, resInst.TpId)
1400 if tp == nil {
1401 logger.Warnw(ctx, "tp-not-found-on-kv--creating-default-tp", log.Fields{"tpID": resInst.TpId})
1402 tp = t.getDefaultTechProfile(ctx)
1403 }
1404 return t.buildTpInstanceFromResourceInstance(ctx, tp, resInst)
1405}
1406
1407func (t *TechProfileMgr) getEponTpInstanceFromResourceInstance(ctx context.Context, resInst *tp_pb.ResourceInstance) *tp_pb.EponTechProfileInstance {
1408 if resInst == nil {
1409 logger.Error(ctx, "resource-instance-nil")
1410 return nil
1411 }
1412 eponTp := t.getEponTPFromKVStore(ctx, resInst.TpId)
1413 if eponTp == nil {
1414 logger.Warnw(ctx, "tp-not-found-on-kv--creating-default-tp", log.Fields{"tpID": resInst.TpId})
1415 eponTp = t.getDefaultEponProfile(ctx)
1416 }
1417 return t.buildEponTpInstanceFromResourceInstance(ctx, eponTp, resInst)
1418}
1419
1420func (t *TechProfileMgr) reconcileTpInstancesToCache(ctx context.Context) error {
1421
1422 tech := t.resourceMgr.GetTechnology()
1423 newCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
1424 defer cancel()
1425 kvPairs, _ := t.config.ResourceInstanceKVBacked.List(newCtx, tech)
1426
1427 if tech == xgspon || tech == xgpon || tech == gpon {
1428 for keyPath, kvPair := range kvPairs {
1429 logger.Debugw(ctx, "attempting-to-reconcile-tp-instance-from-resource-instance", log.Fields{"resourceInstPath": keyPath})
1430 if value, err := kvstore.ToByte(kvPair.Value); err == nil {
1431 var resInst tp_pb.ResourceInstance
1432 if err = proto.Unmarshal(value, &resInst); err != nil {
1433 logger.Errorw(ctx, "error-unmarshal-kv-pair", log.Fields{"err": err, "keyPath": keyPath, "value": value})
1434 continue
1435 } else {
1436 if tpInst := t.getTpInstanceFromResourceInstance(ctx, &resInst); tpInst != nil {
1437 // Trim the kv path by removing the default prefix part and get only the suffix part to reference the internal cache
1438 keySuffixSlice := regexp.MustCompile(t.config.ResourceInstanceKVPathPrefix+"/").Split(keyPath, 2)
1439 if len(keySuffixSlice) == 2 {
1440 keySuffixFormatRegexp := regexp.MustCompile(`^[a-zA-Z\-]+/[0-9]+/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
1441 // Make sure the keySuffixSlice is as per format [a-zA-Z-+]/[\d+]/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
1442 if !keySuffixFormatRegexp.Match([]byte(keySuffixSlice[1])) {
1443 logger.Errorw(ctx, "kv-path-not-confirming-to-format", log.Fields{"kvPath": keySuffixSlice[1]})
1444 continue
1445 }
1446 } else {
1447 logger.Errorw(ctx, "kv-instance-key-path-not-in-the-expected-format", log.Fields{"kvPath": keyPath})
1448 continue
1449 }
1450 t.tpInstanceMapLock.Lock()
1451 t.tpInstanceMap[keySuffixSlice[1]] = tpInst
1452 t.tpInstanceMapLock.Unlock()
1453 logger.Debugw(ctx, "reconciled-tp-success", log.Fields{"keyPath": keyPath})
1454 }
1455 }
1456 } else {
1457 logger.Errorw(ctx, "error-converting-kv-pair-value-to-byte", log.Fields{"err": err})
1458 }
1459 }
1460 } else if tech == epon {
1461 for keyPath, kvPair := range kvPairs {
1462 logger.Debugw(ctx, "attempting-to-reconcile-epon-tp-instance", log.Fields{"keyPath": keyPath})
1463 if value, err := kvstore.ToByte(kvPair.Value); err == nil {
1464 var resInst tp_pb.ResourceInstance
1465 if err = proto.Unmarshal(value, &resInst); err != nil {
1466 logger.Errorw(ctx, "error-unmarshal-kv-pair", log.Fields{"keyPath": keyPath, "value": value})
1467 continue
1468 } else {
1469 if eponTpInst := t.getEponTpInstanceFromResourceInstance(ctx, &resInst); eponTpInst != nil {
1470 // Trim the kv path by removing the default prefix part and get only the suffix part to reference the internal cache
1471 keySuffixSlice := regexp.MustCompile(t.config.ResourceInstanceKVPathPrefix+"/").Split(keyPath, 2)
1472 if len(keySuffixSlice) == 2 {
1473 keySuffixFormatRegexp := regexp.MustCompile(`^[a-zA-Z\-]+/[0-9]+/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
1474 // Make sure the keySuffixSlice is as per format [a-zA-Z-+]/[\d+]/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
1475 if !keySuffixFormatRegexp.Match([]byte(keySuffixSlice[1])) {
1476 logger.Errorw(ctx, "kv-path-not-confirming-to-format", log.Fields{"kvPath": keySuffixSlice[1]})
1477 continue
1478 }
1479 } else {
1480 logger.Errorw(ctx, "kv-instance-key-path-not-in-the-expected-format", log.Fields{"kvPath": keyPath})
1481 continue
1482 }
1483 t.epontpInstanceMapLock.Lock()
1484 t.eponTpInstanceMap[keySuffixSlice[1]] = eponTpInst
1485 t.epontpInstanceMapLock.Unlock()
1486 logger.Debugw(ctx, "reconciled-epon-tp-success", log.Fields{"keyPath": keyPath})
1487 }
1488 }
1489 } else {
1490 logger.Errorw(ctx, "error-converting-kv-pair-value-to-byte", log.Fields{"err": err})
1491 }
1492 }
1493 } else {
1494 logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": tech})
1495 return fmt.Errorf("unknown-tech-%v", tech)
1496 }
1497
1498 return nil
1499}