blob: 389e09e8fc650494511c3253f413b1fe7efea6db [file] [log] [blame]
Takahiro Suzukid7bf8202020-12-17 20:21:59 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//Package resourcemanager provides the utility for managing resources
18package resourcemanager
19
20import (
21 "context"
22 "encoding/json"
23 "errors"
24 "fmt"
25 "strconv"
26 "sync"
27 "time"
28
29 "github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
30
31 "github.com/opencord/voltha-lib-go/v3/pkg/db"
32 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
33 "github.com/opencord/voltha-lib-go/v3/pkg/log"
34 ponrmgr "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
35 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
36 "github.com/opencord/voltha-protos/v3/go/openolt"
37)
38
39const (
40 KvstoreTimeout = 5 * time.Second
41 BasePathKvStore = "service/voltha/openolt/{%s}"
42 TpIDPathSuffix = "{%d,%d,%d}/tp_id"
43 MeterIDPathSuffix = "{%d,%d,%d}/{%d}/meter_id/{%s}"
44 NnniIntfID = "nniintfids"
45 OnuPacketINPathPrefix = "onu_packetin/{%d,%d,%d"
46 OnuPacketINPath = OnuPacketINPathPrefix + ",%d,%d}"
47 FlowIDsForGem = "flowids_per_gem/{%d}"
48 McastQueuesForIntf = "mcast_qs_for_int"
49 FlowGroup = "flow_groups/{%d}"
50 FlowGroupCached = "flow_groups_cached/{%d}"
51)
52
53// FlowInfo holds the flow information
54type FlowInfo struct {
55 Flow *openolt.Flow
56 FlowStoreCookie uint64
57 FlowCategory string
58 LogicalFlowID uint64
59}
60
61// OnuGemInfo holds onu information along with gem port list and uni port list
62type OnuGemInfo struct {
63 OnuID uint32
64 SerialNumber string
65 IntfID uint32
66 GemPorts []uint32
67 UniPorts []uint32
68}
69
70// PacketInInfoKey is the key for packet in gemport
71type PacketInInfoKey struct {
72 IntfID uint32
73 OnuID uint32
74 LogicalPort uint32
75 VlanID uint16
76 Priority uint8
77}
78
79// GroupInfo holds group information
80type GroupInfo struct {
81 GroupID uint32
82 OutPorts []uint32
83}
84
85// OpenOltResourceMgr holds resource related information as provided below for each field
86type OpenOltResourceMgr struct {
87 DeviceID string // OLT device id
88 Address string // Host and port of the kv store to connect to
89 Args string // args
90 KVStore *db.Backend // backend kv store connection handle
91 DeviceType string
92 DevInfo *openolt.DeviceInfo // device information
93 ResourceMgrs map[uint32]*ponrmgr.PONResourceManager
94
95 GemPortIDMgmtLock []sync.RWMutex
96 AllocIDMgmtLock []sync.RWMutex
97 OnuIDMgmtLock []sync.RWMutex
98 FlowIDMgmtLock sync.RWMutex
99
100 flowIDToGemInfoLock sync.RWMutex
101}
102
103func newKVClient(ctx context.Context, storeType string, address string, timeout time.Duration) (kvstore.Client, error) {
104 logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
105 switch storeType {
106 case "consul":
107 return kvstore.NewConsulClient(ctx, address, timeout)
108 case "etcd":
109 return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
110 }
111 return nil, errors.New("unsupported-kv-store")
112}
113
114// SetKVClient sets the KV client and return a kv backend
115func SetKVClient(ctx context.Context, backend string, addr string, DeviceID string) *db.Backend {
116 kvClient, err := newKVClient(ctx, backend, addr, KvstoreTimeout)
117 if err != nil {
118 logger.Fatalw(ctx, "Failed to init KV client\n", log.Fields{"err": err})
119 return nil
120 }
121
122 kvbackend := &db.Backend{
123 Client: kvClient,
124 StoreType: backend,
125 Address: addr,
126 Timeout: KvstoreTimeout,
127 PathPrefix: fmt.Sprintf(BasePathKvStore, DeviceID)}
128
129 return kvbackend
130}
131
132// NewResourceMgr init a New resource manager instance which in turn instantiates pon resource manager
133// instances according to technology. Initializes the default resource ranges for all
134// the resources.
135func NewResourceMgr(ctx context.Context, deviceID string, KVStoreAddress string, kvStoreType string, deviceType string, devInfo *openolt.DeviceInfo) *OpenOltResourceMgr {
136 var ResourceMgr OpenOltResourceMgr
137 logger.Debugf(ctx, "Init new resource manager , address: %s, device-id: %s", KVStoreAddress, deviceID)
138 ResourceMgr.Address = KVStoreAddress
139 ResourceMgr.DeviceType = deviceType
140 ResourceMgr.DevInfo = devInfo
141 NumPONPorts := devInfo.GetPonPorts()
142
143 Backend := kvStoreType
144 ResourceMgr.KVStore = SetKVClient(ctx, Backend, ResourceMgr.Address, deviceID)
145 if ResourceMgr.KVStore == nil {
146 logger.Error(ctx, "Failed to setup KV store")
147 }
148 Ranges := make(map[string]*openolt.DeviceInfo_DeviceResourceRanges)
149 RsrcMgrsByTech := make(map[string]*ponrmgr.PONResourceManager)
150 ResourceMgr.ResourceMgrs = make(map[uint32]*ponrmgr.PONResourceManager)
151
152 ResourceMgr.AllocIDMgmtLock = make([]sync.RWMutex, NumPONPorts)
153 ResourceMgr.GemPortIDMgmtLock = make([]sync.RWMutex, NumPONPorts)
154 ResourceMgr.OnuIDMgmtLock = make([]sync.RWMutex, NumPONPorts)
155
156
157 /*
158 If a legacy driver returns protobuf without any ranges,s synthesize one from
159 the legacy global per-device information. This, in theory, is temporary until
160 the legacy drivers are upgrade to support pool ranges.
161 */
162 if devInfo.Ranges == nil {
163 var ranges openolt.DeviceInfo_DeviceResourceRanges
164 ranges.Technology = devInfo.GetTechnology()
165
166 var index uint32
167 for index = 0; index < NumPONPorts; index++ {
168 ranges.IntfIds = append(ranges.IntfIds, index)
169 }
170
171 var Pool openolt.DeviceInfo_DeviceResourceRanges_Pool
172 Pool.Type = openolt.DeviceInfo_DeviceResourceRanges_Pool_ONU_ID
173 Pool.Start = devInfo.OnuIdStart
174 Pool.End = devInfo.OnuIdEnd
175 Pool.Sharing = openolt.DeviceInfo_DeviceResourceRanges_Pool_DEDICATED_PER_INTF
176 onuPool := Pool
177 ranges.Pools = append(ranges.Pools, &onuPool)
178
179 Pool.Type = openolt.DeviceInfo_DeviceResourceRanges_Pool_ALLOC_ID
180 Pool.Start = devInfo.AllocIdStart
181 Pool.End = devInfo.AllocIdEnd
182 Pool.Sharing = openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH
183 allocPool := Pool
184 ranges.Pools = append(ranges.Pools, &allocPool)
185
186 Pool.Type = openolt.DeviceInfo_DeviceResourceRanges_Pool_GEMPORT_ID
187 Pool.Start = devInfo.GemportIdStart
188 Pool.End = devInfo.GemportIdEnd
189 Pool.Sharing = openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH
190 gemPool := Pool
191 ranges.Pools = append(ranges.Pools, &gemPool)
192
193 Pool.Type = openolt.DeviceInfo_DeviceResourceRanges_Pool_FLOW_ID
194 Pool.Start = devInfo.FlowIdStart
195 Pool.End = devInfo.FlowIdEnd
196 Pool.Sharing = openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH
197 ranges.Pools = append(ranges.Pools, &Pool)
198 // Add to device info
199 devInfo.Ranges = append(devInfo.Ranges, &ranges)
200 }
201
202 var GlobalPONRsrcMgr *ponrmgr.PONResourceManager
203 var err error
204 for _, TechRange := range devInfo.Ranges {
205 technology := TechRange.Technology
206 logger.Debugf(ctx, "Device info technology %s", technology)
207 Ranges[technology] = TechRange
208
209 RsrcMgrsByTech[technology], err = ponrmgr.NewPONResourceManager(ctx, technology, deviceType, deviceID,
210 Backend, ResourceMgr.Address)
211 if err != nil {
212 logger.Errorf(ctx, "Failed to create pon resource manager instance for technology %s", technology)
213 return nil
214 }
215 // resource_mgrs_by_tech[technology] = resource_mgr
216 if GlobalPONRsrcMgr == nil {
217 GlobalPONRsrcMgr = RsrcMgrsByTech[technology]
218 }
219 for _, IntfID := range TechRange.IntfIds {
220 ResourceMgr.ResourceMgrs[uint32(IntfID)] = RsrcMgrsByTech[technology]
221 }
222 // self.initialize_device_resource_range_and_pool(resource_mgr, global_resource_mgr, arange)
223 InitializeDeviceResourceRangeAndPool(ctx, RsrcMgrsByTech[technology], GlobalPONRsrcMgr,
224 TechRange, devInfo)
225 }
226 for _, PONRMgr := range RsrcMgrsByTech {
227 _ = PONRMgr.InitDeviceResourcePool(ctx)
228 }
229 logger.Info(ctx, "Initialization of resource manager success!")
230 return &ResourceMgr
231}
232
233// InitializeDeviceResourceRangeAndPool initializes the resource range pool according to the sharing type, then apply
234// device specific information. If KV doesn't exist
235// or is broader than the device, the device's information will
236// dictate the range limits
237func InitializeDeviceResourceRangeAndPool(ctx context.Context, ponRMgr *ponrmgr.PONResourceManager, globalPONRMgr *ponrmgr.PONResourceManager,
238 techRange *openolt.DeviceInfo_DeviceResourceRanges, devInfo *openolt.DeviceInfo) {
239
240
241 logger.Debugf(ctx, "Resource range pool init for technology %s", ponRMgr.Technology)
242 status := ponRMgr.InitResourceRangesFromKVStore(ctx)
243 if !status {
244 logger.Debugf(ctx, "Failed to load resource ranges from KV store for tech %s", ponRMgr.Technology)
245 }
246
247 /*
248 Then apply device specific information. If KV doesn't exist
249 or is broader than the device, the device's information will
250 dictate the range limits
251 */
252 logger.Debugw(ctx, "Using device info to init pon resource ranges", log.Fields{"Tech": ponRMgr.Technology})
253
254 ONUIDStart := devInfo.OnuIdStart
255 ONUIDEnd := devInfo.OnuIdEnd
256 ONUIDShared := openolt.DeviceInfo_DeviceResourceRanges_Pool_DEDICATED_PER_INTF
257 ONUIDSharedPoolID := uint32(0)
258 AllocIDStart := devInfo.AllocIdStart
259 AllocIDEnd := devInfo.AllocIdEnd
260 AllocIDShared := openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH // TODO EdgeCore/BAL limitation
261 AllocIDSharedPoolID := uint32(0)
262 GEMPortIDStart := devInfo.GemportIdStart
263 GEMPortIDEnd := devInfo.GemportIdEnd
264 GEMPortIDShared := openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH // TODO EdgeCore/BAL limitation
265 GEMPortIDSharedPoolID := uint32(0)
266 FlowIDStart := devInfo.FlowIdStart
267 FlowIDEnd := devInfo.FlowIdEnd
268 FlowIDShared := openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH // TODO EdgeCore/BAL limitation
269 FlowIDSharedPoolID := uint32(0)
270
271 var FirstIntfPoolID uint32
272 var SharedPoolID uint32
273
274 /*
275 * As a zero check is made against SharedPoolID to check whether the resources are shared across all intfs
276 * if resources are shared across interfaces then SharedPoolID is given a positive number.
277 */
278 for _, FirstIntfPoolID = range techRange.IntfIds {
279 // skip the intf id 0
280 if FirstIntfPoolID == 0 {
281 continue
282 }
283 break
284 }
285
286 for _, RangePool := range techRange.Pools {
287 if RangePool.Sharing == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
288 SharedPoolID = FirstIntfPoolID
289 } else if RangePool.Sharing == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_SAME_TECH {
290 SharedPoolID = FirstIntfPoolID
291 } else {
292 SharedPoolID = 0
293 }
294 if RangePool.Type == openolt.DeviceInfo_DeviceResourceRanges_Pool_ONU_ID {
295 ONUIDStart = RangePool.Start
296 ONUIDEnd = RangePool.End
297 ONUIDShared = RangePool.Sharing
298 ONUIDSharedPoolID = SharedPoolID
299 } else if RangePool.Type == openolt.DeviceInfo_DeviceResourceRanges_Pool_ALLOC_ID {
300 AllocIDStart = RangePool.Start
301 AllocIDEnd = RangePool.End
302 AllocIDShared = RangePool.Sharing
303 AllocIDSharedPoolID = SharedPoolID
304 } else if RangePool.Type == openolt.DeviceInfo_DeviceResourceRanges_Pool_GEMPORT_ID {
305 GEMPortIDStart = RangePool.Start
306 GEMPortIDEnd = RangePool.End
307 GEMPortIDShared = RangePool.Sharing
308 GEMPortIDSharedPoolID = SharedPoolID
309 } else if RangePool.Type == openolt.DeviceInfo_DeviceResourceRanges_Pool_FLOW_ID {
310 FlowIDStart = RangePool.Start
311 FlowIDEnd = RangePool.End
312 FlowIDShared = RangePool.Sharing
313 FlowIDSharedPoolID = SharedPoolID
314 }
315 }
316
317 logger.Debugw(ctx, "Device info init", log.Fields{"technology": techRange.Technology,
318 "onu_id_start": ONUIDStart, "onu_id_end": ONUIDEnd, "onu_id_shared_pool_id": ONUIDSharedPoolID,
319 "alloc_id_start": AllocIDStart, "alloc_id_end": AllocIDEnd,
320 "alloc_id_shared_pool_id": AllocIDSharedPoolID,
321 "gemport_id_start": GEMPortIDStart, "gemport_id_end": GEMPortIDEnd,
322 "gemport_id_shared_pool_id": GEMPortIDSharedPoolID,
323 "flow_id_start": FlowIDStart,
324 "flow_id_end_idx": FlowIDEnd,
325 "flow_id_shared_pool_id": FlowIDSharedPoolID,
326 "intf_ids": techRange.IntfIds,
327 "uni_id_start": 0,
328 "uni_id_end_idx": 1, /*MaxUNIIDperONU()*/
329 })
330
331 ponRMgr.InitDefaultPONResourceRanges(ctx, ONUIDStart, ONUIDEnd, ONUIDSharedPoolID,
332 AllocIDStart, AllocIDEnd, AllocIDSharedPoolID,
333 GEMPortIDStart, GEMPortIDEnd, GEMPortIDSharedPoolID,
334 FlowIDStart, FlowIDEnd, FlowIDSharedPoolID, 0, 1,
335 devInfo.PonPorts, techRange.IntfIds)
336
337
338 if ONUIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
339 globalPONRMgr.UpdateRanges(ctx, ponrmgr.ONU_ID_START_IDX, ONUIDStart, ponrmgr.ONU_ID_END_IDX, ONUIDEnd,
340 "", 0, nil)
341 ponRMgr.UpdateRanges(ctx, ponrmgr.ONU_ID_START_IDX, ONUIDStart, ponrmgr.ONU_ID_END_IDX, ONUIDEnd,
342 "", 0, globalPONRMgr)
343 }
344 if AllocIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
345 globalPONRMgr.UpdateRanges(ctx, ponrmgr.ALLOC_ID_START_IDX, AllocIDStart, ponrmgr.ALLOC_ID_END_IDX, AllocIDEnd,
346 "", 0, nil)
347
348 ponRMgr.UpdateRanges(ctx, ponrmgr.ALLOC_ID_START_IDX, AllocIDStart, ponrmgr.ALLOC_ID_END_IDX, AllocIDEnd,
349 "", 0, globalPONRMgr)
350 }
351 if GEMPortIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
352 globalPONRMgr.UpdateRanges(ctx, ponrmgr.GEMPORT_ID_START_IDX, GEMPortIDStart, ponrmgr.GEMPORT_ID_END_IDX, GEMPortIDEnd,
353 "", 0, nil)
354 ponRMgr.UpdateRanges(ctx, ponrmgr.GEMPORT_ID_START_IDX, GEMPortIDStart, ponrmgr.GEMPORT_ID_END_IDX, GEMPortIDEnd,
355 "", 0, globalPONRMgr)
356 }
357 if FlowIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
358 globalPONRMgr.UpdateRanges(ctx, ponrmgr.FLOW_ID_START_IDX, FlowIDStart, ponrmgr.FLOW_ID_END_IDX, FlowIDEnd,
359 "", 0, nil)
360 ponRMgr.UpdateRanges(ctx, ponrmgr.FLOW_ID_START_IDX, FlowIDStart, ponrmgr.FLOW_ID_END_IDX, FlowIDEnd,
361 "", 0, globalPONRMgr)
362 }
363
364 ponRMgr.UpdateRanges(ctx, ponrmgr.UNI_ID_START_IDX, 0, ponrmgr.UNI_ID_END_IDX /* TODO =OpenOltPlatform.MAX_UNIS_PER_ONU-1*/, 1, "", 0, nil)
365}
366
367// Delete clears used resources for the particular olt device being deleted
368func (RsrcMgr *OpenOltResourceMgr) Delete(ctx context.Context) error {
369 /* TODO
370 def __del__(self):
371 self.log.info("clearing-device-resource-pool")
372 for key, resource_mgr in self.resource_mgrs.iteritems():
373 resource_mgr.clear_device_resource_pool()
374
375 def assert_pon_id_limit(self, pon_intf_id):
376 assert pon_intf_id in self.resource_mgrs
377
378 def assert_onu_id_limit(self, pon_intf_id, onu_id):
379 self.assert_pon_id_limit(pon_intf_id)
380 self.resource_mgrs[pon_intf_id].assert_resource_limits(onu_id, PONResourceManager.ONU_ID)
381
382 @property
383 def max_uni_id_per_onu(self):
384 return 0 #OpenOltPlatform.MAX_UNIS_PER_ONU-1, zero-based indexing Uncomment or override to make default multi-uni
385
386 def assert_uni_id_limit(self, pon_intf_id, onu_id, uni_id):
387 self.assert_onu_id_limit(pon_intf_id, onu_id)
388 self.resource_mgrs[pon_intf_id].assert_resource_limits(uni_id, PONResourceManager.UNI_ID)
389 */
390 for _, rsrcMgr := range RsrcMgr.ResourceMgrs {
391 if err := rsrcMgr.ClearDeviceResourcePool(ctx); err != nil {
392 logger.Debug(ctx, "Failed to clear device resource pool")
393 return err
394 }
395 }
396 logger.Debug(ctx, "Cleared device resource pool")
397 return nil
398}
399
400// GetONUID returns the available OnuID for the given pon-port
401func (RsrcMgr *OpenOltResourceMgr) GetONUID(ctx context.Context, ponIntfID uint32) (uint32, error) {
402 RsrcMgr.OnuIDMgmtLock[ponIntfID].Lock()
403 defer RsrcMgr.OnuIDMgmtLock[ponIntfID].Unlock()
404
405 if _, ok := RsrcMgr.ResourceMgrs[ponIntfID]; !ok {
406 err := errors.New("invalid-pon-interface-" + strconv.Itoa(int(ponIntfID)))
407 return 0, err
408 }
409 ONUID, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ctx, ponIntfID,
410 ponrmgr.ONU_ID, 1)
411 if err != nil {
412 logger.Errorf(ctx, "Failed to get resource for interface %d for type %s",
413 ponIntfID, ponrmgr.ONU_ID)
414 return 0, err
415 }
416 if ONUID != nil {
417 RsrcMgr.ResourceMgrs[ponIntfID].InitResourceMap(ctx, fmt.Sprintf("%d,%d", ponIntfID, ONUID[0]))
418 return ONUID[0], err
419 }
420
421 return 0, err // return OnuID 0 on error
422}
423
424// GetFlowIDInfo returns the slice of flow info of the given pon-port
425// Note: For flows which trap from the NNI and not really associated with any particular
426// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
427func (RsrcMgr *OpenOltResourceMgr) GetFlowIDInfo(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32, flowID uint32) *[]FlowInfo {
428 var flows []FlowInfo
429
430 FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
431 if err := RsrcMgr.ResourceMgrs[ponIntfID].GetFlowIDInfo(ctx, FlowPath, flowID, &flows); err != nil {
432 logger.Errorw(ctx, "Error while getting flows from KV store", log.Fields{"flowId": flowID})
433 return nil
434 }
435 if len(flows) == 0 {
436 logger.Debugw(ctx, "No flowInfo found in KV store", log.Fields{"flowPath": FlowPath})
437 return nil
438 }
439 return &flows
440}
441
442// GetCurrentFlowIDsForOnu fetches flow ID from the resource manager
443// Note: For flows which trap from the NNI and not really associated with any particular
444// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
445func (RsrcMgr *OpenOltResourceMgr) GetCurrentFlowIDsForOnu(ctx context.Context, PONIntfID uint32, ONUID int32, UNIID int32) []uint32 {
446
447 FlowPath := fmt.Sprintf("%d,%d,%d", PONIntfID, ONUID, UNIID)
448 if mgrs, exist := RsrcMgr.ResourceMgrs[PONIntfID]; exist {
449 return mgrs.GetCurrentFlowIDsForOnu(ctx, FlowPath)
450 }
451 return nil
452}
453
454// UpdateFlowIDInfo updates flow info for the given pon interface, onu id, and uni id
455// Note: For flows which trap from the NNI and not really associated with any particular
456// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
457func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDInfo(ctx context.Context, ponIntfID int32, onuID int32, uniID int32,
458 flowID uint32, flowData *[]FlowInfo) error {
459 FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
460 return RsrcMgr.ResourceMgrs[uint32(ponIntfID)].UpdateFlowIDInfoForOnu(ctx, FlowPath, flowID, *flowData)
461}
462
463// GetFlowID return flow ID for a given pon interface id, onu id and uni id
464func (RsrcMgr *OpenOltResourceMgr) GetFlowID(ctx context.Context, ponIntfID uint32, ONUID int32, uniID int32,
465 gemportID uint32,
466 flowStoreCookie uint64,
467 flowCategory string, vlanVid uint32, vlanPcp ...uint32) (uint32, error) {
468
469 var err error
470 FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, ONUID, uniID)
471
472 RsrcMgr.FlowIDMgmtLock.Lock()
473 defer RsrcMgr.FlowIDMgmtLock.Unlock()
474
475 FlowIDs := RsrcMgr.ResourceMgrs[ponIntfID].GetCurrentFlowIDsForOnu(ctx, FlowPath)
476 if FlowIDs != nil {
477 logger.Debugw(ctx, "Found flowId(s) for this ONU", log.Fields{"pon": ponIntfID, "ONUID": ONUID, "uniID": uniID, "KVpath": FlowPath})
478 for _, flowID := range FlowIDs {
479 FlowInfo := RsrcMgr.GetFlowIDInfo(ctx, ponIntfID, int32(ONUID), int32(uniID), uint32(flowID))
480 er := getFlowIDFromFlowInfo(ctx, FlowInfo, flowID, gemportID, flowStoreCookie, flowCategory, vlanVid, vlanPcp...)
481 if er == nil {
482 logger.Debugw(ctx, "Found flowid for the vlan, pcp, and gem",
483 log.Fields{"flowID": flowID, "vlanVid": vlanVid, "vlanPcp": vlanPcp, "gemPortID": gemportID})
484 return flowID, er
485 }
486 }
487 }
488 logger.Debug(ctx, "No matching flows with flow cookie or flow category, allocating new flowid")
489 FlowIDs, err = RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ctx, ponIntfID,
490 ponrmgr.FLOW_ID, 1)
491 if err != nil {
492 logger.Errorf(ctx, "Failed to get resource for interface %d for type %s",
493 ponIntfID, ponrmgr.FLOW_ID)
494 return uint32(0), err
495 }
496 if FlowIDs != nil {
497 _ = RsrcMgr.ResourceMgrs[ponIntfID].UpdateFlowIDForOnu(ctx, FlowPath, FlowIDs[0], true)
498 return FlowIDs[0], err
499 }
500
501 return 0, err
502}
503
504// GetAllocID return the first Alloc ID for a given pon interface id and onu id and then update the resource map on
505// the KV store with the list of alloc_ids allocated for the pon_intf_onu_id tuple
506// Currently of all the alloc_ids available, it returns the first alloc_id in the list for tha given ONU
507func (RsrcMgr *OpenOltResourceMgr) GetAllocID(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) uint32 {
508
509 var err error
510 IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
511
512 RsrcMgr.AllocIDMgmtLock[intfID].Lock()
513 defer RsrcMgr.AllocIDMgmtLock[intfID].Unlock()
514
515 AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(ctx, IntfOnuIDUniID)
516 if AllocID != nil {
517 // Since we support only one alloc_id for the ONU at the moment,
518 // return the first alloc_id in the list, if available, for that
519 // ONU.
520 logger.Debugw(ctx, "Retrieved alloc ID from pon resource mgr", log.Fields{"AllocID": AllocID})
521 return AllocID[0]
522 }
523 AllocID, err = RsrcMgr.ResourceMgrs[intfID].GetResourceID(ctx, intfID,
524 ponrmgr.ALLOC_ID, 1)
525
526 if AllocID == nil || err != nil {
527 logger.Error(ctx, "Failed to allocate alloc id")
528 return 0
529 }
530 err = RsrcMgr.ResourceMgrs[intfID].UpdateAllocIdsForOnu(ctx, IntfOnuIDUniID, AllocID)
531 if err != nil {
532 logger.Error(ctx, "Failed to update Alloc ID")
533 return 0
534 }
535 logger.Debugw(ctx, "Allocated new Tcont from pon resource mgr", log.Fields{"AllocID": AllocID})
536 return AllocID[0]
537}
538
539// UpdateAllocIdsForOnu updates alloc ids in kv store for a given pon interface id, onu id and uni id
540func (RsrcMgr *OpenOltResourceMgr) UpdateAllocIdsForOnu(ctx context.Context, ponPort uint32, onuID uint32, uniID uint32, allocID []uint32) error {
541
542 IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
543 return RsrcMgr.ResourceMgrs[ponPort].UpdateAllocIdsForOnu(ctx, IntfOnuIDUniID,
544 allocID)
545}
546
547// GetCurrentGEMPortIDsForOnu returns gem ports for given pon interface , onu id and uni id
548func (RsrcMgr *OpenOltResourceMgr) GetCurrentGEMPortIDsForOnu(ctx context.Context, intfID uint32, onuID uint32,
549 uniID uint32) []uint32 {
550
551 /* Get gem ports for given pon interface , onu id and uni id. */
552
553 IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
554 return RsrcMgr.ResourceMgrs[intfID].GetCurrentGEMPortIDsForOnu(ctx, IntfOnuIDUniID)
555}
556
557// GetCurrentAllocIDsForOnu returns alloc ids for given pon interface and onu id
558func (RsrcMgr *OpenOltResourceMgr) GetCurrentAllocIDsForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) []uint32 {
559
560 IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
561 AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(ctx, IntfOnuIDUniID)
562 if AllocID != nil {
563 return AllocID
564 }
565 return []uint32{}
566}
567
568// RemoveAllocIDForOnu removes the alloc id for given pon interface, onu id, uni id and alloc id
569func (RsrcMgr *OpenOltResourceMgr) RemoveAllocIDForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, allocID uint32) {
570 allocIDs := RsrcMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
571 for i := 0; i < len(allocIDs); i++ {
572 if allocIDs[i] == allocID {
573 allocIDs = append(allocIDs[:i], allocIDs[i+1:]...)
574 break
575 }
576 }
577 err := RsrcMgr.UpdateAllocIdsForOnu(ctx, intfID, onuID, uniID, allocIDs)
578 if err != nil {
579 logger.Errorf(ctx, "Failed to Remove Alloc Id For Onu. IntfID %d onuID %d uniID %d allocID %d",
580 intfID, onuID, uniID, allocID)
581 }
582}
583
584// RemoveGemPortIDForOnu removes the gem port id for given pon interface, onu id, uni id and gem port id
585func (RsrcMgr *OpenOltResourceMgr) RemoveGemPortIDForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, gemPortID uint32) {
586 gemPortIDs := RsrcMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
587 for i := 0; i < len(gemPortIDs); i++ {
588 if gemPortIDs[i] == gemPortID {
589 gemPortIDs = append(gemPortIDs[:i], gemPortIDs[i+1:]...)
590 break
591 }
592 }
593 err := RsrcMgr.UpdateGEMPortIDsForOnu(ctx, intfID, onuID, uniID, gemPortIDs)
594 if err != nil {
595 logger.Errorf(ctx, "Failed to Remove Gem Id For Onu. IntfID %d onuID %d uniID %d gemPortId %d",
596 intfID, onuID, uniID, gemPortID)
597 }
598}
599
600// UpdateGEMportsPonportToOnuMapOnKVStore updates onu and uni id associated with the gem port to the kv store
601// This stored information is used when packet_indication is received and we need to derive the ONU Id for which
602// the packet arrived based on the pon_intf and gemport available in the packet_indication
603func (RsrcMgr *OpenOltResourceMgr) UpdateGEMportsPonportToOnuMapOnKVStore(ctx context.Context, gemPorts []uint32, PonPort uint32,
604 onuID uint32, uniID uint32) error {
605
606 /* Update onu and uni id associated with the gem port to the kv store. */
607 var IntfGEMPortPath string
608 Data := fmt.Sprintf("%d %d", onuID, uniID)
609 for _, GEM := range gemPorts {
610 IntfGEMPortPath = fmt.Sprintf("%d,%d", PonPort, GEM)
611 Val, err := json.Marshal(Data)
612 if err != nil {
613 logger.Error(ctx, "failed to Marshal")
614 return err
615 }
616
617 if err = RsrcMgr.KVStore.Put(ctx, IntfGEMPortPath, Val); err != nil {
618 logger.Errorf(ctx, "Failed to update resource %s", IntfGEMPortPath)
619 return err
620 }
621 }
622 return nil
623}
624
625// RemoveGEMportPonportToOnuMapOnKVStore removes the relationship between the gem port and pon port
626func (RsrcMgr *OpenOltResourceMgr) RemoveGEMportPonportToOnuMapOnKVStore(ctx context.Context, GemPort uint32, PonPort uint32) {
627 IntfGEMPortPath := fmt.Sprintf("%d,%d", PonPort, GemPort)
628 err := RsrcMgr.KVStore.Delete(ctx, IntfGEMPortPath)
629 if err != nil {
630 logger.Errorf(ctx, "Failed to Remove Gem port-Pon port to onu map on kv store. Gem %d PonPort %d", GemPort, PonPort)
631 }
632}
633
634// GetGEMPortID gets gem port id for a particular pon port, onu id and uni id and then update the resource map on
635// the KV store with the list of gemport_id allocated for the pon_intf_onu_id tuple
636func (RsrcMgr *OpenOltResourceMgr) GetGEMPortID(ctx context.Context, ponPort uint32, onuID uint32,
637 uniID uint32, NumOfPorts uint32) ([]uint32, error) {
638
639 /* Get gem port id for a particular pon port, onu id
640 and uni id.
641 */
642
643 var err error
644 IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
645
646 RsrcMgr.GemPortIDMgmtLock[ponPort].Lock()
647 defer RsrcMgr.GemPortIDMgmtLock[ponPort].Unlock()
648
649 GEMPortList := RsrcMgr.ResourceMgrs[ponPort].GetCurrentGEMPortIDsForOnu(ctx, IntfOnuIDUniID)
650 if GEMPortList != nil {
651 return GEMPortList, nil
652 }
653
654 GEMPortList, err = RsrcMgr.ResourceMgrs[ponPort].GetResourceID(ctx, ponPort,
655 ponrmgr.GEMPORT_ID, NumOfPorts)
656 if err != nil && GEMPortList == nil {
657 logger.Errorf(ctx, "Failed to get gem port id for %s", IntfOnuIDUniID)
658 return nil, err
659 }
660
661 err = RsrcMgr.ResourceMgrs[ponPort].UpdateGEMPortIDsForOnu(ctx, IntfOnuIDUniID,
662 GEMPortList)
663 if err != nil {
664 logger.Errorf(ctx, "Failed to update GEM ports to kv store for %s", IntfOnuIDUniID)
665 return nil, err
666 }
667 _ = RsrcMgr.UpdateGEMportsPonportToOnuMapOnKVStore(ctx, GEMPortList, ponPort,
668 onuID, uniID)
669 return GEMPortList, err
670}
671
672// UpdateGEMPortIDsForOnu updates gemport ids on to the kv store for a given pon port, onu id and uni id
673func (RsrcMgr *OpenOltResourceMgr) UpdateGEMPortIDsForOnu(ctx context.Context, ponPort uint32, onuID uint32,
674 uniID uint32, GEMPortList []uint32) error {
675 IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
676 return RsrcMgr.ResourceMgrs[ponPort].UpdateGEMPortIDsForOnu(ctx, IntfOnuIDUniID,
677 GEMPortList)
678
679}
680
681// FreeonuID releases(make free) onu id for a particular pon-port
682func (RsrcMgr *OpenOltResourceMgr) FreeonuID(ctx context.Context, intfID uint32, onuID []uint32) {
683
684 RsrcMgr.OnuIDMgmtLock[intfID].Lock()
685 defer RsrcMgr.OnuIDMgmtLock[intfID].Unlock()
686
687 RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID, ponrmgr.ONU_ID, onuID)
688
689 /* Free onu id for a particular interface.*/
690 var IntfonuID string
691 for _, onu := range onuID {
692 IntfonuID = fmt.Sprintf("%d,%d", intfID, onu)
693 RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(ctx, IntfonuID)
694 }
695}
696
697// FreeFlowID returns the free flow id for a given interface, onu id and uni id
698func (RsrcMgr *OpenOltResourceMgr) FreeFlowID(ctx context.Context, IntfID uint32, onuID int32,
699 uniID int32, FlowID uint32) {
700 var IntfONUID string
701 var err error
702
703 RsrcMgr.FlowIDMgmtLock.Lock()
704 defer RsrcMgr.FlowIDMgmtLock.Unlock()
705
706 FlowIds := make([]uint32, 0)
707 FlowIds = append(FlowIds, FlowID)
708 IntfONUID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
709 err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(ctx, IntfONUID, FlowID, false)
710 if err != nil {
711 logger.Errorw(ctx, "Failed to Update flow id for", log.Fields{"intf": IntfONUID})
712 }
713 RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(ctx, IntfONUID, FlowID)
714
715 RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.FLOW_ID, FlowIds)
716}
717
718// FreeFlowIDs releases the flow Ids
719func (RsrcMgr *OpenOltResourceMgr) FreeFlowIDs(ctx context.Context, IntfID uint32, onuID uint32,
720 uniID uint32, FlowID []uint32) {
721 RsrcMgr.FlowIDMgmtLock.Lock()
722 defer RsrcMgr.FlowIDMgmtLock.Unlock()
723
724 RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.FLOW_ID, FlowID)
725
726 var IntfOnuIDUniID string
727 var err error
728 for _, flow := range FlowID {
729 IntfOnuIDUniID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
730 err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(ctx, IntfOnuIDUniID, flow, false)
731 if err != nil {
732 logger.Errorw(ctx, "Failed to Update flow id for", log.Fields{"intf": IntfOnuIDUniID})
733 }
734 RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(ctx, IntfOnuIDUniID, flow)
735 }
736}
737
738// FreeAllocID frees AllocID on the PON resource pool and also frees the allocID association
739// for the given OLT device.
740func (RsrcMgr *OpenOltResourceMgr) FreeAllocID(ctx context.Context, IntfID uint32, onuID uint32,
741 uniID uint32, allocID uint32) {
742 RsrcMgr.AllocIDMgmtLock[IntfID].Lock()
743 defer RsrcMgr.AllocIDMgmtLock[IntfID].Unlock()
744
745 RsrcMgr.RemoveAllocIDForOnu(ctx, IntfID, onuID, uniID, allocID)
746 allocIDs := make([]uint32, 0)
747 allocIDs = append(allocIDs, allocID)
748 RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.ALLOC_ID, allocIDs)
749}
750
751// FreeGemPortID frees GemPortID on the PON resource pool and also frees the gemPortID association
752// for the given OLT device.
753func (RsrcMgr *OpenOltResourceMgr) FreeGemPortID(ctx context.Context, IntfID uint32, onuID uint32,
754 uniID uint32, gemPortID uint32) {
755 RsrcMgr.GemPortIDMgmtLock[IntfID].Lock()
756 defer RsrcMgr.GemPortIDMgmtLock[IntfID].Unlock()
757
758 RsrcMgr.RemoveGemPortIDForOnu(ctx, IntfID, onuID, uniID, gemPortID)
759 gemPortIDs := make([]uint32, 0)
760 gemPortIDs = append(gemPortIDs, gemPortID)
761 RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.GEMPORT_ID, gemPortIDs)
762}
763
764// FreePONResourcesForONU make the pon resources free for a given pon interface and onu id, and the clears the
765// resource map and the onuID associated with (pon_intf_id, gemport_id) tuple,
766func (RsrcMgr *OpenOltResourceMgr) FreePONResourcesForONU(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) {
767
768 IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
769
770 RsrcMgr.AllocIDMgmtLock[intfID].Lock()
771 AllocIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(ctx, IntfOnuIDUniID)
772
773 RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID,
774 ponrmgr.ALLOC_ID,
775 AllocIDs)
776 RsrcMgr.AllocIDMgmtLock[intfID].Unlock()
777
778 RsrcMgr.GemPortIDMgmtLock[intfID].Lock()
779 GEMPortIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentGEMPortIDsForOnu(ctx, IntfOnuIDUniID)
780 RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID,
781 ponrmgr.GEMPORT_ID,
782 GEMPortIDs)
783 RsrcMgr.GemPortIDMgmtLock[intfID].Unlock()
784
785 RsrcMgr.FlowIDMgmtLock.Lock()
786 FlowIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentFlowIDsForOnu(ctx, IntfOnuIDUniID)
787 RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID,
788 ponrmgr.FLOW_ID,
789 FlowIDs)
790 RsrcMgr.FlowIDMgmtLock.Unlock()
791
792 RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(ctx, IntfOnuIDUniID)
793 for _, GEM := range GEMPortIDs {
794 _ = RsrcMgr.KVStore.Delete(ctx, fmt.Sprintf("%d,%d", intfID, GEM))
795 }
796}
797
798// IsFlowCookieOnKVStore checks if the given flow cookie is present on the kv store
799// Returns true if the flow cookie is found, otherwise it returns false
800func (RsrcMgr *OpenOltResourceMgr) IsFlowCookieOnKVStore(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32,
801 flowStoreCookie uint64) bool {
802
803 FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
804 FlowIDs := RsrcMgr.ResourceMgrs[ponIntfID].GetCurrentFlowIDsForOnu(ctx, FlowPath)
805 if FlowIDs != nil {
806 logger.Debugw(ctx, "Found flowId(s) for this ONU", log.Fields{"pon": ponIntfID, "onuID": onuID, "uniID": uniID, "KVpath": FlowPath})
807 for _, flowID := range FlowIDs {
808 FlowInfo := RsrcMgr.GetFlowIDInfo(ctx, ponIntfID, int32(onuID), int32(uniID), uint32(flowID))
809 if FlowInfo != nil {
810 logger.Debugw(ctx, "Found flows", log.Fields{"flows": *FlowInfo, "flowId": flowID})
811 for _, Info := range *FlowInfo {
812 if Info.FlowStoreCookie == flowStoreCookie {
813 logger.Debug(ctx, "Found flow matching with flowStore cookie", log.Fields{"flowId": flowID, "flowStoreCookie": flowStoreCookie})
814 return true
815 }
816 }
817 }
818 }
819 }
820 return false
821}
822
823// GetTechProfileIDForOnu fetches Tech-Profile-ID from the KV-Store for the given onu based on the path
824// This path is formed as the following: {IntfID, OnuID, UniID}/tp_id
825func (RsrcMgr *OpenOltResourceMgr) GetTechProfileIDForOnu(ctx context.Context, IntfID uint32, OnuID uint32, UniID uint32) []uint32 {
826 Path := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
827 var Data []uint32
828 Value, err := RsrcMgr.KVStore.Get(ctx, Path)
829 if err == nil {
830 if Value != nil {
831 Val, err := kvstore.ToByte(Value.Value)
832 if err != nil {
833 logger.Errorw(ctx, "Failed to convert into byte array", log.Fields{"error": err})
834 return Data
835 }
836 if err = json.Unmarshal(Val, &Data); err != nil {
837 logger.Error(ctx, "Failed to unmarshal", log.Fields{"error": err})
838 return Data
839 }
840 }
841 } else {
842 logger.Errorf(ctx, "Failed to get TP id from kvstore for path %s", Path)
843 }
844 logger.Debugf(ctx, "Getting TP id %d from path %s", Data, Path)
845 return Data
846
847}
848
849// RemoveTechProfileIDsForOnu deletes all tech profile ids from the KV-Store for the given onu based on the path
850// This path is formed as the following: {IntfID, OnuID, UniID}/tp_id
851func (RsrcMgr *OpenOltResourceMgr) RemoveTechProfileIDsForOnu(ctx context.Context, IntfID uint32, OnuID uint32, UniID uint32) error {
852 IntfOnuUniID := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
853 if err := RsrcMgr.KVStore.Delete(ctx, IntfOnuUniID); err != nil {
854 logger.Errorw(ctx, "Failed to delete techprofile id resource in KV store", log.Fields{"path": IntfOnuUniID})
855 return err
856 }
857 return nil
858}
859
860// RemoveTechProfileIDForOnu deletes a specific tech profile id from the KV-Store for the given onu based on the path
861// This path is formed as the following: {IntfID, OnuID, UniID}/tp_id
862func (RsrcMgr *OpenOltResourceMgr) RemoveTechProfileIDForOnu(ctx context.Context, IntfID uint32, OnuID uint32, UniID uint32, TpID uint32) error {
863 tpIDList := RsrcMgr.GetTechProfileIDForOnu(ctx, IntfID, OnuID, UniID)
864 for i, tpIDInList := range tpIDList {
865 if tpIDInList == TpID {
866 tpIDList = append(tpIDList[:i], tpIDList[i+1:]...)
867 }
868 }
869 IntfOnuUniID := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
870 Value, err := json.Marshal(tpIDList)
871 if err != nil {
872 logger.Error(ctx, "failed to Marshal")
873 return err
874 }
875 if err = RsrcMgr.KVStore.Put(ctx, IntfOnuUniID, Value); err != nil {
876 logger.Errorf(ctx, "Failed to update resource %s", IntfOnuUniID)
877 return err
878 }
879 return err
880}
881
882// UpdateTechProfileIDForOnu updates (put) already present tech-profile-id for the given onu based on the path
883// This path is formed as the following: {IntfID, OnuID, UniID}/tp_id
884func (RsrcMgr *OpenOltResourceMgr) UpdateTechProfileIDForOnu(ctx context.Context, IntfID uint32, OnuID uint32,
885 UniID uint32, TpID uint32) error {
886 var Value []byte
887 var err error
888
889 IntfOnuUniID := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
890
891 tpIDList := RsrcMgr.GetTechProfileIDForOnu(ctx, IntfID, OnuID, UniID)
892 for _, value := range tpIDList {
893 if value == TpID {
894 logger.Debugf(ctx, "TpID %d is already in tpIdList for the path %s", TpID, IntfOnuUniID)
895 return err
896 }
897 }
898 logger.Debugf(ctx, "updating tp id %d on path %s", TpID, IntfOnuUniID)
899 tpIDList = append(tpIDList, TpID)
900 Value, err = json.Marshal(tpIDList)
901 if err != nil {
902 logger.Error(ctx, "failed to Marshal")
903 return err
904 }
905 if err = RsrcMgr.KVStore.Put(ctx, IntfOnuUniID, Value); err != nil {
906 logger.Errorf(ctx, "Failed to update resource %s", IntfOnuUniID)
907 return err
908 }
909 return err
910}
911
912// UpdateMeterIDForOnu updates the meter id in the KV-Store for the given onu based on the path
913// This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
914func (RsrcMgr *OpenOltResourceMgr) UpdateMeterIDForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
915 UniID uint32, TpID uint32, MeterConfig *ofp.OfpMeterConfig) error {
916 var Value []byte
917 var err error
918
919 IntfOnuUniID := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, TpID, Direction)
920 Value, err = json.Marshal(*MeterConfig)
921 if err != nil {
922 logger.Error(ctx, "failed to Marshal meter config")
923 return err
924 }
925 if err = RsrcMgr.KVStore.Put(ctx, IntfOnuUniID, Value); err != nil {
926 logger.Errorf(ctx, "Failed to store meter into KV store %s", IntfOnuUniID)
927 return err
928 }
929 return err
930}
931
932// GetMeterIDForOnu fetches the meter id from the kv store for the given onu based on the path
933// This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
934func (RsrcMgr *OpenOltResourceMgr) GetMeterIDForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
935 UniID uint32, TpID uint32) (*ofp.OfpMeterConfig, error) {
936 Path := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, TpID, Direction)
937 var meterConfig ofp.OfpMeterConfig
938 Value, err := RsrcMgr.KVStore.Get(ctx, Path)
939 if err == nil {
940 if Value != nil {
941 logger.Debug(ctx, "Found meter in KV store", log.Fields{"Direction": Direction})
942 Val, er := kvstore.ToByte(Value.Value)
943 if er != nil {
944 logger.Errorw(ctx, "Failed to convert into byte array", log.Fields{"error": er})
945 return nil, er
946 }
947 if er = json.Unmarshal(Val, &meterConfig); er != nil {
948 logger.Error(ctx, "Failed to unmarshal meterconfig", log.Fields{"error": er})
949 return nil, er
950 }
951 } else {
952 logger.Debug(ctx, "meter-does-not-exists-in-KVStore")
953 return nil, err
954 }
955 } else {
956 logger.Errorf(ctx, "Failed to get Meter config from kvstore for path %s", Path)
957
958 }
959 return &meterConfig, err
960}
961
962// RemoveMeterIDForOnu deletes the meter id from the kV-Store for the given onu based on the path
963// This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
964func (RsrcMgr *OpenOltResourceMgr) RemoveMeterIDForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
965 UniID uint32, TpID uint32) error {
966 Path := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, TpID, Direction)
967 if err := RsrcMgr.KVStore.Delete(ctx, Path); err != nil {
968 logger.Errorf(ctx, "Failed to delete meter id %s from kvstore ", Path)
969 return err
970 }
971 return nil
972}
973
974func getFlowIDFromFlowInfo(ctx context.Context, FlowInfo *[]FlowInfo, flowID, gemportID uint32, flowStoreCookie uint64, flowCategory string,
975 vlanVid uint32, vlanPcp ...uint32) error {
976 if FlowInfo != nil {
977 for _, Info := range *FlowInfo {
978 if int32(gemportID) == Info.Flow.GemportId && flowCategory != "" && Info.FlowCategory == flowCategory {
979 logger.Debug(ctx, "Found flow matching with flow category", log.Fields{"flowId": flowID, "FlowCategory": flowCategory})
980 if Info.FlowCategory == "HSIA_FLOW" {
981 if err := checkVlanAndPbitEqualityForFlows(vlanVid, Info, vlanPcp[0]); err == nil {
982 return nil
983 }
984 }
985 }
986 if int32(gemportID) == Info.Flow.GemportId && flowStoreCookie != 0 && Info.FlowStoreCookie == flowStoreCookie {
987 if flowCategory != "" && Info.FlowCategory == flowCategory {
988 logger.Debug(ctx, "Found flow matching with flow category", log.Fields{"flowId": flowID, "FlowCategory": flowCategory})
989 return nil
990 }
991 }
992 }
993 }
994 logger.Debugw(ctx, "the flow can be related to a different service", log.Fields{"flow_info": FlowInfo})
995 return errors.New("invalid flow-info")
996}
997
998func checkVlanAndPbitEqualityForFlows(vlanVid uint32, Info FlowInfo, vlanPcp uint32) error {
999 if err := checkVlanEqualityForFlows(vlanVid, Info); err != nil {
1000 return err
1001 }
1002
1003 if Info.Flow.Action.Cmd.RemarkInnerPbits || Info.Flow.Action.Cmd.RemarkOuterPbits {
1004 if vlanPcp == Info.Flow.Action.OPbits || vlanPcp == Info.Flow.Action.IPbits {
1005 return nil
1006 }
1007 } else if vlanPcp == Info.Flow.Classifier.OPbits {
1008 //no remark action but flow has pbits
1009 return nil
1010 } else if vlanPcp == 0xff || Info.Flow.Classifier.OPbits == 0xff {
1011 // no pbit found
1012 return nil
1013 }
1014 return errors.New("not found in terms of pbit equality")
1015}
1016
1017func checkVlanEqualityForFlows(vlanVid uint32, Info FlowInfo) error {
1018 if vlanVid == Info.Flow.Action.OVid || vlanVid == Info.Flow.Classifier.IVid {
1019 return nil
1020 }
1021 return errors.New("not found in terms of vlan_id equality")
1022}
1023
1024//AddGemToOnuGemInfo adds gemport to onugem info kvstore
1025func (RsrcMgr *OpenOltResourceMgr) AddGemToOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) error {
1026 var onuGemData []OnuGemInfo
1027 var err error
1028
1029 if err = RsrcMgr.ResourceMgrs[intfID].GetOnuGemInfo(ctx, intfID, &onuGemData); err != nil {
1030 logger.Errorf(ctx, "failed to get onuifo for intfid %d", intfID)
1031 return err
1032 }
1033 if len(onuGemData) == 0 {
1034 logger.Errorw(ctx, "failed to ger Onuid info ", log.Fields{"intfid": intfID, "onuid": onuID})
1035 return err
1036 }
1037
1038 for idx, onugem := range onuGemData {
1039 if onugem.OnuID == onuID {
1040 for _, gem := range onuGemData[idx].GemPorts {
1041 if gem == gemPort {
1042 logger.Debugw(ctx, "Gem already present in onugem info, skpping addition", log.Fields{"gem": gem})
1043 return nil
1044 }
1045 }
1046 logger.Debugw(ctx, "Added gem to onugem info", log.Fields{"gem": gemPort})
1047 onuGemData[idx].GemPorts = append(onuGemData[idx].GemPorts, gemPort)
1048 break
1049 }
1050 }
1051 err = RsrcMgr.ResourceMgrs[intfID].AddOnuGemInfo(ctx, intfID, onuGemData)
1052 if err != nil {
1053 logger.Error(ctx, "Failed to add onugem to kv store")
1054 return err
1055 }
1056 return err
1057}
1058
1059//GetOnuGemInfo gets onu gem info from the kvstore per interface
1060func (RsrcMgr *OpenOltResourceMgr) GetOnuGemInfo(ctx context.Context, IntfID uint32) ([]OnuGemInfo, error) {
1061 var onuGemData []OnuGemInfo
1062
1063 if err := RsrcMgr.ResourceMgrs[IntfID].GetOnuGemInfo(ctx, IntfID, &onuGemData); err != nil {
1064 logger.Errorf(ctx, "failed to get onuifo for intfid %d", IntfID)
1065 return nil, err
1066 }
1067
1068 return onuGemData, nil
1069}
1070
1071// AddOnuGemInfo adds onu info on to the kvstore per interface
1072func (RsrcMgr *OpenOltResourceMgr) AddOnuGemInfo(ctx context.Context, IntfID uint32, onuGem OnuGemInfo) error {
1073 var onuGemData []OnuGemInfo
1074 var err error
1075
1076 if err = RsrcMgr.ResourceMgrs[IntfID].GetOnuGemInfo(ctx, IntfID, &onuGemData); err != nil {
1077 logger.Errorf(ctx, "failed to get onuifo for intfid %d", IntfID)
1078 return olterrors.NewErrPersistence("get", "OnuGemInfo", IntfID,
1079 log.Fields{"onuGem": onuGem, "intfID": IntfID}, err)
1080 }
1081 onuGemData = append(onuGemData, onuGem)
1082 err = RsrcMgr.ResourceMgrs[IntfID].AddOnuGemInfo(ctx, IntfID, onuGemData)
1083 if err != nil {
1084 logger.Error(ctx, "Failed to add onugem to kv store")
1085 return olterrors.NewErrPersistence("set", "OnuGemInfo", IntfID,
1086 log.Fields{"onuGemData": onuGemData, "intfID": IntfID}, err)
1087 }
1088
1089 logger.Debugw(ctx, "added onu to onugeminfo", log.Fields{"intf": IntfID, "onugem": onuGem})
1090 return nil
1091}
1092
1093// AddUniPortToOnuInfo adds uni port to the onuinfo kvstore. check if the uni is already present if not update the kv store.
1094func (RsrcMgr *OpenOltResourceMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNo uint32) {
1095 var onuGemData []OnuGemInfo
1096 var err error
1097
1098 if err = RsrcMgr.ResourceMgrs[intfID].GetOnuGemInfo(ctx, intfID, &onuGemData); err != nil {
1099 logger.Errorf(ctx, "failed to get onuifo for intfid %d", intfID)
1100 return
1101 }
1102 for idx, onu := range onuGemData {
1103 if onu.OnuID == onuID {
1104 for _, uni := range onu.UniPorts {
1105 if uni == portNo {
1106 logger.Debugw(ctx, "uni already present in onugem info", log.Fields{"uni": portNo})
1107 return
1108 }
1109 }
1110 onuGemData[idx].UniPorts = append(onuGemData[idx].UniPorts, portNo)
1111 break
1112 }
1113 }
1114 err = RsrcMgr.ResourceMgrs[intfID].AddOnuGemInfo(ctx, intfID, onuGemData)
1115 if err != nil {
1116 logger.Errorw(ctx, "Failed to add uin port in onugem to kv store", log.Fields{"uni": portNo})
1117 return
1118 }
1119}
1120
1121//UpdateGemPortForPktIn updates gemport for pkt in path to kvstore, path being intfid, onuid, portno, vlan id, priority bit
1122func (RsrcMgr *OpenOltResourceMgr) UpdateGemPortForPktIn(ctx context.Context, pktIn PacketInInfoKey, gemPort uint32) {
1123
1124 path := fmt.Sprintf(OnuPacketINPath, pktIn.IntfID, pktIn.OnuID, pktIn.LogicalPort, pktIn.VlanID, pktIn.Priority)
1125 Value, err := json.Marshal(gemPort)
1126 if err != nil {
1127 logger.Error(ctx, "Failed to marshal data")
1128 return
1129 }
1130 if err = RsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
1131 logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"path": path, "value": gemPort})
1132 return
1133 }
1134 logger.Debugw(ctx, "added gem packet in successfully", log.Fields{"path": path, "gem": gemPort})
1135}
1136
1137// GetGemPortFromOnuPktIn gets the gem port from onu pkt in path, path being intfid, onuid, portno, vlan id, priority bit
1138func (RsrcMgr *OpenOltResourceMgr) GetGemPortFromOnuPktIn(ctx context.Context, packetInInfoKey PacketInInfoKey) (uint32, error) {
1139
1140 var Val []byte
1141 var gemPort uint32
1142
1143 path := fmt.Sprintf(OnuPacketINPath, packetInInfoKey.IntfID, packetInInfoKey.OnuID, packetInInfoKey.LogicalPort,
1144 packetInInfoKey.VlanID, packetInInfoKey.Priority)
1145
1146 value, err := RsrcMgr.KVStore.Get(ctx, path)
1147 if err != nil {
1148 logger.Errorw(ctx, "Failed to get from kv store", log.Fields{"path": path})
1149 return uint32(0), err
1150 } else if value == nil {
1151 logger.Debugw(ctx, "No pkt in gem found", log.Fields{"path": path})
1152 return uint32(0), nil
1153 }
1154
1155 if Val, err = kvstore.ToByte(value.Value); err != nil {
1156 logger.Error(ctx, "Failed to convert to byte array")
1157 return uint32(0), err
1158 }
1159 if err = json.Unmarshal(Val, &gemPort); err != nil {
1160 logger.Error(ctx, "Failed to unmarshall")
1161 return uint32(0), err
1162 }
1163 logger.Debugw(ctx, "found packein gemport from path", log.Fields{"path": path, "gem": gemPort})
1164
1165 return gemPort, nil
1166}
1167
1168//DelGemPortPktInOfAllServices deletes the gemports from pkt in path for all services
1169func (RsrcMgr *OpenOltResourceMgr) DelGemPortPktInOfAllServices(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) error {
1170
1171 Path := fmt.Sprintf(OnuPacketINPathPrefix, intfID, onuID, logicalPort)
1172 logger.Debugf(ctx, "getting flows from the path:%s", Path)
1173 Value, err := RsrcMgr.KVStore.List(ctx, Path)
1174 if err != nil {
1175 logger.Errorf(ctx, "failed to get flows from kvstore for path %s", Path)
1176 return errors.New("failed to get flows from kvstore for path " + Path)
1177 }
1178 logger.Debugf(ctx, "%d flows retrieved from the path:%s", len(Value), Path)
1179
1180 for key := range Value {
1181 if err := RsrcMgr.KVStore.Delete(ctx, key); err != nil {
1182 logger.Errorf(ctx, "Falied to remove resource %s", key)
1183 return err
1184 }
1185 }
1186 return nil
1187}
1188
1189// DelOnuGemInfoForIntf deletes the onugem info from kvstore per interface
1190func (RsrcMgr *OpenOltResourceMgr) DelOnuGemInfoForIntf(ctx context.Context, intfID uint32) error {
1191 if err := RsrcMgr.ResourceMgrs[intfID].DelOnuGemInfoForIntf(ctx, intfID); err != nil {
1192 logger.Errorw(ctx, "failed to delete onu gem info for", log.Fields{"intfid": intfID})
1193 return err
1194 }
1195 return nil
1196}
1197
1198//GetNNIFromKVStore gets NNi intfids from kvstore. path being per device
1199func (RsrcMgr *OpenOltResourceMgr) GetNNIFromKVStore(ctx context.Context) ([]uint32, error) {
1200
1201 var nni []uint32
1202 var Val []byte
1203
1204 path := NnniIntfID
1205 value, err := RsrcMgr.KVStore.Get(ctx, path)
1206 if err != nil {
1207 logger.Error(ctx, "failed to get data from kv store")
1208 return nil, err
1209 }
1210 if value != nil {
1211 if Val, err = kvstore.ToByte(value.Value); err != nil {
1212 logger.Error(ctx, "Failed to convert to byte array")
1213 return nil, err
1214 }
1215 if err = json.Unmarshal(Val, &nni); err != nil {
1216 logger.Error(ctx, "Failed to unmarshall")
1217 return nil, err
1218 }
1219 }
1220 return nni, err
1221}
1222
1223// AddNNIToKVStore adds Nni interfaces to kvstore, path being per device.
1224func (RsrcMgr *OpenOltResourceMgr) AddNNIToKVStore(ctx context.Context, nniIntf uint32) error {
1225 var Value []byte
1226
1227 nni, err := RsrcMgr.GetNNIFromKVStore(ctx)
1228 if err != nil {
1229 logger.Error(ctx, "failed to fetch nni interfaces from kv store")
1230 return err
1231 }
1232
1233 path := NnniIntfID
1234 nni = append(nni, nniIntf)
1235 Value, err = json.Marshal(nni)
1236 if err != nil {
1237 logger.Error(ctx, "Failed to marshal data")
1238 }
1239 if err = RsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
1240 logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"path": path, "value": Value})
1241 return err
1242 }
1243 logger.Debugw(ctx, "added nni to kv successfully", log.Fields{"path": path, "nni": nniIntf})
1244 return nil
1245}
1246
1247// DelNNiFromKVStore deletes nni interface list from kv store.
1248func (RsrcMgr *OpenOltResourceMgr) DelNNiFromKVStore(ctx context.Context) error {
1249
1250 path := NnniIntfID
1251
1252 if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
1253 logger.Errorw(ctx, "Failed to delete nni interfaces from kv store", log.Fields{"path": path})
1254 return err
1255 }
1256 return nil
1257}
1258
1259//UpdateFlowIDsForGem updates flow id per gemport
1260func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDsForGem(ctx context.Context, intf uint32, gem uint32, flowIDs []uint32) error {
1261 var val []byte
1262 path := fmt.Sprintf(FlowIDsForGem, intf)
1263
1264 flowsForGem, err := RsrcMgr.GetFlowIDsGemMapForInterface(ctx, intf)
1265 if err != nil {
1266 logger.Error(ctx, "Failed to ger flowids for interface", log.Fields{"error": err, "intf": intf})
1267 return err
1268 }
1269 if flowsForGem == nil {
1270 flowsForGem = make(map[uint32][]uint32)
1271 }
1272 flowsForGem[gem] = flowIDs
1273 val, err = json.Marshal(flowsForGem)
1274 if err != nil {
1275 logger.Error(ctx, "Failed to marshal data", log.Fields{"error": err})
1276 return err
1277 }
1278
1279 RsrcMgr.flowIDToGemInfoLock.Lock()
1280 defer RsrcMgr.flowIDToGemInfoLock.Unlock()
1281 if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
1282 logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
1283 return err
1284 }
1285 logger.Debugw(ctx, "added flowid list for gem to kv successfully", log.Fields{"path": path, "flowidlist": flowsForGem[gem]})
1286 return nil
1287}
1288
1289//DeleteFlowIDsForGem deletes the flowID list entry per gem from kvstore.
1290func (RsrcMgr *OpenOltResourceMgr) DeleteFlowIDsForGem(ctx context.Context, intf uint32, gem uint32) {
1291 path := fmt.Sprintf(FlowIDsForGem, intf)
1292 var val []byte
1293
1294 flowsForGem, err := RsrcMgr.GetFlowIDsGemMapForInterface(ctx, intf)
1295 if err != nil {
1296 logger.Error(ctx, "Failed to ger flowids for interface", log.Fields{"error": err, "intf": intf})
1297 return
1298 }
1299 if flowsForGem == nil {
1300 logger.Error(ctx, "No flowids found ", log.Fields{"intf": intf, "gemport": gem})
1301 return
1302 }
1303 delete(flowsForGem, gem)
1304 val, err = json.Marshal(flowsForGem)
1305 if err != nil {
1306 logger.Error(ctx, "Failed to marshal data", log.Fields{"error": err})
1307 return
1308 }
1309
1310 RsrcMgr.flowIDToGemInfoLock.Lock()
1311 defer RsrcMgr.flowIDToGemInfoLock.Unlock()
1312 if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
1313 logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
1314 }
1315}
1316
1317//GetFlowIDsGemMapForInterface gets flowids per gemport and interface
1318func (RsrcMgr *OpenOltResourceMgr) GetFlowIDsGemMapForInterface(ctx context.Context, intf uint32) (map[uint32][]uint32, error) {
1319 path := fmt.Sprintf(FlowIDsForGem, intf)
1320 var flowsForGem map[uint32][]uint32
1321 var val []byte
1322 RsrcMgr.flowIDToGemInfoLock.RLock()
1323 value, err := RsrcMgr.KVStore.Get(ctx, path)
1324 RsrcMgr.flowIDToGemInfoLock.RUnlock()
1325 if err != nil {
1326 logger.Error(ctx, "failed to get data from kv store")
1327 return nil, err
1328 }
1329 if value != nil && value.Value != nil {
1330 if val, err = kvstore.ToByte(value.Value); err != nil {
1331 logger.Error(ctx, "Failed to convert to byte array ", log.Fields{"error": err})
1332 return nil, err
1333 }
1334 if err = json.Unmarshal(val, &flowsForGem); err != nil {
1335 logger.Error(ctx, "Failed to unmarshall", log.Fields{"error": err})
1336 return nil, err
1337 }
1338 }
1339 return flowsForGem, nil
1340}
1341
1342//DeleteIntfIDGempMapPath deletes the intf id path used to store flow ids per gem to kvstore.
1343func (RsrcMgr *OpenOltResourceMgr) DeleteIntfIDGempMapPath(ctx context.Context, intf uint32) {
1344 path := fmt.Sprintf(FlowIDsForGem, intf)
1345 RsrcMgr.flowIDToGemInfoLock.Lock()
1346 defer RsrcMgr.flowIDToGemInfoLock.Unlock()
1347 if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
1348 logger.Errorw(ctx, "Failed to delete nni interfaces from kv store", log.Fields{"path": path})
1349 }
1350}
1351
1352// RemoveResourceMap Clear resource map associated with (intfid, onuid, uniid) tuple.
1353func (RsrcMgr *OpenOltResourceMgr) RemoveResourceMap(ctx context.Context, intfID uint32, onuID int32, uniID int32) {
1354 IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
1355 RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(ctx, IntfOnuIDUniID)
1356}
1357
1358//GetMcastQueuePerInterfaceMap gets multicast queue info per pon interface
1359func (RsrcMgr *OpenOltResourceMgr) GetMcastQueuePerInterfaceMap(ctx context.Context) (map[uint32][]uint32, error) {
1360 path := McastQueuesForIntf
1361 var mcastQueueToIntfMap map[uint32][]uint32
1362 var val []byte
1363
1364 kvPair, err := RsrcMgr.KVStore.Get(ctx, path)
1365 if err != nil {
1366 logger.Error(ctx, "failed to get data from kv store")
1367 return nil, err
1368 }
1369 if kvPair != nil && kvPair.Value != nil {
1370 if val, err = kvstore.ToByte(kvPair.Value); err != nil {
1371 logger.Error(ctx, "Failed to convert to byte array ", log.Fields{"error": err})
1372 return nil, err
1373 }
1374 if err = json.Unmarshal(val, &mcastQueueToIntfMap); err != nil {
1375 logger.Error(ctx, "Failed to unmarshall ", log.Fields{"error": err})
1376 return nil, err
1377 }
1378 }
1379 return mcastQueueToIntfMap, nil
1380}
1381
1382//AddMcastQueueForIntf adds multicast queue for pon interface
1383func (RsrcMgr *OpenOltResourceMgr) AddMcastQueueForIntf(ctx context.Context, intf uint32, gem uint32, servicePriority uint32) error {
1384 var val []byte
1385 path := McastQueuesForIntf
1386
1387 mcastQueues, err := RsrcMgr.GetMcastQueuePerInterfaceMap(ctx)
1388 if err != nil {
1389 logger.Errorw(ctx, "Failed to get multicast queue info for interface", log.Fields{"error": err, "intf": intf})
1390 return err
1391 }
1392 if mcastQueues == nil {
1393 mcastQueues = make(map[uint32][]uint32)
1394 }
1395 mcastQueues[intf] = []uint32{gem, servicePriority}
1396 if val, err = json.Marshal(mcastQueues); err != nil {
1397 logger.Errorw(ctx, "Failed to marshal data", log.Fields{"error": err})
1398 return err
1399 }
1400 if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
1401 logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
1402 return err
1403 }
1404 logger.Debugw(ctx, "added multicast queue info to KV store successfully", log.Fields{"path": path, "mcastQueueInfo": mcastQueues[intf], "interfaceId": intf})
1405 return nil
1406}
1407
1408//AddFlowGroupToKVStore adds flow group into KV store
1409func (RsrcMgr *OpenOltResourceMgr) AddFlowGroupToKVStore(ctx context.Context, groupEntry *ofp.OfpGroupEntry, cached bool) error {
1410 var Value []byte
1411 var err error
1412 var path string
1413 if cached {
1414 path = fmt.Sprintf(FlowGroupCached, groupEntry.Desc.GroupId)
1415 } else {
1416 path = fmt.Sprintf(FlowGroup, groupEntry.Desc.GroupId)
1417 }
1418 var outPorts []uint32
1419 for _, ofBucket := range groupEntry.Desc.Buckets {
1420 for _, ofAction := range ofBucket.Actions {
1421 if ofAction.Type == ofp.OfpActionType_OFPAT_OUTPUT {
1422 outPorts = append(outPorts, ofAction.GetOutput().Port)
1423 }
1424 }
1425 }
1426 groupInfo := GroupInfo{
1427 GroupID: groupEntry.Desc.GroupId,
1428 OutPorts: outPorts,
1429 }
1430
1431 Value, err = json.Marshal(groupInfo)
1432
1433 if err != nil {
1434 logger.Error(ctx, "failed to Marshal flow group object")
1435 return err
1436 }
1437
1438 if err = RsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
1439 logger.Errorf(ctx, "Failed to update resource %s", path)
1440 return err
1441 }
1442 return nil
1443}
1444
1445//RemoveFlowGroupFromKVStore removes flow group from KV store
1446func (RsrcMgr *OpenOltResourceMgr) RemoveFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) error {
1447 var path string
1448 if cached {
1449 path = fmt.Sprintf(FlowGroupCached, groupID)
1450 } else {
1451 path = fmt.Sprintf(FlowGroup, groupID)
1452 }
1453 if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
1454 logger.Errorf(ctx, "Failed to remove resource %s due to %s", path, err)
1455 return err
1456 }
1457 return nil
1458}
1459
1460//GetFlowGroupFromKVStore fetches flow group from the KV store. Returns (false, {} error) if any problem occurs during
1461//fetching the data. Returns (true, groupInfo, nil) if the group is fetched successfully.
1462// Returns (false, {}, nil) if the group does not exists in the KV store.
1463func (RsrcMgr *OpenOltResourceMgr) GetFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (bool, GroupInfo, error) {
1464 var groupInfo GroupInfo
1465 var path string
1466 if cached {
1467 path = fmt.Sprintf(FlowGroupCached, groupID)
1468 } else {
1469 path = fmt.Sprintf(FlowGroup, groupID)
1470 }
1471 kvPair, err := RsrcMgr.KVStore.Get(ctx, path)
1472 if err != nil {
1473 return false, groupInfo, err
1474 }
1475 if kvPair != nil && kvPair.Value != nil {
1476 Val, err := kvstore.ToByte(kvPair.Value)
1477 if err != nil {
1478 logger.Errorw(ctx, "Failed to convert flow group into byte array", log.Fields{"error": err})
1479 return false, groupInfo, err
1480 }
1481 if err = json.Unmarshal(Val, &groupInfo); err != nil {
1482 logger.Errorw(ctx, "Failed to unmarshal", log.Fields{"error": err})
1483 return false, groupInfo, err
1484 }
1485 return true, groupInfo, nil
1486 }
1487 return false, groupInfo, nil
1488}