blob: 449afe82d6c871ace9192d449db8820df90deecf [file] [log] [blame]
Takahiro Suzukid7bf8202020-12-17 20:21:59 +09001/*
2 * Copyright 2020-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//Package core provides the utility for olt devices, flows and statistics
18package core
19
20import (
21 "context"
22 "crypto/md5"
23 "encoding/binary"
24 "encoding/hex"
25 "encoding/json"
26 "errors"
27 "fmt"
28 "math/big"
29 "strings"
30 "sync"
31 "time"
32
33 "github.com/opencord/voltha-lib-go/v3/pkg/flows"
34 "github.com/opencord/voltha-lib-go/v3/pkg/log"
35 tp "github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
36 rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
37 "github.com/opencord/voltha-protos/v3/go/common"
38 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
39 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
40 openoltpb2 "github.com/opencord/voltha-protos/v3/go/openolt"
41 tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
42 "github.com/opencord/voltha-protos/v3/go/voltha"
43
44 "github.com/EagleChen/mapmutex"
45 "github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
46 "google.golang.org/grpc/codes"
47 "google.golang.org/grpc/status"
48)
49
50const (
51
52 HsiaFlow = "HSIA_FLOW"
53
54 EapolFlow = "EAPOL_FLOW"
55
56 DhcpFlow = "DHCP_FLOW"
57
58 MulticastFlow = "MULTICAST_FLOW"
59
60 IgmpFlow = "IGMP_FLOW"
61
62 IPProtoDhcp = 17
63
64 IPProtoIgmp = 2
65
66 EapEthType = 0x888e
67 LldpEthType = 0x88cc
68 IPv4EthType = 0x800
69
70 IgmpProto = 2
71
72 ReservedVlan = 4096
73
74 DefaultMgmtVlan = 4091
75
76
77 Upstream = "upstream"
78 Downstream = "downstream"
79 Multicast = "multicast"
80 PacketTagType = "pkt_tag_type"
81 Untagged = "untagged"
82 SingleTag = "single_tag"
83 DoubleTag = "double_tag"
84
85
86 EthType = "eth_type"
87 EthDst = "eth_dst"
88 TPID = "tpid"
89 IPProto = "ip_proto"
90 InPort = "in_port"
91 VlanVid = "vlan_vid"
92 VlanPcp = "vlan_pcp"
93
94 UDPDst = "udp_dst"
95 UDPSrc = "udp_src"
96 Ipv4Dst = "ipv4_dst"
97 Ipv4Src = "ipv4_src"
98 Metadata = "metadata"
99 TunnelID = "tunnel_id"
100 Output = "output"
101 GroupID = "group_id"
102
103 PopVlan = "pop_vlan"
104 PushVlan = "push_vlan"
105 TrapToHost = "trap_to_host"
106 MaxMeterBand = 2
107 VlanPCPMask = 0xFF
108 VlanvIDMask = 0xFFF
109 IntfID = "intfId"
110 OnuID = "onuId"
111 UniID = "uniId"
112 PortNo = "portNo"
113 AllocID = "allocId"
114
115 NoneOnuID = -1
116 NoneUniID = -1
117 NoneGemPortID = -1
118
119 BinaryStringPrefix = "0b"
120 BinaryBit1 = '1'
121
122 maxRetry = 300
123 maxDelay = 100000000
124 baseDelay = 10000000
125 factor = 1.1
126 jitter = 0.2
127)
128
129type gemPortKey struct {
130 intfID uint32
131 gemPort uint32
132}
133
134type pendingFlowDeleteKey struct {
135 intfID uint32
136 onuID uint32
137 uniID uint32
138}
139
140type tpLockKey struct {
141 intfID uint32
142 onuID uint32
143 uniID uint32
144}
145
146type schedQueue struct {
147 direction tp_pb.Direction
148 intfID uint32
149 onuID uint32
150 uniID uint32
151 tpID uint32
152 uniPort uint32
153 tpInst interface{}
154 meterID uint32
155 flowMetadata *voltha.FlowMetadata
156}
157
158type queueInfoBrief struct {
159 gemPortID uint32
160 servicePriority uint32
161}
162
163//OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
164type OpenOltFlowMgr struct {
165 techprofile map[uint32]tp.TechProfileIf
166 deviceHandler *DeviceHandler
167 resourceMgr *rsrcMgr.OpenOltResourceMgr
168 onuIdsLock sync.RWMutex
169 perGemPortLock *mapmutex.Mutex // lock to be used to access the flowsUsedByGemPort map
170 flowsUsedByGemPort map[gemPortKey][]uint32 //gem port id to flow ids
171 packetInGemPort map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
172 onuGemInfo map[uint32][]rsrcMgr.OnuGemInfo //onu, gem and uni info local cache, indexed by IntfId
173 onuGemInfoLock sync.RWMutex
174 pendingFlowDelete sync.Map
175 perUserFlowHandleLock *mapmutex.Mutex
176 interfaceToMcastQueueMap map[uint32]*queueInfoBrief /*pon interface -> multicast queue map. Required to assign GEM to a bucket during group population*/
177}
178
179var autoAddFlow bool = true
180var noticeDownloadRequest bool = false
181var mapCreateScheduler map[string]bool = map[string]bool{}
182var mapAddFlow map[string]bool = map[string]bool{}
183
184func resetFlowMap() {
185 mapCreateScheduler = map[string]bool{}
186 mapAddFlow = map[string]bool{}
187}
188
189//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
190func NewFlowManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltFlowMgr {
191 logger.Infow(ctx, "initializing-flow-manager", log.Fields{"device-id": dh.device.Id})
192 var flowMgr OpenOltFlowMgr
193 var err error
194 var idx uint32
195
196 flowMgr.deviceHandler = dh
197 flowMgr.resourceMgr = rMgr
198 flowMgr.techprofile = make(map[uint32]tp.TechProfileIf)
199 if err = flowMgr.populateTechProfilePerPonPort(ctx); err != nil {
200 logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"error": err})
201 return nil
202 }
203 flowMgr.onuIdsLock = sync.RWMutex{}
204 flowMgr.flowsUsedByGemPort = make(map[gemPortKey][]uint32)
205 flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
206 ponPorts := rMgr.DevInfo.GetPonPorts()
207 flowMgr.onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo, ponPorts)
208 for idx = 0; idx < ponPorts; idx++ {
209 if flowMgr.onuGemInfo[idx], err = rMgr.GetOnuGemInfo(ctx, idx); err != nil {
210 logger.Error(ctx, "failed-to-load-onu-gem-info-cache")
211 }
212 //Load flowID list per gem map per interface from the kvstore.
213 flowMgr.loadFlowIDlistForGem(ctx, idx)
214 }
215 flowMgr.onuGemInfoLock = sync.RWMutex{}
216 flowMgr.pendingFlowDelete = sync.Map{}
217 flowMgr.perUserFlowHandleLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
218 flowMgr.perGemPortLock = mapmutex.NewCustomizedMapMutex(maxRetry, maxDelay, baseDelay, factor, jitter)
219 flowMgr.interfaceToMcastQueueMap = make(map[uint32]*queueInfoBrief)
220 flowMgr.loadInterfaceToMulticastQueueMap(ctx)
221 logger.Info(ctx, "initialization-of-flow-manager-success")
222 return &flowMgr
223}
224
225func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
226 gemPK := gemPortKey{uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId)}
227 if f.perGemPortLock.TryLock(gemPK) {
228 logger.Debugw(ctx, "registering-flow-for-device ",
229 log.Fields{
230 "flow": flowFromCore,
231 "device-id": f.deviceHandler.device.Id})
232 flowIDList, ok := f.flowsUsedByGemPort[gemPK]
233 if !ok {
234 flowIDList = []uint32{deviceFlow.FlowId}
235 }
236 flowIDList = appendUnique(flowIDList, deviceFlow.FlowId)
237 f.flowsUsedByGemPort[gemPK] = flowIDList
238
239 f.perGemPortLock.Unlock(gemPK)
240
241 // update the flowids for a gem to the KVstore
242 return f.resourceMgr.UpdateFlowIDsForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowIDList)
243 }
244 logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
245 log.Fields{
246 "flow-from-core": flowFromCore,
247 "device-id": f.deviceHandler.device.Id,
248 "key": gemPK,
249 })
250 return olterrors.NewErrAdapter("failed-to-acquire-per-gem-port-lock", log.Fields{
251 "flow-from-core": flowFromCore,
252 "device-id": f.deviceHandler.device.Id,
253 "key": gemPK,
254 }, nil)
255}
256
257func (f *OpenOltFlowMgr) divideAndAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
258 classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpID uint32,
259 UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) {
260 var allocID uint32
261 var gemPorts []uint32
262 var TpInst interface{}
263
264 logger.Infow(ctx, "dividing-flow", log.Fields{
265 "device-id": f.deviceHandler.device.Id,
266 "intf-id": intfID,
267 "onu-id": onuID,
268 "uni-id": uniID,
269 "port-no": portNo,
270 "classifier": classifierInfo,
271 "action": actionInfo,
272 "usmeter-iD": UsMeterID,
273 "dsmeter-iD": DsMeterID,
274 "tp-id": TpID})
275 if onuID == 0 {
276 logger.Errorw(ctx, "no-onu-id-for-flow",
277 log.Fields{
278 "port-no": portNo,
279 "classifer": classifierInfo,
280 "action": actionInfo,
281 "device-id": f.deviceHandler.device.Id})
282 return
283 }
284
285 uni := getUniPortPath(f.deviceHandler.device.Id, intfID, int32(onuID), int32(uniID))
286 logger.Debugw(ctx, "uni-port-path", log.Fields{
287 "uni": uni,
288 "device-id": f.deviceHandler.device.Id})
289
290 tpLockMapKey := tpLockKey{intfID, onuID, uniID}
291 if f.perUserFlowHandleLock.TryLock(tpLockMapKey) {
292 logger.Debugw(ctx, "dividing-flow-create-tcont-gem-ports", log.Fields{
293 "device-id": f.deviceHandler.device.Id,
294 "intf-id": intfID,
295 "onu-id": onuID,
296 "uni-id": uniID,
297 "port-no": portNo,
298 "classifier": classifierInfo,
299 "action": actionInfo,
300 "usmeter-id": UsMeterID,
301 "dsmeter-id": DsMeterID,
302 "tp-id": TpID})
303 allocID, gemPorts, TpInst = f.createTcontGemports(ctx, intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
304 if allocID == 0 || gemPorts == nil || TpInst == nil {
305 logger.Error(ctx, "alloc-id-gem-ports-tp-unavailable")
306 f.perUserFlowHandleLock.Unlock(tpLockMapKey)
307 return
308 }
309 args := make(map[string]uint32)
310 args[IntfID] = intfID
311 args[OnuID] = onuID
312 args[UniID] = uniID
313 args[PortNo] = portNo
314 args[AllocID] = allocID
315
316 /* Flows can be added specific to gemport if p-bits are received.
317 * If no pbit mentioned then adding flows for all gemports
318 */
319 f.checkAndAddFlow(ctx, args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
320 f.perUserFlowHandleLock.Unlock(tpLockMapKey)
321 } else {
322 logger.Errorw(ctx, "failed-to-acquire-per-user-flow-handle-lock",
323 log.Fields{
324 "intf-id": intfID,
325 "onu-id": onuID,
326 "uni-id": uniID,
327 "flow-id": flow.Id,
328 "flow-cookie": flow.Cookie,
329 "device-id": f.deviceHandler.device.Id})
330 return
331 }
332}
333
334// CreateSchedulerQueues creates traffic schedulers on the device with the given scheduler configuration and traffic shaping info
335func (f *OpenOltFlowMgr) CreateSchedulerQueues(ctx context.Context, sq schedQueue) error {
336
337 logger.Debugw(ctx, "CreateSchedulerQueues",
338 log.Fields{"dir": sq.direction,
339 "intf-id": sq.intfID,
340 "onu-id": sq.onuID,
341 "uni-id": sq.uniID,
342 "tp-id": sq.tpID,
343 "meter-id": sq.meterID,
344 "tp-inst": sq.tpInst,
345 "flowmetadata": sq.flowMetadata,
346 "device-id": f.deviceHandler.device.Id})
347
348 Direction, err := verifyMeterIDAndGetDirection(sq.meterID, sq.direction)
349 if err != nil {
350 return err
351 }
352
353 /* Lets make a simple assumption that if the meter-id is present on the KV store,
354 * then the scheduler and queues configuration is applied on the OLT device
355 * in the given direction.
356 */
357
358 var SchedCfg *tp_pb.SchedulerConfig
359 KvStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
360 if err != nil {
361 return olterrors.NewErrNotFound("meter",
362 log.Fields{"intf-id": sq.intfID,
363 "onu-id": sq.onuID,
364 "uni-id": sq.uniID,
365 "device-id": f.deviceHandler.device.Id}, err)
366 }
367
368 if KvStoreMeter != nil {
369 if KvStoreMeter.MeterId == sq.meterID {
370 logger.Debugw(ctx, "scheduler-already-created-for-upstream", log.Fields{"device-id": f.deviceHandler.device.Id})
371 return nil
372 }
373 return olterrors.NewErrInvalidValue(log.Fields{
374 "unsupported": "meter-id",
375 "kv-store-meter-id": KvStoreMeter.MeterId,
376 "meter-id-in-flow": sq.meterID,
377 "device-id": f.deviceHandler.device.Id}, nil)
378 }
379
380 logger.Debugw(ctx, "meter-does-not-exist-creating-new",
381 log.Fields{
382 "meter-id": sq.meterID,
383 "direction": Direction,
384 "device-id": f.deviceHandler.device.Id})
385
386 if sq.direction == tp_pb.Direction_UPSTREAM {
387 SchedCfg, err = f.techprofile[sq.intfID].GetUsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
388 } else if sq.direction == tp_pb.Direction_DOWNSTREAM {
389 SchedCfg, err = f.techprofile[sq.intfID].GetDsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
390 }
391
392 if err != nil {
393 return olterrors.NewErrNotFound("scheduler-config",
394 log.Fields{
395 "intf-id": sq.intfID,
396 "direction": sq.direction,
397 "tp-inst": sq.tpInst,
398 "device-id": f.deviceHandler.device.Id}, err)
399 }
400
401 var meterConfig *ofp.OfpMeterConfig
402 if sq.flowMetadata != nil {
403 for _, meter := range sq.flowMetadata.Meters {
404 if sq.meterID == meter.MeterId {
405 meterConfig = meter
406 logger.Debugw(ctx, "found-meter-config-from-flowmetadata",
407 log.Fields{"meterConfig": meterConfig,
408 "device-id": f.deviceHandler.device.Id})
409 break
410 }
411 }
412 } else {
413 logger.Errorw(ctx, "flow-metadata-not-present-in-flow", log.Fields{"device-id": f.deviceHandler.device.Id})
414 }
415 if meterConfig == nil {
416 return olterrors.NewErrNotFound("meterbands", log.Fields{
417 "reason": "Could-not-get-meterbands-from-flowMetadata",
418 "flow-metadata": sq.flowMetadata,
419 "meter-id": sq.meterID,
420 "device-id": f.deviceHandler.device.Id}, nil)
421 } else if len(meterConfig.Bands) < MaxMeterBand {
422 logger.Errorw(ctx, "invalid-number-of-bands-in-meter",
423 log.Fields{"Bands": meterConfig.Bands,
424 "meter-id": sq.meterID,
425 "device-id": f.deviceHandler.device.Id})
426 return olterrors.NewErrInvalidValue(log.Fields{
427 "reason": "Invalid-number-of-bands-in-meter",
428 "meterband-count": len(meterConfig.Bands),
429 "metabands": meterConfig.Bands,
430 "meter-id": sq.meterID,
431 "device-id": f.deviceHandler.device.Id}, nil)
432 }
433 cir := meterConfig.Bands[0].Rate
434 cbs := meterConfig.Bands[0].BurstSize
435 eir := meterConfig.Bands[1].Rate
436 ebs := meterConfig.Bands[1].BurstSize
437 pir := cir + eir
438 pbs := cbs + ebs
439 TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
440
441 TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst.(*tp.TechProfile), SchedCfg, TrafficShaping)}
442 TrafficSched[0].TechProfileId = sq.tpID
443
444 if err := f.pushSchedulerQueuesToDevice(ctx, sq, TrafficShaping, TrafficSched); err != nil {
445 return olterrors.NewErrAdapter("failure-pushing-traffic-scheduler-and-queues-to-device",
446 log.Fields{"intf-id": sq.intfID,
447 "direction": sq.direction,
448 "device-id": f.deviceHandler.device.Id}, err)
449 }
450
451 /* After we successfully applied the scheduler configuration on the OLT device,
452 * store the meter id on the KV store, for further reference.
453 */
454 if err := f.resourceMgr.UpdateMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, meterConfig); err != nil {
455 return olterrors.NewErrAdapter("failed-updating-meter-id",
456 log.Fields{"onu-id": sq.onuID,
457 "meter-id": sq.meterID,
458 "device-id": f.deviceHandler.device.Id}, err)
459 }
460 logger.Infow(ctx, "updated-meter-info-into-kv-store-successfully",
461 log.Fields{"direction": Direction,
462 "Meter": meterConfig,
463 "device-id": f.deviceHandler.device.Id})
464
465 return nil
466}
467
468func (f *OpenOltFlowMgr) pushSchedulerQueuesToDevice(ctx context.Context, sq schedQueue, TrafficShaping *tp_pb.TrafficShapingInfo, TrafficSched []*tp_pb.TrafficScheduler) error {
469 trafficQueues, err := f.techprofile[sq.intfID].GetTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile), sq.direction)
470
471 if err != nil {
472 return olterrors.NewErrAdapter("unable-to-construct-traffic-queue-configuration",
473 log.Fields{"intf-id": sq.intfID,
474 "direction": sq.direction,
475 "device-id": f.deviceHandler.device.Id}, err)
476 }
477
478 logger.Debugw(ctx, "sending-traffic-scheduler-create-to-device",
479 log.Fields{
480 "direction": sq.direction,
481 "TrafficScheds": TrafficSched,
482 "device-id": f.deviceHandler.device.Id})
483 if _, err := f.deviceHandler.Client.CreateTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
484 IntfId: sq.intfID, OnuId: sq.onuID,
485 UniId: sq.uniID, PortNo: sq.uniPort,
486 TrafficScheds: TrafficSched}); err != nil {
487 return olterrors.NewErrAdapter("failed-to-create-traffic-schedulers-in-device", log.Fields{"TrafficScheds": TrafficSched}, err)
488 }
489 logger.Infow(ctx, "successfully-created-traffic-schedulers", log.Fields{
490 "direction": sq.direction,
491 "traffic-queues": trafficQueues,
492 "device-id": f.deviceHandler.device.Id})
493
494 logger.Debugw(ctx, "sending-traffic-queues-create-to-device",
495 log.Fields{"direction": sq.direction,
496 "traffic-queues": trafficQueues,
497 "device-id": f.deviceHandler.device.Id})
498 if _, err := f.deviceHandler.Client.CreateTrafficQueues(ctx,
499 &tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
500 UniId: sq.uniID, PortNo: sq.uniPort,
501 TrafficQueues: trafficQueues,
502 TechProfileId: TrafficSched[0].TechProfileId}); err != nil {
503 return olterrors.NewErrAdapter("failed-to-create-traffic-queues-in-device", log.Fields{"traffic-queues": trafficQueues}, err)
504 }
505 logger.Infow(ctx, "successfully-created-traffic-schedulers", log.Fields{
506 "direction": sq.direction,
507 "traffic-queues": trafficQueues,
508 "device-id": f.deviceHandler.device.Id})
509
510 if sq.direction == tp_pb.Direction_DOWNSTREAM {
511 multicastTrafficQueues := f.techprofile[sq.intfID].GetMulticastTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile))
512 if len(multicastTrafficQueues) > 0 {
513 if _, present := f.interfaceToMcastQueueMap[sq.intfID]; !present {
514 //assumed that there is only one queue per PON for the multicast service
515 //the default queue with multicastQueuePerPonPort.Priority per a pon interface is used for multicast service
516 //just put it in interfaceToMcastQueueMap to use for building group members
517 logger.Debugw(ctx, "multicast-traffic-queues", log.Fields{"device-id": f.deviceHandler.device.Id})
518 multicastQueuePerPonPort := multicastTrafficQueues[0]
519 f.interfaceToMcastQueueMap[sq.intfID] = &queueInfoBrief{
520 gemPortID: multicastQueuePerPonPort.GemportId,
521 servicePriority: multicastQueuePerPonPort.Priority,
522 }
523 //also store the queue info in kv store
524 if err := f.resourceMgr.AddMcastQueueForIntf(ctx, sq.intfID, multicastQueuePerPonPort.GemportId, multicastQueuePerPonPort.Priority); err != nil {
525 logger.Errorw(ctx, "failed-to-add-mcast-queue", log.Fields{"error": err})
526 return err
527 }
528
529 logger.Infow(ctx, "multicast-queues-successfully-updated", log.Fields{"device-id": f.deviceHandler.device.Id})
530 }
531 }
532 }
533 return nil
534}
535
536// RemoveSchedulerQueues removes the traffic schedulers from the device based on the given scheduler configuration and traffic shaping info
537func (f *OpenOltFlowMgr) RemoveSchedulerQueues(ctx context.Context, sq schedQueue) error {
538
539 var Direction string
540 var SchedCfg *tp_pb.SchedulerConfig
541 var err error
542 logger.Infow(ctx, "removing-schedulers-and-queues-in-olt",
543 log.Fields{
544 "direction": sq.direction,
545 "intf-id": sq.intfID,
546 "onu-id": sq.onuID,
547 "uni-id": sq.uniID,
548 "uni-port": sq.uniPort,
549 "device-id": f.deviceHandler.device.Id})
550 if sq.direction == tp_pb.Direction_UPSTREAM {
551 SchedCfg, err = f.techprofile[sq.intfID].GetUsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
552 Direction = "upstream"
553 } else if sq.direction == tp_pb.Direction_DOWNSTREAM {
554 SchedCfg, err = f.techprofile[sq.intfID].GetDsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
555 Direction = "downstream"
556 }
557
558 if err != nil {
559 return olterrors.NewErrNotFound("scheduler-config",
560 log.Fields{
561 "int-id": sq.intfID,
562 "direction": sq.direction,
563 "device-id": f.deviceHandler.device.Id}, err)
564 }
565
566 KVStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
567 if err != nil {
568 return olterrors.NewErrNotFound("meter",
569 log.Fields{
570 "onu-id": sq.onuID,
571 "device-id": f.deviceHandler.device.Id}, err)
572 }
573 if KVStoreMeter == nil {
574 logger.Warnw(ctx, "no-meter-installed-yet",
575 log.Fields{
576 "direction": Direction,
577 "intf-id": sq.intfID,
578 "onu-id": sq.onuID,
579 "uni-id": sq.uniID,
580 "device-id": f.deviceHandler.device.Id})
581 return nil
582 }
583 cir := KVStoreMeter.Bands[0].Rate
584 cbs := KVStoreMeter.Bands[0].BurstSize
585 eir := KVStoreMeter.Bands[1].Rate
586 ebs := KVStoreMeter.Bands[1].BurstSize
587 pir := cir + eir
588 pbs := cbs + ebs
589
590 TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
591
592 TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst.(*tp.TechProfile), SchedCfg, TrafficShaping)}
593 TrafficSched[0].TechProfileId = sq.tpID
594
595 TrafficQueues, err := f.techprofile[sq.intfID].GetTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile), sq.direction)
596 if err != nil {
597 return olterrors.NewErrAdapter("unable-to-construct-traffic-queue-configuration",
598 log.Fields{
599 "intf-id": sq.intfID,
600 "direction": sq.direction,
601 "device-id": f.deviceHandler.device.Id}, err)
602 }
603
604 if _, err = f.deviceHandler.Client.RemoveTrafficQueues(ctx,
605 &tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
606 UniId: sq.uniID, PortNo: sq.uniPort,
607 TrafficQueues: TrafficQueues,
608 TechProfileId: TrafficSched[0].TechProfileId}); err != nil {
609 return olterrors.NewErrAdapter("unable-to-remove-traffic-queues-from-device",
610 log.Fields{
611 "intf-id": sq.intfID,
612 "traffic-queues": TrafficQueues,
613 "device-id": f.deviceHandler.device.Id}, err)
614 }
615 logger.Infow(ctx, "removed-traffic-queues-successfully", log.Fields{"device-id": f.deviceHandler.device.Id})
616 if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
617 IntfId: sq.intfID, OnuId: sq.onuID,
618 UniId: sq.uniID, PortNo: sq.uniPort,
619 TrafficScheds: TrafficSched}); err != nil {
620 return olterrors.NewErrAdapter("unable-to-remove-traffic-schedulers-from-device",
621 log.Fields{
622 "intf-id": sq.intfID,
623 "traffic-schedulers": TrafficSched}, err)
624 }
625
626 logger.Infow(ctx, "removed-traffic-schedulers-successfully", log.Fields{"device-id": f.deviceHandler.device.Id})
627
628 /* After we successfully remove the scheduler configuration on the OLT device,
629 * delete the meter id on the KV store.
630 */
631 err = f.resourceMgr.RemoveMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
632 if err != nil {
633 return olterrors.NewErrAdapter("unable-to-remove-meter",
634 log.Fields{
635 "onu": sq.onuID,
636 "meter": KVStoreMeter.MeterId,
637 "device-id": f.deviceHandler.device.Id}, err)
638 }
639 logger.Infow(ctx, "removed-meter-from-KV-store-successfully",
640 log.Fields{
641 "meter-id": KVStoreMeter.MeterId,
642 "dir": Direction,
643 "device-id": f.deviceHandler.device.Id})
644 return err
645}
646
647// This function allocates tconts and GEM ports for an ONU
648func (f *OpenOltFlowMgr) createTcontGemports(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, uniPort uint32, TpID uint32, UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) (uint32, []uint32, interface{}) {
649 var allocIDs []uint32
650 var allgemPortIDs []uint32
651 var gemPortIDs []uint32
652 tpInstanceExists := false
653 var err error
654
655 allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
656 allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
657
658 tpPath := f.getTPpath(ctx, intfID, uni, TpID)
659
660 logger.Debugw(ctx, "creating-new-tcont-and-gem", log.Fields{
661 "intf-id": intfID,
662 "onu-id": onuID,
663 "uni-id": uniID,
664 "device-id": f.deviceHandler.device.Id,
665 "tp-id": TpID})
666
667 techProfileInstance, _ := f.techprofile[intfID].GetTPInstanceFromKVStore(ctx, TpID, tpPath)
668 if techProfileInstance == nil {
669 logger.Infow(ctx, "tp-instance-not-found--creating-new",
670 log.Fields{
671 "path": tpPath,
672 "device-id": f.deviceHandler.device.Id})
673 techProfileInstance, err = f.techprofile[intfID].CreateTechProfInstance(ctx, TpID, uni, intfID)
674 if err != nil {
675 // This should not happen, something wrong in KV backend transaction
676 logger.Errorw(ctx, "tp-instance-create-failed",
677 log.Fields{
678 "error": err,
679 "tp-id": TpID,
680 "device-id": f.deviceHandler.device.Id})
681 return 0, nil, nil
682 }
683 if err := f.resourceMgr.UpdateTechProfileIDForOnu(ctx, intfID, onuID, uniID, TpID); err != nil {
684 logger.Warnw(ctx, "failed-to-update-tech-profile-id", log.Fields{"error": err})
685 }
686 } else {
687 logger.Debugw(ctx, "tech-profile-instance-already-exist-for-given port-name",
688 log.Fields{
689 "uni": uni,
690 "device-id": f.deviceHandler.device.Id})
691 tpInstanceExists = true
692 }
693
694 switch tpInst := techProfileInstance.(type) {
695 case *tp.TechProfile:
696 if UsMeterID != 0 {
697 sq := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
698 uniPort: uniPort, tpInst: techProfileInstance, meterID: UsMeterID, flowMetadata: flowMetadata}
699 if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
700 logger.Errorw(ctx, "CreateSchedulerQueues-failed-upstream",
701 log.Fields{
702 "error": err,
703 "meter-id": UsMeterID,
704 "device-id": f.deviceHandler.device.Id})
705 return 0, nil, nil
706 }
707 }
708 if DsMeterID != 0 {
709 sq := schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
710 uniPort: uniPort, tpInst: techProfileInstance, meterID: DsMeterID, flowMetadata: flowMetadata}
711 if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
712 logger.Errorw(ctx, "CreateSchedulerQueues-failed-downstream",
713 log.Fields{
714 "error": err,
715 "meter-id": DsMeterID,
716 "device-id": f.deviceHandler.device.Id})
717 return 0, nil, nil
718 }
719 }
720 allocID := tpInst.UsScheduler.AllocID
721 for _, gem := range tpInst.UpstreamGemPortAttributeList {
722 gemPortIDs = append(gemPortIDs, gem.GemportID)
723 }
724 allocIDs = appendUnique(allocIDs, allocID)
725
726 if tpInstanceExists {
727 return allocID, gemPortIDs, techProfileInstance
728 }
729
730 for _, gemPortID := range gemPortIDs {
731 allgemPortIDs = appendUnique(allgemPortIDs, gemPortID)
732 }
733 logger.Infow(ctx, "allocated-tcont-and-gem-ports",
734 log.Fields{
735 "alloc-ids": allocIDs,
736 "gemports": allgemPortIDs,
737 "device-id": f.deviceHandler.device.Id})
738 // Send Tconts and GEM ports to KV store
739 f.storeTcontsGEMPortsIntoKVStore(ctx, intfID, onuID, uniID, allocIDs, allgemPortIDs)
740 return allocID, gemPortIDs, techProfileInstance
741 case *tp.EponProfile:
742 // CreateSchedulerQueues for EPON needs to be implemented here
743 // when voltha-protos for EPON is completed.
744
745 // TODO
746 if UsMeterID != 0 {
747 go func() {
748 sq := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
749 uniPort: uniPort, tpInst: techProfileInstance, meterID: UsMeterID, flowMetadata: flowMetadata}
750 if err := f.L2oamCreateSchedulerQueues(ctx, sq); err != nil {
751 logger.Errorw(ctx, "CreateSchedulerQueues-failed-upstream",
752 log.Fields{
753 "error": err,
754 "meter-id": UsMeterID,
755 "device-id": f.deviceHandler.device.Id})
756 //return 0, nil, nil
757 }
758 }()
759 }
760 /*
761 if DsMeterID != 0 {
762 sq := schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
763 uniPort: uniPort, tpInst: techProfileInstance, meterID: DsMeterID, flowMetadata: flowMetadata}
764 if err := f.L2oamCreateSchedulerQueues(ctx, sq); err != nil {
765 logger.Errorw(ctx, "CreateSchedulerQueues-failed-downstream",
766 log.Fields{
767 "error": err,
768 "meter-id": DsMeterID,
769 "device-id": f.deviceHandler.device.Id})
770 return 0, nil, nil
771 }
772 }
773 */
774
775 allocID := tpInst.AllocID
776 for _, gem := range tpInst.UpstreamQueueAttributeList {
777 gemPortIDs = append(gemPortIDs, gem.GemportID)
778 }
779 allocIDs = appendUnique(allocIDs, allocID)
780
781 if tpInstanceExists {
782 return allocID, gemPortIDs, techProfileInstance
783 }
784
785 for _, gemPortID := range gemPortIDs {
786 allgemPortIDs = appendUnique(allgemPortIDs, gemPortID)
787 }
788 logger.Infow(ctx, "allocated-tcont-and-gem-ports",
789 log.Fields{
790 "alloc-ids": allocIDs,
791 "gemports": allgemPortIDs,
792 "device-id": f.deviceHandler.device.Id})
793 // Send Tconts and GEM ports to KV store
794 f.storeTcontsGEMPortsIntoKVStore(ctx, intfID, onuID, uniID, allocIDs, allgemPortIDs)
795 return allocID, gemPortIDs, techProfileInstance
796 default:
797 logger.Errorw(ctx, "unknown-tech",
798 log.Fields{
799 "tpInst": tpInst})
800 return 0, nil, nil
801 }
802}
803
804func (f *OpenOltFlowMgr) storeTcontsGEMPortsIntoKVStore(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, allocID []uint32, gemPortIDs []uint32) {
805
806 logger.Debugw(ctx, "storing-allocated-tconts-and-gem-ports-into-KV-store",
807 log.Fields{
808 "intf-id": intfID,
809 "onu-id": onuID,
810 "uni-id": uniID,
811 "alloc-id": allocID,
812 "gemport-ids": gemPortIDs,
813 "device-id": f.deviceHandler.device.Id})
814 /* Update the allocated alloc_id and gem_port_id for the ONU/UNI to KV store */
815 if err := f.resourceMgr.UpdateAllocIdsForOnu(ctx, intfID, onuID, uniID, allocID); err != nil {
816 logger.Errorw(ctx, "error-while-uploading-allocid-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id})
817 }
818 if err := f.resourceMgr.UpdateGEMPortIDsForOnu(ctx, intfID, onuID, uniID, gemPortIDs); err != nil {
819 logger.Errorw(ctx, "error-while-uploading-gemports-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id})
820 }
821 if err := f.resourceMgr.UpdateGEMportsPonportToOnuMapOnKVStore(ctx, gemPortIDs, intfID, onuID, uniID); err != nil {
822 logger.Error(ctx, "error-while-uploading-gemtopon-map-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id})
823 }
824 logger.Infow(ctx, "stored-tconts-and-gem-into-kv-store-successfully", log.Fields{"device-id": f.deviceHandler.device.Id})
825 for _, gemPort := range gemPortIDs {
826 f.addGemPortToOnuInfoMap(ctx, intfID, onuID, gemPort)
827 }
828}
829
830func (f *OpenOltFlowMgr) populateTechProfilePerPonPort(ctx context.Context) error {
831 var tpCount int
832 for _, techRange := range f.resourceMgr.DevInfo.Ranges {
833 for _, intfID := range techRange.IntfIds {
834 f.techprofile[intfID] = f.resourceMgr.ResourceMgrs[uint32(intfID)].TechProfileMgr
835 tpCount++
836 logger.Debugw(ctx, "init-tech-profile-done",
837 log.Fields{
838 "intf-id": intfID,
839 "device-id": f.deviceHandler.device.Id})
840 }
841 }
842 if tpCount != int(f.resourceMgr.DevInfo.GetPonPorts()) {
843 return olterrors.NewErrInvalidValue(log.Fields{
844 "reason": "tP-count-does-not-match-number-of-pon-ports",
845 "tech-profile-count": tpCount,
846 "pon-port-count": f.resourceMgr.DevInfo.GetPonPorts(),
847 "device-id": f.deviceHandler.device.Id}, nil)
848 }
849 logger.Infow(ctx, "populated-techprofile-for-ponports-successfully",
850 log.Fields{
851 "numofTech": tpCount,
852 "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts(),
853 "device-id": f.deviceHandler.device.Id})
854 return nil
855}
856
857func (f *OpenOltFlowMgr) addUpstreamDataFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32,
858 portNo uint32, uplinkClassifier map[string]interface{},
859 uplinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
860 allocID uint32, gemportID uint32, tpID uint32) error {
861 uplinkClassifier[PacketTagType] = SingleTag
862 logger.Debugw(ctx, "adding-upstream-data-flow",
863 log.Fields{
864 "uplinkClassifier": uplinkClassifier,
865 "uplinkAction": uplinkAction})
866 return f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, uplinkClassifier, uplinkAction,
867 Upstream, logicalFlow, allocID, gemportID, tpID)
868 /* TODO: Install Secondary EAP on the subscriber vlan */
869}
870
871func (f *OpenOltFlowMgr) addDownstreamDataFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32,
872 portNo uint32, downlinkClassifier map[string]interface{},
873 downlinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
874 allocID uint32, gemportID uint32, tpID uint32) error {
875 downlinkClassifier[PacketTagType] = DoubleTag
876 logger.Debugw(ctx, "adding-downstream-data-flow",
877 log.Fields{
878 "downlinkClassifier": downlinkClassifier,
879 "downlinkAction": downlinkAction})
880 if vlan, exists := downlinkClassifier[VlanVid]; exists {
881 if vlan.(uint32) == (uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000) { //private VLAN given by core
882 if metadata, exists := downlinkClassifier[Metadata]; exists { // inport is filled in metadata by core
883 if uint32(metadata.(uint64)) == MkUniPortNum(ctx, intfID, onuID, uniID) {
884 logger.Infow(ctx, "ignoring-dl-trap-device-flow-from-core",
885 log.Fields{
886 "flow": logicalFlow,
887 "device-id": f.deviceHandler.device.Id,
888 "onu-id": onuID,
889 "intf-id": intfID})
890 return nil
891 }
892 }
893 }
894 }
895
896 /* Already this info available classifier? */
897 downlinkAction[PopVlan] = true
898 dlClVid, ok := downlinkClassifier[VlanVid].(uint32)
899 if ok {
900 downlinkAction[VlanVid] = dlClVid & 0xfff
901 } else {
902 return olterrors.NewErrInvalidValue(log.Fields{
903 "reason": "failed-to-convert-vlanid-classifier",
904 "vlan-id": VlanVid,
905 "device-id": f.deviceHandler.device.Id}, nil).Log()
906 }
907
908 return f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, downlinkClassifier, downlinkAction,
909 Downstream, logicalFlow, allocID, gemportID, tpID)
910}
911
912func (f *OpenOltFlowMgr) addHSIAFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
913 action map[string]interface{}, direction string, logicalFlow *ofp.OfpFlowStats,
914 allocID uint32, gemPortID uint32, tpID uint32) error {
915 /* One of the OLT platform (Broadcom BAL) requires that symmetric
916 flows require the same flow_id to be used across UL and DL.
917 Since HSIA flow is the only symmetric flow currently, we need to
918 re-use the flow_id across both direction. The 'flow_category'
919 takes priority over flow_cookie to find any available HSIA_FLOW
920 id for the ONU.
921 */
922 logger.Infow(ctx, "adding-hsia-flow",
923 log.Fields{
924 "intf-id": intfID,
925 "onu-id": onuID,
926 "uni-id": uniID,
927 "device-id": f.deviceHandler.device.Id,
928 "classifier": classifier,
929 "action": action,
930 "direction": direction,
931 "alloc-id": allocID,
932 "gemport-id": gemPortID,
933 "logicalflow": *logicalFlow})
934 var vlanPbit uint32 = 0xff // means no pbit
935 var vlanVid uint32
936 if _, ok := classifier[VlanPcp]; ok {
937 vlanPbit = classifier[VlanPcp].(uint32)
938 logger.Debugw(ctx, "found-pbit-in-flow",
939 log.Fields{
940 "vlan-pbit": vlanPbit,
941 "intf-id": intfID,
942 "onu-id": onuID,
943 "device-id": f.deviceHandler.device.Id})
944 } else {
945 logger.Debugw(ctx, "pbit-not-found-in-flow",
946 log.Fields{
947 "vlan-pcp": VlanPcp,
948 "intf-id": intfID,
949 "onu-id": onuID,
950 "device-id": f.deviceHandler.device.Id})
951 }
952 if _, ok := classifier[VlanVid]; ok {
953 vlanVid = classifier[VlanVid].(uint32)
954 logger.Debugw(ctx, "found-vlan-in-the-flow",
955 log.Fields{
956 "vlan-vid": vlanVid,
957 "intf-id": intfID,
958 "onu-id": onuID,
959 "device-id": f.deviceHandler.device.Id})
960 }
961 flowStoreCookie := getFlowStoreCookie(ctx, classifier, gemPortID)
962 if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
963 logger.Infow(ctx, "flow-already-exists",
964 log.Fields{
965 "device-id": f.deviceHandler.device.Id,
966 "intf-id": intfID,
967 "onu-id": onuID})
968 return nil
969 }
970 flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, HsiaFlow, vlanVid, vlanPbit)
971 if err != nil {
972 return olterrors.NewErrNotFound("hsia-flow-id",
973 log.Fields{
974 "direction": direction,
975 "device-id": f.deviceHandler.device.Id,
976 "intf-id": intfID,
977 "onu-id": onuID,
978 }, err).Log()
979 }
980 classifierProto, err := makeOpenOltClassifierField(classifier)
981 if err != nil {
982 return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifier, "device-id": f.deviceHandler.device.Id}, err).Log()
983 }
984 logger.Debugw(ctx, "created-classifier-proto",
985 log.Fields{
986 "classifier": *classifierProto,
987 "device-id": f.deviceHandler.device.Id})
988 actionProto, err := makeOpenOltActionField(action, classifier)
989 if err != nil {
990 return olterrors.NewErrInvalidValue(log.Fields{"action": action, "device-id": f.deviceHandler.device.Id}, err).Log()
991 }
992 logger.Debugw(ctx, "created-action-proto",
993 log.Fields{
994 "action": *actionProto,
995 "device-id": f.deviceHandler.device.Id})
996 networkIntfID, err := getNniIntfID(ctx, classifier, action)
997 if err != nil {
998 return olterrors.NewErrNotFound("nni-interface-id",
999 log.Fields{
1000 "classifier": classifier,
1001 "action": action,
1002 "device-id": f.deviceHandler.device.Id,
1003 }, err).Log()
1004 }
1005 flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
1006 OnuId: int32(onuID),
1007 UniId: int32(uniID),
1008 FlowId: flowID,
1009 FlowType: direction,
1010 AllocId: int32(allocID),
1011 NetworkIntfId: int32(networkIntfID),
1012 GemportId: int32(gemPortID),
1013 Classifier: classifierProto,
1014 Action: actionProto,
1015 Priority: int32(logicalFlow.Priority),
1016 Cookie: logicalFlow.Cookie,
1017 PortNo: portNo,
1018 TechProfileId: tpID,
1019 }
1020 if err := f.addFlowToDevice(ctx, logicalFlow, &flow); err != nil {
1021 return olterrors.NewErrFlowOp("add", flowID, nil, err).Log()
1022 }
1023 logger.Infow(ctx, "hsia-flow-added-to-device-successfully",
1024 log.Fields{"direction": direction,
1025 "device-id": f.deviceHandler.device.Id,
1026 "flow": flow,
1027 "intf-id": intfID,
1028 "onu-id": onuID})
1029 flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, HsiaFlow, flowID, logicalFlow.Id)
1030 if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
1031 flow.OnuId,
1032 flow.UniId,
1033 flow.FlowId /*flowCategory,*/, flowsToKVStore); err != nil {
1034 return olterrors.NewErrPersistence("update", "flow", flowID,
1035 log.Fields{
1036 "flow": flow,
1037 "device-id": f.deviceHandler.device.Id,
1038 "intf-id": intfID,
1039 "onu-id": onuID}, err).Log()
1040 }
1041
1042 go f.l2oamAddFlowDevice(ctx, intfID, onuID, classifierProto.OVid, classifierProto.IVid)
1043
1044 return nil
1045}
1046
1047func (f *OpenOltFlowMgr) addDHCPTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
1048 classifier map[string]interface{}, action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32,
1049 gemPortID uint32, tpID uint32) error {
1050
1051 networkIntfID, err := getNniIntfID(ctx, classifier, action)
1052 if err != nil {
1053 return olterrors.NewErrNotFound("nni-interface-id", log.Fields{
1054 "classifier": classifier,
1055 "action": action,
1056 "device-id": f.deviceHandler.device.Id},
1057 err).Log()
1058 }
1059
1060 for k := range action {
1061 delete(action, k)
1062 }
1063
1064 action[TrapToHost] = true
1065 classifier[UDPSrc] = uint32(68)
1066 classifier[UDPDst] = uint32(67)
1067 classifier[PacketTagType] = SingleTag
1068
1069 flowStoreCookie := getFlowStoreCookie(ctx, classifier, gemPortID)
1070 if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
1071 logger.Infow(ctx, "flow-exists--not-re-adding",
1072 log.Fields{
1073 "device-id": f.deviceHandler.device.Id,
1074 "intf-id": intfID,
1075 "onu-id": onuID})
1076 return nil
1077 }
1078
1079 flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, DhcpFlow, 0 /*classifier[VLAN_PCP].(uint32)*/)
1080
1081 if err != nil {
1082 return olterrors.NewErrNotFound("flow",
1083 log.Fields{
1084 "interface-id": intfID,
1085 "gem-port": gemPortID,
1086 "cookie": flowStoreCookie,
1087 "device-id": f.deviceHandler.device.Id},
1088 err).Log()
1089 }
1090
1091 logger.Debugw(ctx, "creating-ul-dhcp-flow",
1092 log.Fields{
1093 "ul_classifier": classifier,
1094 "ul_action": action,
1095 "uplinkFlowId": flowID,
1096 "intf-id": intfID,
1097 "onu-id": onuID,
1098 "device-id": f.deviceHandler.device.Id})
1099
1100 classifierProto, err := makeOpenOltClassifierField(classifier)
1101 if err != nil {
1102 return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifier}, err).Log()
1103 }
1104 logger.Debugw(ctx, "created-classifier-proto", log.Fields{"classifier": *classifierProto})
1105 actionProto, err := makeOpenOltActionField(action, classifier)
1106 if err != nil {
1107 return olterrors.NewErrInvalidValue(log.Fields{"action": action}, err).Log()
1108 }
1109
1110 dhcpFlow := openoltpb2.Flow{AccessIntfId: int32(intfID),
1111 OnuId: int32(onuID),
1112 UniId: int32(uniID),
1113 FlowId: flowID,
1114 FlowType: Upstream,
1115 AllocId: int32(allocID),
1116 NetworkIntfId: int32(networkIntfID),
1117 GemportId: int32(gemPortID),
1118 Classifier: classifierProto,
1119 Action: actionProto,
1120 Priority: int32(logicalFlow.Priority),
1121 Cookie: logicalFlow.Cookie,
1122 PortNo: portNo,
1123 TechProfileId: tpID,
1124 }
1125 if err := f.addFlowToDevice(ctx, logicalFlow, &dhcpFlow); err != nil {
1126 return olterrors.NewErrFlowOp("add", flowID, log.Fields{"dhcp-flow": dhcpFlow}, err).Log()
1127 }
1128 logger.Infow(ctx, "dhcp-ul-flow-added-to-device-successfully",
1129 log.Fields{
1130 "device-id": f.deviceHandler.device.Id,
1131 "flow-id": flowID,
1132 "intf-id": intfID,
1133 "onu-id": onuID})
1134 flowsToKVStore := f.getUpdatedFlowInfo(ctx, &dhcpFlow, flowStoreCookie, "DHCP", flowID, logicalFlow.Id)
1135 if err := f.updateFlowInfoToKVStore(ctx, dhcpFlow.AccessIntfId,
1136 dhcpFlow.OnuId,
1137 dhcpFlow.UniId,
1138 dhcpFlow.FlowId, flowsToKVStore); err != nil {
1139 return olterrors.NewErrPersistence("update", "flow", dhcpFlow.FlowId,
1140 log.Fields{
1141 "flow": dhcpFlow,
1142 "device-id": f.deviceHandler.device.Id}, err).Log()
1143 }
1144
1145 return nil
1146}
1147
1148//addIGMPTrapFlow creates IGMP trap-to-host flow
1149func (f *OpenOltFlowMgr) addIGMPTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
1150 action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, tpID uint32) error {
1151 return f.addUpstreamTrapFlow(ctx, intfID, onuID, uniID, portNo, classifier, action, logicalFlow, allocID, gemPortID, IgmpFlow, tpID)
1152}
1153
1154//addUpstreamTrapFlow creates a trap-to-host flow
1155func (f *OpenOltFlowMgr) addUpstreamTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
1156 action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, flowType string, tpID uint32) error {
1157
1158 networkIntfID, err := getNniIntfID(ctx, classifier, action)
1159 if err != nil {
1160 return olterrors.NewErrNotFound("nni-interface-id",
1161 log.Fields{
1162 "classifier": classifier,
1163 "action": action,
1164 "device-id": f.deviceHandler.device.Id},
1165 err).Log()
1166 }
1167
1168 for k := range action {
1169 delete(action, k)
1170 }
1171
1172 action[TrapToHost] = true
1173 classifier[PacketTagType] = SingleTag
1174 delete(classifier, VlanVid)
1175
1176 flowStoreCookie := getFlowStoreCookie(ctx, classifier, gemPortID)
1177 if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkIntfID), int32(onuID), int32(uniID), flowStoreCookie); present {
1178 logger.Infow(ctx, "flow-exists-not-re-adding", log.Fields{"device-id": f.deviceHandler.device.Id})
1179 return nil
1180 }
1181
1182 flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, flowType, 0, 0 /*classifier[VLAN_PCP].(uint32)*/)
1183
1184 if err != nil {
1185 return olterrors.NewErrNotFound("flow-id",
1186 log.Fields{
1187 "intf-id": intfID,
1188 "oni-id": onuID,
1189 "cookie": flowStoreCookie,
1190 "flow-type": flowType,
1191 "device-id": f.deviceHandler.device.Id,
1192 "onu-id": onuID},
1193 err).Log()
1194 }
1195
1196 logger.Debugw(ctx, "creating-upstream-trap-flow",
1197 log.Fields{
1198 "ul_classifier": classifier,
1199 "ul_action": action,
1200 "uplinkFlowId": flowID,
1201 "flowType": flowType,
1202 "device-id": f.deviceHandler.device.Id,
1203 "intf-id": intfID,
1204 "onu-id": onuID})
1205
1206 classifierProto, err := makeOpenOltClassifierField(classifier)
1207 if err != nil {
1208 return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifier, "device-id": f.deviceHandler.device.Id}, err).Log()
1209 }
1210 logger.Debugw(ctx, "created-classifier-proto",
1211 log.Fields{
1212 "classifier": *classifierProto,
1213 "device-id": f.deviceHandler.device.Id})
1214 actionProto, err := makeOpenOltActionField(action, classifier)
1215 if err != nil {
1216 return olterrors.NewErrInvalidValue(log.Fields{"action": action, "device-id": f.deviceHandler.device.Id}, err).Log()
1217 }
1218
1219 flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
1220 OnuId: int32(onuID),
1221 UniId: int32(uniID),
1222 FlowId: flowID,
1223 FlowType: Upstream,
1224 AllocId: int32(allocID),
1225 NetworkIntfId: int32(networkIntfID),
1226 GemportId: int32(gemPortID),
1227 Classifier: classifierProto,
1228 Action: actionProto,
1229 Priority: int32(logicalFlow.Priority),
1230 Cookie: logicalFlow.Cookie,
1231 PortNo: portNo,
1232 TechProfileId: tpID,
1233 }
1234
1235 if err := f.addFlowToDevice(ctx, logicalFlow, &flow); err != nil {
1236 return olterrors.NewErrFlowOp("add", flowID, log.Fields{"flow": flow, "device-id": f.deviceHandler.device.Id}, err).Log()
1237 }
1238 logger.Infof(ctx, "%s ul-flow-added-to-device-successfully", flowType)
1239
1240 flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, flowType, flowID, logicalFlow.Id)
1241 if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
1242 flow.OnuId,
1243 flow.UniId,
1244 flow.FlowId, flowsToKVStore); err != nil {
1245 return olterrors.NewErrPersistence("update", "flow", flow.FlowId, log.Fields{"flow": flow, "device-id": f.deviceHandler.device.Id}, err).Log()
1246 }
1247
1248 return nil
1249}
1250
1251// Add EAPOL flow to device with mac, vlanId as classifier for upstream and downstream
1252func (f *OpenOltFlowMgr) addEAPOLFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
1253 classifier map[string]interface{}, action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32,
1254 gemPortID uint32, vlanID uint32, tpID uint32) error {
1255 logger.Infow(ctx, "adding-eapol-to-device",
1256 log.Fields{
1257 "intf-id": intfID,
1258 "onu-id": onuID,
1259 "port-no": portNo,
1260 "alloc-id": allocID,
1261 "gemport-id": gemPortID,
1262 "vlan-id": vlanID,
1263 "flow": logicalFlow})
1264
1265 uplinkClassifier := make(map[string]interface{})
1266 uplinkAction := make(map[string]interface{})
1267
1268 uplinkClassifier[EthType] = uint32(EapEthType)
1269 uplinkClassifier[PacketTagType] = SingleTag
1270 uplinkClassifier[VlanVid] = vlanID
1271 uplinkClassifier[VlanPcp] = classifier[VlanPcp]
1272 uplinkAction[TrapToHost] = true
1273 flowStoreCookie := getFlowStoreCookie(ctx, uplinkClassifier, gemPortID)
1274 if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
1275 logger.Infow(ctx, "flow-exists-not-re-adding", log.Fields{
1276 "device-id": f.deviceHandler.device.Id,
1277 "onu-id": onuID,
1278 "intf-id": intfID})
1279 return nil
1280 }
1281 uplinkFlowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0, 0)
1282 if err != nil {
1283 return olterrors.NewErrNotFound("flow-id",
1284 log.Fields{
1285 "intf-id": intfID,
1286 "onu-id": onuID,
1287 "coookie": flowStoreCookie,
1288 "device-id": f.deviceHandler.device.Id},
1289 err).Log()
1290 }
1291 logger.Debugw(ctx, "creating-ul-eapol-flow",
1292 log.Fields{
1293 "ul_classifier": uplinkClassifier,
1294 "ul_action": uplinkAction,
1295 "uplinkFlowId": uplinkFlowID,
1296 "device-id": f.deviceHandler.device.Id,
1297 "intf-id": intfID,
1298 "onu-id": onuID})
1299
1300 classifierProto, err := makeOpenOltClassifierField(uplinkClassifier)
1301 if err != nil {
1302 return olterrors.NewErrInvalidValue(log.Fields{
1303 "classifier": uplinkClassifier,
1304 "device-id": f.deviceHandler.device.Id}, err).Log()
1305 }
1306 logger.Debugw(ctx, "created-classifier-proto",
1307 log.Fields{
1308 "classifier": *classifierProto,
1309 "device-id": f.deviceHandler.device.Id})
1310 actionProto, err := makeOpenOltActionField(uplinkAction, uplinkClassifier)
1311 if err != nil {
1312 return olterrors.NewErrInvalidValue(log.Fields{"action": uplinkAction, "device-id": f.deviceHandler.device.Id}, err).Log()
1313 }
1314 logger.Debugw(ctx, "created-action-proto",
1315 log.Fields{
1316 "action": *actionProto,
1317 "device-id": f.deviceHandler.device.Id})
1318 networkIntfID, err := getNniIntfID(ctx, classifier, action)
1319 if err != nil {
1320 return olterrors.NewErrNotFound("nni-interface-id", log.Fields{
1321 "classifier": classifier,
1322 "action": action,
1323 "device-id": f.deviceHandler.device.Id},
1324 err).Log()
1325 }
1326
1327 upstreamFlow := openoltpb2.Flow{AccessIntfId: int32(intfID),
1328 OnuId: int32(onuID),
1329 UniId: int32(uniID),
1330 FlowId: uplinkFlowID,
1331 FlowType: Upstream,
1332 AllocId: int32(allocID),
1333 NetworkIntfId: int32(networkIntfID),
1334 GemportId: int32(gemPortID),
1335 Classifier: classifierProto,
1336 Action: actionProto,
1337 Priority: int32(logicalFlow.Priority),
1338 Cookie: logicalFlow.Cookie,
1339 PortNo: portNo,
1340 TechProfileId: tpID,
1341 }
1342 if err := f.addFlowToDevice(ctx, logicalFlow, &upstreamFlow); err != nil {
1343 return olterrors.NewErrFlowOp("add", uplinkFlowID, log.Fields{"flow": upstreamFlow}, err).Log()
1344 }
1345 logger.Infow(ctx, "eapol-ul-flow-added-to-device-successfully",
1346 log.Fields{
1347 "device-id": f.deviceHandler.device.Id,
1348 "onu-id": onuID,
1349 "intf-id": intfID,
1350 })
1351 flowCategory := "EAPOL"
1352 flowsToKVStore := f.getUpdatedFlowInfo(ctx, &upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowID, logicalFlow.Id)
1353 if err := f.updateFlowInfoToKVStore(ctx, upstreamFlow.AccessIntfId,
1354 upstreamFlow.OnuId,
1355 upstreamFlow.UniId,
1356 upstreamFlow.FlowId,
1357 /* lowCategory, */
1358 flowsToKVStore); err != nil {
1359 return olterrors.NewErrPersistence("update", "flow", upstreamFlow.FlowId,
1360 log.Fields{
1361 "flow": upstreamFlow,
1362 "device-id": f.deviceHandler.device.Id}, err).Log()
1363 }
1364 return nil
1365}
1366
1367func makeOpenOltClassifierField(classifierInfo map[string]interface{}) (*openoltpb2.Classifier, error) {
1368 var classifier openoltpb2.Classifier
1369
1370 classifier.EthType, _ = classifierInfo[EthType].(uint32)
1371 classifier.IpProto, _ = classifierInfo[IPProto].(uint32)
1372 if vlanID, ok := classifierInfo[VlanVid].(uint32); ok {
1373 if vlanID != ReservedVlan {
1374 vid := vlanID & VlanvIDMask
1375 classifier.OVid = vid
1376 }
1377 }
1378 if metadata, ok := classifierInfo[Metadata].(uint64); ok {
1379 vid := uint32(metadata)
1380 if vid != ReservedVlan {
1381 classifier.IVid = vid
1382 }
1383 }
1384 if vlanPcp, ok := classifierInfo[VlanPcp].(uint32); ok {
1385 classifier.OPbits = vlanPcp
1386 } else {
1387 classifier.OPbits = VlanPCPMask
1388 }
1389 classifier.SrcPort, _ = classifierInfo[UDPSrc].(uint32)
1390 classifier.DstPort, _ = classifierInfo[UDPDst].(uint32)
1391 classifier.DstIp, _ = classifierInfo[Ipv4Dst].(uint32)
1392 classifier.SrcIp, _ = classifierInfo[Ipv4Src].(uint32)
1393 classifier.DstMac, _ = classifierInfo[EthDst].([]uint8)
1394 if pktTagType, ok := classifierInfo[PacketTagType].(string); ok {
1395 classifier.PktTagType = pktTagType
1396
1397 switch pktTagType {
1398 case SingleTag:
1399 case DoubleTag:
1400 case Untagged:
1401 default:
1402 return nil, olterrors.NewErrInvalidValue(log.Fields{"packet-tag-type": pktTagType}, nil)
1403 }
1404 }
1405 return &classifier, nil
1406}
1407
1408func makeOpenOltActionField(actionInfo map[string]interface{}, classifierInfo map[string]interface{}) (*openoltpb2.Action, error) {
1409 var actionCmd openoltpb2.ActionCmd
1410 var action openoltpb2.Action
1411 action.Cmd = &actionCmd
1412 if _, ok := actionInfo[PopVlan]; ok {
1413 action.Cmd.RemoveOuterTag = true
1414 if _, ok := actionInfo[VlanPcp]; ok {
1415 action.Cmd.RemarkInnerPbits = true
1416 action.IPbits = actionInfo[VlanPcp].(uint32)
1417 if _, ok := actionInfo[VlanVid]; ok {
1418 action.Cmd.TranslateInnerTag = true
1419 action.IVid = actionInfo[VlanVid].(uint32)
1420 }
1421 }
1422 } else if _, ok := actionInfo[PushVlan]; ok {
1423 action.OVid = actionInfo[VlanVid].(uint32)
1424 action.Cmd.AddOuterTag = true
1425 if _, ok := actionInfo[VlanPcp]; ok {
1426 action.OPbits = actionInfo[VlanPcp].(uint32)
1427 action.Cmd.RemarkOuterPbits = true
1428 if _, ok := classifierInfo[VlanVid]; ok {
1429 action.IVid = classifierInfo[VlanVid].(uint32)
1430 action.Cmd.TranslateInnerTag = true
1431 }
1432 }
1433 } else if _, ok := actionInfo[TrapToHost]; ok {
1434 action.Cmd.TrapToHost = actionInfo[TrapToHost].(bool)
1435 } else {
1436 return nil, olterrors.NewErrInvalidValue(log.Fields{"action-command": actionInfo}, nil)
1437 }
1438 return &action, nil
1439}
1440
1441// getTPpath return the ETCD path for a given UNI port
1442func (f *OpenOltFlowMgr) getTPpath(ctx context.Context, intfID uint32, uniPath string, TpID uint32) string {
1443 return f.techprofile[intfID].GetTechProfileInstanceKVPath(ctx, TpID, uniPath)
1444}
1445
1446// DeleteTechProfileInstances removes the tech profile instances from persistent storage
1447func (f *OpenOltFlowMgr) DeleteTechProfileInstances(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, sn string) error {
1448 tpIDList := f.resourceMgr.GetTechProfileIDForOnu(ctx, intfID, onuID, uniID)
1449 uniPortName := getUniPortPath(f.deviceHandler.device.Id, intfID, int32(onuID), int32(uniID))
1450
1451 for _, tpID := range tpIDList {
1452 if err := f.DeleteTechProfileInstance(ctx, intfID, onuID, uniID, uniPortName, tpID); err != nil {
1453 _ = olterrors.NewErrAdapter("delete-tech-profile-failed", log.Fields{"device-id": f.deviceHandler.device.Id}, err).Log()
1454 // return err
1455 // We should continue to delete tech-profile instances for other TP IDs
1456 }
1457 logger.Debugw(ctx, "tech-profile-deleted", log.Fields{"device-id": f.deviceHandler.device.Id, "tp-id": tpID})
1458 }
1459 return nil
1460}
1461
1462// DeleteTechProfileInstance removes the tech profile instance from persistent storage
1463func (f *OpenOltFlowMgr) DeleteTechProfileInstance(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uniPortName string, tpID uint32) error {
1464 if uniPortName == "" {
1465 uniPortName = getUniPortPath(f.deviceHandler.device.Id, intfID, int32(onuID), int32(uniID))
1466 }
1467 if err := f.techprofile[intfID].DeleteTechProfileInstance(ctx, tpID, uniPortName); err != nil {
1468 return olterrors.NewErrAdapter("failed-to-delete-tp-instance-from-kv-store",
1469 log.Fields{
1470 "tp-id": tpID,
1471 "uni-port-name": uniPortName,
1472 "device-id": f.deviceHandler.device.Id}, err)
1473 }
1474 return nil
1475}
1476
1477func getFlowStoreCookie(ctx context.Context, classifier map[string]interface{}, gemPortID uint32) uint64 {
1478 if len(classifier) == 0 { // should never happen
1479 logger.Error(ctx, "invalid-classfier-object")
1480 return 0
1481 }
1482 logger.Debugw(ctx, "generating-flow-store-cookie",
1483 log.Fields{
1484 "classifier": classifier,
1485 "gemport-id": gemPortID})
1486 var jsonData []byte
1487 var flowString string
1488 var err error
1489 if jsonData, err = json.Marshal(classifier); err != nil {
1490 logger.Error(ctx, "failed-to-encode-classifier")
1491 return 0
1492 }
1493 flowString = string(jsonData)
1494 if gemPortID != 0 {
1495 flowString = fmt.Sprintf("%s%s", string(jsonData), string(gemPortID))
1496 }
1497 h := md5.New()
1498 _, _ = h.Write([]byte(flowString))
1499 hash := big.NewInt(0)
1500 hash.SetBytes(h.Sum(nil))
1501 generatedHash := hash.Uint64()
1502 logger.Debugw(ctx, "hash-generated", log.Fields{"hash": generatedHash})
1503 return generatedHash
1504}
1505
1506func (f *OpenOltFlowMgr) getUpdatedFlowInfo(ctx context.Context, flow *openoltpb2.Flow, flowStoreCookie uint64, flowCategory string, deviceFlowID uint32, logicalFlowID uint64) *[]rsrcMgr.FlowInfo {
1507 var flows = []rsrcMgr.FlowInfo{{Flow: flow, FlowCategory: flowCategory, FlowStoreCookie: flowStoreCookie, LogicalFlowID: logicalFlowID}}
1508 var intfID uint32
1509 /* For flows which trap out of the NNI, the AccessIntfId is invalid
1510 (set to -1). In such cases, we need to refer to the NetworkIntfId .
1511 */
1512 if flow.AccessIntfId != -1 {
1513 intfID = uint32(flow.AccessIntfId)
1514 } else {
1515 intfID = uint32(flow.NetworkIntfId)
1516 }
1517 existingFlows := f.resourceMgr.GetFlowIDInfo(ctx, intfID, flow.OnuId, flow.UniId, flow.FlowId)
1518 if existingFlows != nil {
1519 logger.Debugw(ctx, "flow-exists-for-given-flowID--appending-it-to-current-flow",
1520 log.Fields{
1521 "flow-id": flow.FlowId,
1522 "device-id": f.deviceHandler.device.Id,
1523 "intf-id": intfID,
1524 "onu-id": flow.OnuId})
1525 //for _, f := range *existingFlows {
1526 // flows = append(flows, f)
1527 //}
1528 flows = append(flows, *existingFlows...)
1529 }
1530 logger.Debugw(ctx, "updated-flows-for-given-flowID-and-onuid",
1531 log.Fields{
1532 "updatedflow": flows,
1533 "flow-id": flow.FlowId,
1534 "onu-id": flow.OnuId,
1535 "device-id": f.deviceHandler.device.Id})
1536 return &flows
1537}
1538
1539func (f *OpenOltFlowMgr) updateFlowInfoToKVStore(ctx context.Context, intfID int32, onuID int32, uniID int32, flowID uint32, flows *[]rsrcMgr.FlowInfo) error {
1540 logger.Debugw(ctx, "storing-flow(s)-into-kv-store", log.Fields{
1541 "flow-id": flowID,
1542 "device-id": f.deviceHandler.device.Id,
1543 "intf-id": intfID,
1544 "onu-id": onuID})
1545 if err := f.resourceMgr.UpdateFlowIDInfo(ctx, intfID, onuID, uniID, flowID, flows); err != nil {
1546 logger.Warnw(ctx, "error-while-storing-flow-into-kv-store", log.Fields{
1547 "device-id": f.deviceHandler.device.Id,
1548 "onu-id": onuID,
1549 "intf-id": intfID,
1550 "flow-id": flowID})
1551 return err
1552 }
1553 logger.Infow(ctx, "stored-flow(s)-into-kv-store-successfully!", log.Fields{
1554 "device-id": f.deviceHandler.device.Id,
1555 "onu-id": onuID,
1556 "intf-id": intfID,
1557 "flow-id": flowID})
1558 return nil
1559}
1560
1561func (f *OpenOltFlowMgr) addFlowToDevice(ctx context.Context, logicalFlow *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
1562
1563 var intfID uint32
1564 /* For flows which trap out of the NNI, the AccessIntfId is invalid
1565 (set to -1). In such cases, we need to refer to the NetworkIntfId .
1566 */
1567 if deviceFlow.AccessIntfId != -1 {
1568 intfID = uint32(deviceFlow.AccessIntfId)
1569 } else {
1570 // REVIST : Why ponport is given as network port?
1571 intfID = uint32(deviceFlow.NetworkIntfId)
1572 }
1573
1574 logger.Debugw(ctx, "sending-flow-to-device-via-grpc", log.Fields{
1575 "flow": *deviceFlow,
1576 "device-id": f.deviceHandler.device.Id,
1577 "intf-id": intfID})
1578 _, err := f.deviceHandler.Client.FlowAdd(log.WithSpanFromContext(context.Background(), ctx), deviceFlow)
1579
1580 st, _ := status.FromError(err)
1581 if st.Code() == codes.AlreadyExists {
1582 logger.Debug(ctx, "flow-already-exists", log.Fields{
1583 "err": err,
1584 "deviceFlow": deviceFlow,
1585 "device-id": f.deviceHandler.device.Id,
1586 "intf-id": intfID})
1587 return nil
1588 }
1589
1590 if err != nil {
1591 logger.Errorw(ctx, "failed-to-add-flow-to-device",
1592 log.Fields{"err": err,
1593 "device-flow": deviceFlow,
1594 "device-id": f.deviceHandler.device.Id,
1595 "intf-id": intfID})
1596 f.resourceMgr.FreeFlowID(ctx, intfID, deviceFlow.OnuId, deviceFlow.UniId, deviceFlow.FlowId)
1597 return err
1598 }
1599 if deviceFlow.GemportId != -1 {
1600 // No need to register the flow if it is a trap on nni flow.
1601 if err := f.registerFlow(ctx, logicalFlow, deviceFlow); err != nil {
1602 logger.Errorw(ctx, "failed-to-register-flow", log.Fields{"err": err})
1603 return err
1604 }
1605 }
1606 logger.Infow(ctx, "flow-added-to-device-successfully ",
1607 log.Fields{
1608 "flow": *deviceFlow,
1609 "device-id": f.deviceHandler.device.Id,
1610 "intf-id": intfID})
1611 return nil
1612}
1613
1614func (f *OpenOltFlowMgr) removeFlowFromDevice(ctx context.Context, deviceFlow *openoltpb2.Flow, ofFlowID uint64) error {
1615 logger.Debugw(ctx, "sending-flow-to-device-via-grpc",
1616 log.Fields{
1617 "flow": *deviceFlow,
1618 "device-id": f.deviceHandler.device.Id})
1619 _, err := f.deviceHandler.Client.FlowRemove(log.WithSpanFromContext(context.Background(), ctx), deviceFlow)
1620 if err != nil {
1621 if f.deviceHandler.device.ConnectStatus == common.ConnectStatus_UNREACHABLE {
1622 logger.Warnw(ctx, "can-not-remove-flow-from-device--unreachable",
1623 log.Fields{
1624 "err": err,
1625 "deviceFlow": deviceFlow,
1626 "device-id": f.deviceHandler.device.Id})
1627 //Assume the flow is removed
1628 return nil
1629 }
1630 return olterrors.NewErrFlowOp("remove", deviceFlow.FlowId, log.Fields{"deviceFlow": deviceFlow}, err)
1631
1632 }
1633 logger.Infow(ctx, "flow-removed-from-device-successfully", log.Fields{
1634 "of-flow-id": ofFlowID,
1635 "flow": *deviceFlow,
1636 "device-id": f.deviceHandler.device.Id,
1637 })
1638 return nil
1639}
1640
1641func (f *OpenOltFlowMgr) addLLDPFlow(ctx context.Context, flow *ofp.OfpFlowStats, portNo uint32) error {
1642
1643 classifierInfo := make(map[string]interface{})
1644 actionInfo := make(map[string]interface{})
1645
1646 classifierInfo[EthType] = uint32(LldpEthType)
1647 classifierInfo[PacketTagType] = Untagged
1648 actionInfo[TrapToHost] = true
1649
1650
1651 var onuID = -1
1652 var uniID = -1
1653 var gemPortID = -1
1654
1655 networkInterfaceID, err := IntfIDFromNniPortNum(ctx, portNo)
1656 if err != nil {
1657 return olterrors.NewErrInvalidValue(log.Fields{"nni-port-number": portNo}, err).Log()
1658 }
1659 var flowStoreCookie = getFlowStoreCookie(ctx, classifierInfo, uint32(0))
1660 if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
1661 logger.Infow(ctx, "flow-exists--not-re-adding", log.Fields{"device-id": f.deviceHandler.device.Id})
1662 return nil
1663 }
1664 flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
1665
1666 if err != nil {
1667 return olterrors.NewErrNotFound("flow-id",
1668 log.Fields{
1669 "interface-id": networkInterfaceID,
1670 "onu-id": onuID,
1671 "uni-id": uniID,
1672 "gem-port-id": gemPortID,
1673 "cookie": flowStoreCookie,
1674 "device-id": f.deviceHandler.device.Id},
1675 err)
1676 }
1677 classifierProto, err := makeOpenOltClassifierField(classifierInfo)
1678 if err != nil {
1679 return olterrors.NewErrInvalidValue(
1680 log.Fields{
1681 "classifier": classifierInfo,
1682 "device-id": f.deviceHandler.device.Id}, err)
1683 }
1684 logger.Debugw(ctx, "created-classifier-proto",
1685 log.Fields{
1686 "classifier": *classifierProto,
1687 "device-id": f.deviceHandler.device.Id})
1688 actionProto, err := makeOpenOltActionField(actionInfo, classifierInfo)
1689 if err != nil {
1690 return olterrors.NewErrInvalidValue(
1691 log.Fields{
1692 "action": actionInfo,
1693 "device-id": f.deviceHandler.device.Id}, err)
1694 }
1695 logger.Debugw(ctx, "created-action-proto",
1696 log.Fields{
1697 "action": *actionProto,
1698 "device-id": f.deviceHandler.device.Id})
1699
1700 downstreamflow := openoltpb2.Flow{AccessIntfId: int32(-1), // AccessIntfId not required
1701 OnuId: int32(onuID), // OnuId not required
1702 UniId: int32(uniID), // UniId not used
1703 FlowId: flowID,
1704 FlowType: Downstream,
1705 NetworkIntfId: int32(networkInterfaceID),
1706 GemportId: int32(gemPortID),
1707 Classifier: classifierProto,
1708 Action: actionProto,
1709 Priority: int32(flow.Priority),
1710 Cookie: flow.Cookie,
1711 PortNo: portNo}
1712 if err := f.addFlowToDevice(ctx, flow, &downstreamflow); err != nil {
1713 return olterrors.NewErrFlowOp("add", flowID,
1714 log.Fields{
1715 "flow": downstreamflow,
1716 "device-id": f.deviceHandler.device.Id}, err)
1717 }
1718 logger.Infow(ctx, "lldp-trap-on-nni-flow-added-to-device-successfully",
1719 log.Fields{
1720 "device-id": f.deviceHandler.device.Id,
1721 "onu-id": onuID,
1722 "flow-id": flowID})
1723 flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, flow.Id)
1724 if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
1725 int32(onuID),
1726 int32(uniID),
1727 flowID, flowsToKVStore); err != nil {
1728 return olterrors.NewErrPersistence("update", "flow", flowID,
1729 log.Fields{
1730 "flow": downstreamflow,
1731 "device-id": f.deviceHandler.device.Id}, err)
1732 }
1733 return nil
1734}
1735
1736func getUniPortPath(oltID string, intfID uint32, onuID int32, uniID int32) string {
1737 return fmt.Sprintf("olt-{%s}/pon-{%d}/onu-{%d}/uni-{%d}", oltID, intfID, onuID, uniID)
1738}
1739
1740//getOnuDevice to fetch onu from cache or core.
1741func (f *OpenOltFlowMgr) getOnuDevice(ctx context.Context, intfID uint32, onuID uint32) (*OnuDevice, error) {
1742 onuKey := f.deviceHandler.formOnuKey(intfID, onuID)
1743 onuDev, ok := f.deviceHandler.onus.Load(onuKey)
1744 if !ok {
1745 logger.Debugw(ctx, "couldnt-find-onu-in-cache",
1746 log.Fields{
1747 "intf-id": intfID,
1748 "onu-id": onuID,
1749 "device-id": f.deviceHandler.device.Id})
1750 onuDevice, err := f.getChildDevice(ctx, intfID, onuID)
1751 if err != nil {
1752 return nil, olterrors.NewErrNotFound("onu-child-device",
1753 log.Fields{
1754 "onu-id": onuID,
1755 "intf-id": intfID,
1756 "device-id": f.deviceHandler.device.Id}, err)
1757 }
1758 onuDev = NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuDevice.ProxyAddress.OnuId, onuDevice.ProxyAddress.ChannelId, onuDevice.ProxyAddress.DeviceId, false)
1759 //better to ad the device to cache here.
1760 f.deviceHandler.StoreOnuDevice(onuDev.(*OnuDevice))
1761 } else {
1762 logger.Debugw(ctx, "found-onu-in-cache",
1763 log.Fields{
1764 "intf-id": intfID,
1765 "onu-id": onuID,
1766 "device-id": f.deviceHandler.device.Id})
1767 }
1768
1769 return onuDev.(*OnuDevice), nil
1770}
1771
1772//getChildDevice to fetch onu
1773func (f *OpenOltFlowMgr) getChildDevice(ctx context.Context, intfID uint32, onuID uint32) (*voltha.Device, error) {
1774 logger.Infow(ctx, "GetChildDevice",
1775 log.Fields{
1776 "pon-port": intfID,
1777 "onu-id": onuID,
1778 "device-id": f.deviceHandler.device.Id})
1779 parentPortNo := IntfIDToPortNo(intfID, voltha.Port_PON_OLT)
1780 onuDevice, err := f.deviceHandler.GetChildDevice(ctx, parentPortNo, onuID)
1781 if err != nil {
1782 return nil, olterrors.NewErrNotFound("onu",
1783 log.Fields{
1784 "interface-id": parentPortNo,
1785 "onu-id": onuID,
1786 "device-id": f.deviceHandler.device.Id},
1787 err)
1788 }
1789 logger.Infow(ctx, "successfully-received-child-device-from-core",
1790 log.Fields{
1791 "device-id": f.deviceHandler.device.Id,
1792 "child_device_id": onuDevice.Id,
1793 "child_device_sn": onuDevice.SerialNumber})
1794 return onuDevice, nil
1795}
1796
1797func (f *OpenOltFlowMgr) sendDeleteGemPortToChild(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, gemPortID uint32, tpPath string) error {
1798 onuDev, err := f.getOnuDevice(ctx, intfID, onuID)
1799 if err != nil {
1800 logger.Debugw(ctx, "couldnt-find-onu-child-device",
1801 log.Fields{
1802 "intf-id": intfID,
1803 "onu-id": onuID,
1804 "uni-id": uniID,
1805 "device-id": f.deviceHandler.device.Id})
1806 return err
1807 }
1808
1809 delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: uniID, TpPath: tpPath, GemPortId: gemPortID}
1810 logger.Debugw(ctx, "sending-gem-port-delete-to-openonu-adapter",
1811 log.Fields{
1812 "msg": *delGemPortMsg,
1813 "device-id": f.deviceHandler.device.Id})
1814 if sendErr := f.deviceHandler.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.Background(), ctx),
1815 delGemPortMsg,
1816 ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST,
1817 f.deviceHandler.device.Type,
1818 onuDev.deviceType,
1819 onuDev.deviceID,
1820 onuDev.proxyDeviceID, ""); sendErr != nil {
1821 return olterrors.NewErrCommunication("send-delete-gem-port-to-onu-adapter",
1822 log.Fields{
1823 "from-adapter": f.deviceHandler.device.Type,
1824 "to-adapter": onuDev.deviceType,
1825 "onu-id": onuDev.deviceID,
1826 "proxyDeviceID": onuDev.proxyDeviceID,
1827 "device-id": f.deviceHandler.device.Id}, sendErr)
1828 }
1829 logger.Infow(ctx, "success-sending-del-gem-port-to-onu-adapter",
1830 log.Fields{
1831 "msg": delGemPortMsg,
1832 "from-adapter": f.deviceHandler.device.Type,
1833 "to-adapter": onuDev.deviceType,
1834 "device-id": f.deviceHandler.device.Id})
1835 return nil
1836}
1837
1838func (f *OpenOltFlowMgr) sendDeleteTcontToChild(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, allocID uint32, tpPath string) error {
1839 onuDev, err := f.getOnuDevice(ctx, intfID, onuID)
1840 if err != nil {
1841 logger.Warnw(ctx, "couldnt-find-onu-child-device",
1842 log.Fields{
1843 "intf-id": intfID,
1844 "onu-id": onuID,
1845 "uni-id": uniID,
1846 "device-id": f.deviceHandler.device.Id})
1847 return err
1848 }
1849
1850 delTcontMsg := &ic.InterAdapterDeleteTcontMessage{UniId: uniID, TpPath: tpPath, AllocId: allocID}
1851 logger.Debugw(ctx, "sending-tcont-delete-to-openonu-adapter",
1852 log.Fields{
1853 "msg": *delTcontMsg,
1854 "device-id": f.deviceHandler.device.Id})
1855 if sendErr := f.deviceHandler.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.Background(), ctx),
1856 delTcontMsg,
1857 ic.InterAdapterMessageType_DELETE_TCONT_REQUEST,
1858 f.deviceHandler.device.Type,
1859 onuDev.deviceType,
1860 onuDev.deviceID,
1861 onuDev.proxyDeviceID, ""); sendErr != nil {
1862 return olterrors.NewErrCommunication("send-delete-tcont-to-onu-adapter",
1863 log.Fields{
1864 "from-adapter": f.deviceHandler.device.Type,
1865 "to-adapter": onuDev.deviceType, "onu-id": onuDev.deviceID,
1866 "proxyDeviceID": onuDev.proxyDeviceID,
1867 "device-id": f.deviceHandler.device.Id}, sendErr)
1868 }
1869 logger.Infow(ctx, "success-sending-del-tcont-to-onu-adapter",
1870 log.Fields{
1871 "msg": delTcontMsg,
1872 "device-id": f.deviceHandler.device.Id})
1873 return nil
1874}
1875
1876func (f *OpenOltFlowMgr) deletePendingFlows(ctx context.Context, Intf uint32, onuID int32, uniID int32) {
1877 pnFlDelKey := pendingFlowDeleteKey{Intf, uint32(onuID), uint32(uniID)}
1878 if val, ok := f.pendingFlowDelete.Load(pnFlDelKey); ok {
1879 if val.(int) > 0 {
1880 pnFlDels := val.(int) - 1
1881 if pnFlDels > 0 {
1882 logger.Debugw(ctx, "flow-delete-succeeded--more-pending",
1883 log.Fields{
1884 "intf": Intf,
1885 "onu-id": onuID,
1886 "uni-id": uniID,
1887 "currpendingflowcnt": pnFlDels,
1888 "device-id": f.deviceHandler.device.Id})
1889 f.pendingFlowDelete.Store(pnFlDelKey, pnFlDels)
1890 } else {
1891 logger.Debugw(ctx, "all-pending-flow-deletes-handled--removing-entry-from-map",
1892 log.Fields{
1893 "intf": Intf,
1894 "onu-id": onuID,
1895 "uni-id": uniID,
1896 "device-id": f.deviceHandler.device.Id})
1897 f.pendingFlowDelete.Delete(pnFlDelKey)
1898 }
1899 }
1900 } else {
1901 logger.Debugw(ctx, "no-pending-delete-flows-found",
1902 log.Fields{
1903 "intf": Intf,
1904 "onu-id": onuID,
1905 "uni-id": uniID,
1906 "device-id": f.deviceHandler.device.Id})
1907
1908 }
1909
1910}
1911
1912// Once the gemport is released for a given onu, it also has to be cleared from local cache
1913// which was used for deriving the gemport->logicalPortNo during packet-in.
1914// Otherwise stale info continues to exist after gemport is freed and wrong logicalPortNo
1915// is conveyed to ONOS during packet-in OF message.
1916func (f *OpenOltFlowMgr) deleteGemPortFromLocalCache(ctx context.Context, intfID uint32, onuID uint32, gemPortID uint32) {
1917
1918 f.onuGemInfoLock.Lock()
1919 defer f.onuGemInfoLock.Unlock()
1920
1921 logger.Infow(ctx, "deleting-gem-from-local-cache",
1922 log.Fields{
1923 "gem-port-id": gemPortID,
1924 "intf-id": intfID,
1925 "onu-id": onuID,
1926 "device-id": f.deviceHandler.device.Id,
1927 "onu-gem": f.onuGemInfo[intfID]})
1928 onugem := f.onuGemInfo[intfID]
1929deleteLoop:
1930 for i, onu := range onugem {
1931 if onu.OnuID == onuID {
1932 for j, gem := range onu.GemPorts {
1933 // If the gemport is found, delete it from local cache.
1934 if gem == gemPortID {
1935 onu.GemPorts = append(onu.GemPorts[:j], onu.GemPorts[j+1:]...)
1936 onugem[i] = onu
1937 logger.Infow(ctx, "removed-gemport-from-local-cache",
1938 log.Fields{
1939 "intf-id": intfID,
1940 "onu-id": onuID,
1941 "deletedgemport-id": gemPortID,
1942 "gemports": onu.GemPorts,
1943 "device-id": f.deviceHandler.device.Id})
1944 break deleteLoop
1945 }
1946 }
1947 break deleteLoop
1948 }
1949 }
1950}
1951
1952//clearResources clears pon resources in kv store and the device
1953// nolint: gocyclo
1954func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, Intf uint32, onuID int32, uniID int32,
1955 gemPortID int32, flowID uint32, flowDirection string,
1956 portNum uint32, updatedFlows []rsrcMgr.FlowInfo) error {
1957
1958 tpID, err := getTpIDFromFlow(ctx, flow)
1959 if err != nil {
1960 return olterrors.NewErrNotFound("tp-id",
1961 log.Fields{
1962 "flow": flow,
1963 "intf": Intf,
1964 "onu-id": onuID,
1965 "uni-id": uniID,
1966 "device-id": f.deviceHandler.device.Id}, err)
1967 }
1968
1969 if len(updatedFlows) >= 0 {
1970 // There are still flows referencing the same flow_id.
1971 // So the flow should not be freed yet.
1972 // For ex: Case of HSIA where same flow is shared
1973 // between DS and US.
1974 if err := f.updateFlowInfoToKVStore(ctx, int32(Intf), int32(onuID), int32(uniID), flowID, &updatedFlows); err != nil {
1975 _ = olterrors.NewErrPersistence("update", "flow", flowID,
1976 log.Fields{
1977 "flow": updatedFlows,
1978 "device-id": f.deviceHandler.device.Id}, err).Log()
1979 }
1980 if len(updatedFlows) == 0 {
1981 // Do this for subscriber flows only (not trap from NNI flows)
1982 if onuID != -1 && uniID != -1 {
1983 pnFlDelKey := pendingFlowDeleteKey{Intf, uint32(onuID), uint32(uniID)}
1984 if val, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
1985 logger.Debugw(ctx, "creating-entry-for-pending-flow-delete",
1986 log.Fields{
1987 "flow-id": flowID,
1988 "intf": Intf,
1989 "onu-id": onuID,
1990 "uni-id": uniID,
1991 "device-id": f.deviceHandler.device.Id})
1992 f.pendingFlowDelete.Store(pnFlDelKey, 1)
1993 } else {
1994 pnFlDels := val.(int) + 1
1995 logger.Debugw(ctx, "updating-flow-delete-entry",
1996 log.Fields{
1997 "flow-id": flowID,
1998 "intf": Intf,
1999 "onu-id": onuID,
2000 "uni-id": uniID,
2001 "currPendingFlowCnt": pnFlDels,
2002 "device-id": f.deviceHandler.device.Id})
2003 f.pendingFlowDelete.Store(pnFlDelKey, pnFlDels)
2004 }
2005
2006 defer f.deletePendingFlows(ctx, Intf, onuID, uniID)
2007 }
2008
2009 logger.Debugw(ctx, "releasing-flow-id-to-resource-manager",
2010 log.Fields{
2011 "Intf": Intf,
2012 "onu-id": onuID,
2013 "uni-id": uniID,
2014 "flow-id": flowID,
2015 "device-id": f.deviceHandler.device.Id})
2016 f.resourceMgr.FreeFlowID(ctx, Intf, int32(onuID), int32(uniID), flowID)
2017
2018 uni := getUniPortPath(f.deviceHandler.device.Id, Intf, onuID, uniID)
2019 tpPath := f.getTPpath(ctx, Intf, uni, tpID)
2020 logger.Debugw(ctx, "getting-techprofile-instance-for-subscriber",
2021 log.Fields{
2022 "TP-PATH": tpPath,
2023 "device-id": f.deviceHandler.device.Id})
2024 techprofileInst, err := f.techprofile[Intf].GetTPInstanceFromKVStore(ctx, tpID, tpPath)
2025 if err != nil || techprofileInst == nil { // This should not happen, something wrong in KV backend transaction
2026 return olterrors.NewErrNotFound("tech-profile-in-kv-store",
2027 log.Fields{
2028 "tp-id": tpID,
2029 "path": tpPath}, err)
2030 }
2031
2032 gemPK := gemPortKey{Intf, uint32(gemPortID)}
2033 used, err := f.isGemPortUsedByAnotherFlow(ctx, gemPK)
2034 if err != nil {
2035 return err
2036 }
2037 if used {
2038 if f.perGemPortLock.TryLock(gemPK) {
2039 flowIDs := f.flowsUsedByGemPort[gemPK]
2040 for i, flowIDinMap := range flowIDs {
2041 if flowIDinMap == flowID {
2042 flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
2043 // everytime flowsUsedByGemPort cache is updated the same should be updated
2044 // in kv store by calling UpdateFlowIDsForGem
2045 f.flowsUsedByGemPort[gemPK] = flowIDs
2046 if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, Intf, uint32(gemPortID), flowIDs); err != nil {
2047 return err
2048 }
2049 break
2050 }
2051 }
2052 logger.Debugw(ctx, "gem-port-id-is-still-used-by-other-flows",
2053 log.Fields{
2054 "gemport-id": gemPortID,
2055 "usedByFlows": flowIDs,
2056 "device-id": f.deviceHandler.device.Id})
2057 f.perGemPortLock.Unlock(gemPK)
2058 return nil
2059 }
2060
2061 logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
2062 log.Fields{
2063 "gemport-id": gemPortID,
2064 "device-id": f.deviceHandler.device.Id,
2065 "key": gemPK,
2066 })
2067 return olterrors.NewErrAdapter("failed-to-acquire-per-gem-port-lock", log.Fields{
2068 "gemport-id": gemPortID,
2069 "device-id": f.deviceHandler.device.Id,
2070 "key": gemPK,
2071 }, nil)
2072 }
2073 logger.Debugf(ctx, "gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
2074 f.resourceMgr.RemoveGemPortIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
2075 // TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
2076 // But it is anyway eventually removed later when the TechProfile is freed, so not a big issue for now.
2077 f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), Intf)
2078 f.deleteGemPortFromLocalCache(ctx, Intf, uint32(onuID), uint32(gemPortID))
2079 f.onuIdsLock.Lock()
2080 //everytime an entry is deleted from flowsUsedByGemPort cache, the same should be updated in kv as well
2081 // by calling DeleteFlowIDsForGem
2082 if f.perGemPortLock.TryLock(gemPK) {
2083 delete(f.flowsUsedByGemPort, gemPK)
2084 f.perGemPortLock.Unlock(gemPK)
2085 } else {
2086 logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
2087 log.Fields{
2088 "device-id": f.deviceHandler.device.Id,
2089 "key": gemPK,
2090 })
2091 }
2092 f.resourceMgr.DeleteFlowIDsForGem(ctx, Intf, uint32(gemPortID))
2093 f.resourceMgr.FreeGemPortID(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
2094 f.onuIdsLock.Unlock()
2095 // Delete the gem port on the ONU.
2096 if err := f.sendDeleteGemPortToChild(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
2097 logger.Errorw(ctx, "error-processing-delete-gem-port-towards-onu",
2098 log.Fields{
2099 "err": err,
2100 "intf": Intf,
2101 "onu-id": onuID,
2102 "uni-id": uniID,
2103 "device-id": f.deviceHandler.device.Id,
2104 "gemport-id": gemPortID})
2105 }
2106 switch techprofileInst := techprofileInst.(type) {
2107 case *tp.TechProfile:
2108 ok, _ := f.isTechProfileUsedByAnotherGem(ctx, Intf, uint32(onuID), uint32(uniID), tpID, techprofileInst, uint32(gemPortID))
2109 if !ok {
2110 if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), tpID); err != nil {
2111 logger.Warn(ctx, err)
2112 }
2113 if err := f.DeleteTechProfileInstance(ctx, Intf, uint32(onuID), uint32(uniID), "", tpID); err != nil {
2114 logger.Warn(ctx, err)
2115 }
2116 if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
2117 logger.Warn(ctx, err)
2118 }
2119 if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
2120 logger.Warn(ctx, err)
2121 }
2122 f.resourceMgr.FreeAllocID(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID)
2123 // Delete the TCONT on the ONU.
2124 if err := f.sendDeleteTcontToChild(ctx, Intf, uint32(onuID), uint32(uniID), uint32(techprofileInst.UsScheduler.AllocID), tpPath); err != nil {
2125 logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
2126 log.Fields{
2127 "intf": Intf,
2128 "onu-id": onuID,
2129 "uni-id": uniID,
2130 "device-id": f.deviceHandler.device.Id,
2131 "alloc-id": techprofileInst.UsScheduler.AllocID})
2132 }
2133 }
2134 case *tp.EponProfile:
2135 if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), tpID); err != nil {
2136 logger.Warn(ctx, err)
2137 }
2138 if err := f.DeleteTechProfileInstance(ctx, Intf, uint32(onuID), uint32(uniID), "", tpID); err != nil {
2139 logger.Warn(ctx, err)
2140 }
2141 f.resourceMgr.FreeAllocID(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.AllocID)
2142 // Delete the TCONT on the ONU.
2143 if err := f.sendDeleteTcontToChild(ctx, Intf, uint32(onuID), uint32(uniID), uint32(techprofileInst.AllocID), tpPath); err != nil {
2144 logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
2145 log.Fields{
2146 "intf": Intf,
2147 "onu-id": onuID,
2148 "uni-id": uniID,
2149 "device-id": f.deviceHandler.device.Id,
2150 "alloc-id": techprofileInst.AllocID})
2151 }
2152 default:
2153 logger.Errorw(ctx, "error-unknown-tech",
2154 log.Fields{
2155 "techprofileInst": techprofileInst})
2156 }
2157 }
2158 }
2159 return nil
2160}
2161
2162// nolint: gocyclo
2163func (f *OpenOltFlowMgr) clearFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats, flowDirection string) {
2164
2165 logger.Infow(ctx, "clear-flow-from-resource-manager",
2166 log.Fields{
2167 "flowDirection": flowDirection,
2168 "flow": *flow,
2169 "device-id": f.deviceHandler.device.Id})
2170
2171 if flowDirection == Multicast {
2172 f.clearMulticastFlowFromResourceManager(ctx, flow)
2173 return
2174 }
2175
2176 classifierInfo := make(map[string]interface{})
2177
2178 portNum, Intf, onu, uni, inPort, ethType, err := FlowExtractInfo(ctx, flow, flowDirection)
2179 if err != nil {
2180 logger.Error(ctx, err)
2181 return
2182 }
2183
2184 onuID := int32(onu)
2185 uniID := int32(uni)
2186
2187 for _, field := range flows.GetOfbFields(flow) {
2188 if field.Type == flows.IP_PROTO {
2189 classifierInfo[IPProto] = field.GetIpProto()
2190 logger.Debugw(ctx, "field-type-ip-proto", log.Fields{"classifierInfo[IP_PROTO]": classifierInfo[IPProto].(uint32)})
2191 }
2192 }
2193 logger.Infow(ctx, "extracted-access-info-from-flow-to-be-deleted",
2194 log.Fields{
2195 "flow-id": flow.Id,
2196 "intf-id": Intf,
2197 "onu-id": onuID,
2198 "uni-id": uniID})
2199
2200 if ethType == LldpEthType || ((classifierInfo[IPProto] == IPProtoDhcp) && (flowDirection == "downstream")) {
2201 onuID = -1
2202 uniID = -1
2203 logger.Debug(ctx, "trap-on-nni-flow-set-oni--uni-to- -1")
2204 Intf, err = IntfIDFromNniPortNum(ctx, inPort)
2205 if err != nil {
2206 logger.Errorw(ctx, "invalid-in-port-number",
2207 log.Fields{
2208 "port-number": inPort,
2209 "error": err})
2210 return
2211 }
2212 }
2213 flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, Intf, onuID, uniID)
2214 for _, flowID := range flowIds {
2215 flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, Intf, onuID, uniID, flowID)
2216 if flowInfo == nil {
2217 logger.Debugw(ctx, "no-flowinfo-found-in-kv-store",
2218 log.Fields{
2219 "intf": Intf,
2220 "onu-id": onuID,
2221 "uni-id": uniID,
2222 "flow-id": flowID})
2223 return
2224 }
2225
2226 updatedFlows := *flowInfo
2227 for i, storedFlow := range updatedFlows {
2228 if flow.Id == storedFlow.LogicalFlowID {
2229 removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
2230 logger.Debugw(ctx, "flow-to-be-deleted", log.Fields{"flow": storedFlow})
2231 // DKB
2232 if err = f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
2233 logger.Errorw(ctx, "failed-to-remove-flow", log.Fields{"error": err})
2234 return
2235 }
2236 logger.Info(ctx, "flow-removed-from-device-successfully", log.Fields{
2237 "flow-id": flow.Id,
2238 "stored-flow": storedFlow,
2239 "device-id": f.deviceHandler.device.Id,
2240 "stored-flow-id": flowID,
2241 "onu-id": onuID,
2242 "intf": Intf,
2243 })
2244 //Remove the Flow from FlowInfo
2245 updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
2246 if err = f.clearResources(ctx, flow, Intf, onuID, uniID, storedFlow.Flow.GemportId,
2247 flowID, flowDirection, portNum, updatedFlows); err != nil {
2248 logger.Error(ctx, "failed-to-clear-resources-for-flow", log.Fields{
2249 "flow-id": flow.Id,
2250 "stored-flow": storedFlow,
2251 "device-id": f.deviceHandler.device.Id,
2252 "stored-flow-id": flowID,
2253 "onu-id": onuID,
2254 "intf": Intf,
2255 })
2256 return
2257 }
2258 }
2259 }
2260 }
2261}
2262
2263//clearMulticastFlowFromResourceManager removes a multicast flow from the KV store and
2264// clears resources reserved for this multicast flow
2265func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) {
2266 classifierInfo := make(map[string]interface{})
2267 formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
2268 networkInterfaceID, err := f.getNNIInterfaceIDOfMulticastFlow(ctx, classifierInfo)
2269
2270 if err != nil {
2271 logger.Warnw(ctx, "no-inport-found--cannot-release-resources-of-the-multicast-flow", log.Fields{"flowId:": flow.Id})
2272 return
2273 }
2274
2275 var onuID = int32(NoneOnuID)
2276 var uniID = int32(NoneUniID)
2277 var flowID uint32
2278
2279 flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, networkInterfaceID, onuID, uniID)
2280
2281 for _, flowID = range flowIds {
2282 flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
2283 if flowInfo == nil {
2284 logger.Debugw(ctx, "no-multicast-flowinfo-found-in-the-kv-store",
2285 log.Fields{
2286 "intf": networkInterfaceID,
2287 "onu-id": onuID,
2288 "uni-id": uniID,
2289 "flow-id": flowID})
2290 continue
2291 }
2292 updatedFlows := *flowInfo
2293 for i, storedFlow := range updatedFlows {
2294 if flow.Id == storedFlow.LogicalFlowID {
2295 removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
2296 logger.Debugw(ctx, "multicast-flow-to-be-deleted",
2297 log.Fields{
2298 "flow": storedFlow,
2299 "flow-id": flow.Id,
2300 "device-id": f.deviceHandler.device.Id})
2301 //remove from device
2302 if err := f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
2303 // DKB
2304 logger.Errorw(ctx, "failed-to-remove-multicast-flow",
2305 log.Fields{
2306 "flow-id": flow.Id,
2307 "error": err})
2308 return
2309 }
2310 logger.Infow(ctx, "multicast-flow-removed-from-device-successfully", log.Fields{"flow-id": flow.Id})
2311 //Remove the Flow from FlowInfo
2312 updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
2313 if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID), NoneOnuID, NoneUniID, flowID, &updatedFlows); err != nil {
2314 logger.Errorw(ctx, "failed-to-delete-multicast-flow-from-the-kv-store",
2315 log.Fields{"flow": storedFlow,
2316 "err": err})
2317 return
2318 }
2319 //release flow id
2320 logger.Debugw(ctx, "releasing-multicast-flow-id",
2321 log.Fields{"flow-id": flowID,
2322 "interfaceID": networkInterfaceID})
2323 f.resourceMgr.FreeFlowID(ctx, uint32(networkInterfaceID), NoneOnuID, NoneUniID, flowID)
2324 }
2325 }
2326 }
2327}
2328
2329//RemoveFlow removes the flow from the device
2330func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) error {
2331 logger.Infow(ctx, "removing-flow", log.Fields{"flow": *flow})
2332 var direction string
2333 actionInfo := make(map[string]interface{})
2334
2335 for _, action := range flows.GetActions(flow) {
2336 if action.Type == flows.OUTPUT {
2337 if out := action.GetOutput(); out != nil {
2338 actionInfo[Output] = out.GetPort()
2339 logger.Debugw(ctx, "action-type-output", log.Fields{"out_port": actionInfo[Output].(uint32)})
2340 } else {
2341 logger.Error(ctx, "invalid-output-port-in-action")
2342 return olterrors.NewErrInvalidValue(log.Fields{"invalid-out-port-action": 0}, nil)
2343 }
2344 }
2345 }
2346
2347 if flows.HasGroup(flow) {
2348 direction = Multicast
2349 f.clearFlowFromResourceManager(ctx, flow, direction)
2350 return nil
2351 } else if IsUpstream(actionInfo[Output].(uint32)) {
2352 direction = Upstream
2353 } else {
2354 direction = Downstream
2355 }
2356
2357 _, intfID, onuID, uniID, _, _, err := FlowExtractInfo(ctx, flow, direction)
2358 if err != nil {
2359 return err
2360 }
2361
2362 userKey := tpLockKey{intfID, onuID, uniID}
2363
2364 if f.perUserFlowHandleLock.TryLock(userKey) {
2365 f.clearFlowFromResourceManager(ctx, flow, direction) //TODO: Take care of the limitations
2366 f.perUserFlowHandleLock.Unlock(userKey)
2367 } else {
2368 // Ideally this should never happen
2369 logger.Errorw(ctx, "failed-to-acquire-lock-to-remove-flow--remove-aborted", log.Fields{"flow": flow})
2370 return errors.New("failed-to-acquire-per-user-lock")
2371 }
2372
2373 return nil
2374}
2375
2376func (f *OpenOltFlowMgr) waitForFlowDeletesToCompleteForOnu(ctx context.Context, intfID uint32, onuID uint32,
2377 uniID uint32, ch chan bool) {
2378 pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
2379 for {
2380 select {
2381 case <-time.After(20 * time.Millisecond):
2382 if flowDelRefCnt, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok || flowDelRefCnt == 0 {
2383 logger.Debug(ctx, "pending-flow-deletes-completed")
2384 ch <- true
2385 return
2386 }
2387 case <-ctx.Done():
2388 logger.Error(ctx, "flow-delete-wait-handler-routine-canceled")
2389 return
2390 }
2391 }
2392}
2393
2394//isIgmpTrapDownstreamFlow return true if the flow is a downsteam IGMP trap-to-host flow; false otherwise
2395func isIgmpTrapDownstreamFlow(classifierInfo map[string]interface{}) bool {
2396 if portType := IntfIDToPortTypeName(classifierInfo[InPort].(uint32)); portType == voltha.Port_ETHERNET_NNI {
2397 if ethType, ok := classifierInfo[EthType]; ok {
2398 if ethType.(uint32) == IPv4EthType {
2399 if ipProto, ok := classifierInfo[IPProto]; ok {
2400 if ipProto.(uint32) == IgmpProto {
2401 return true
2402 }
2403 }
2404 }
2405 }
2406 }
2407 return false
2408}
2409
2410// AddFlow add flow to device
2411// nolint: gocyclo
2412func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) error {
2413 classifierInfo := make(map[string]interface{})
2414 actionInfo := make(map[string]interface{})
2415 var UsMeterID uint32
2416 var DsMeterID uint32
2417
2418 logger.Infow(ctx, "adding-flow",
2419 log.Fields{
2420 "flow": flow,
2421 "flowmetadata": flowMetadata})
2422 formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
2423
2424 err := formulateActionInfoFromFlow(ctx, actionInfo, classifierInfo, flow)
2425 if err != nil {
2426 // Error logging is already done in the called function
2427 // So just return in case of error
2428 return err
2429 }
2430
2431 if flows.HasGroup(flow) {
2432 // handle multicast flow
2433 return f.handleFlowWithGroup(ctx, actionInfo, classifierInfo, flow)
2434 }
2435
2436 /* Controller bound trap flows */
2437 err = formulateControllerBoundTrapFlowInfo(ctx, actionInfo, classifierInfo, flow)
2438 if err != nil {
2439 // error if any, already logged in the called function
2440 return err
2441 }
2442
2443 logger.Debugw(ctx, "flow-ports",
2444 log.Fields{
2445 "classifierinfo_inport": classifierInfo[InPort],
2446 "action_output": actionInfo[Output]})
2447 portNo, intfID, onuID, uniID := ExtractAccessFromFlow(classifierInfo[InPort].(uint32), actionInfo[Output].(uint32))
2448
2449 if ethType, ok := classifierInfo[EthType]; ok {
2450 if ethType.(uint32) == LldpEthType {
2451 logger.Info(ctx, "adding-lldp-flow")
2452 return f.addLLDPFlow(ctx, flow, portNo)
2453 }
2454 }
2455 if ipProto, ok := classifierInfo[IPProto]; ok {
2456 if ipProto.(uint32) == IPProtoDhcp {
2457 if udpSrc, ok := classifierInfo[UDPSrc]; ok {
2458 if udpSrc.(uint32) == uint32(67) || udpSrc.(uint32) == uint32(546) {
2459 logger.Debug(ctx, "trap-dhcp-from-nni-flow")
2460 return f.addDHCPTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
2461 }
2462 }
2463 }
2464 }
2465 if isIgmpTrapDownstreamFlow(classifierInfo) {
2466 logger.Debug(ctx, "trap-igmp-from-nni-flow")
2467 return f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
2468 }
2469
2470 f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
2471
2472 TpID, err := getTpIDFromFlow(ctx, flow)
2473 if err != nil {
2474 return olterrors.NewErrNotFound("tpid-for-flow",
2475 log.Fields{
2476 "flow": flow,
2477 "intf-id": IntfID,
2478 "onu-id": onuID,
2479 "uni-id": uniID}, err)
2480 }
2481 logger.Debugw(ctx, "tpid-for-this-subcriber",
2482 log.Fields{
2483 "tp-id": TpID,
2484 "intf-id": intfID,
2485 "onu-id": onuID,
2486 "uni-id": uniID})
2487 if IsUpstream(actionInfo[Output].(uint32)) {
2488 UsMeterID = flows.GetMeterIdFromFlow(flow)
2489 logger.Debugw(ctx, "upstream-flow-meter-id", log.Fields{"us-meter-id": UsMeterID})
2490 } else {
2491 DsMeterID = flows.GetMeterIdFromFlow(flow)
2492 logger.Debugw(ctx, "downstream-flow-meter-id", log.Fields{"ds-meter-id": DsMeterID})
2493
2494 }
2495
2496 pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
2497 if _, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
2498 logger.Debugw(ctx, "no-pending-flows-found--going-ahead-with-flow-install",
2499 log.Fields{
2500 "intf-id": intfID,
2501 "onu-id": onuID,
2502 "uni-id": uniID})
2503 f.divideAndAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
2504 } else {
2505 pendingFlowDelComplete := make(chan bool)
2506 go f.waitForFlowDeletesToCompleteForOnu(ctx, intfID, onuID, uniID, pendingFlowDelComplete)
2507 select {
2508 case <-pendingFlowDelComplete:
2509 logger.Debugw(ctx, "all-pending-flow-deletes-completed",
2510 log.Fields{
2511 "intf-id": intfID,
2512 "onu-id": onuID,
2513 "uni-id": uniID})
2514 f.divideAndAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
2515
2516 case <-time.After(10 * time.Second):
2517 return olterrors.NewErrTimeout("pending-flow-deletes",
2518 log.Fields{
2519 "intf-id": intfID,
2520 "onu-id": onuID,
2521 "uni-id": uniID}, nil)
2522 }
2523 }
2524
2525 logger.Debugw(ctx, "end-adding-flow",
2526 log.Fields{
2527 "flow": flow,
2528 "flowmetadata": flowMetadata})
2529
2530 return nil
2531}
2532
2533// handleFlowWithGroup adds multicast flow to the device.
2534func (f *OpenOltFlowMgr) handleFlowWithGroup(ctx context.Context, actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) error {
2535 classifierInfo[PacketTagType] = DoubleTag
2536 logger.Debugw(ctx, "add-multicast-flow", log.Fields{
2537 "classifier-info": classifierInfo,
2538 "actionInfo": actionInfo})
2539
2540 networkInterfaceID, err := f.getNNIInterfaceIDOfMulticastFlow(ctx, classifierInfo)
2541 if err != nil {
2542 return olterrors.NewErrNotFound("multicast-in-port", log.Fields{"classifier": classifierInfo}, err)
2543 }
2544 mcastFlowClassificationByEthDst := false
2545
2546 if mcastFlowClassificationByEthDst {
2547 //replace ipDst with ethDst
2548 if ipv4Dst, ok := classifierInfo[Ipv4Dst]; ok &&
2549 flows.IsMulticastIp(ipv4Dst.(uint32)) {
2550 // replace ipv4_dst classifier with eth_dst
2551 multicastMac := flows.ConvertToMulticastMacBytes(ipv4Dst.(uint32))
2552 delete(classifierInfo, Ipv4Dst)
2553 classifierInfo[EthDst] = multicastMac
2554 logger.Debugw(ctx, "multicast-ip-to-mac-conversion-success",
2555 log.Fields{
2556 "ip:": ipv4Dst.(uint32),
2557 "mac:": multicastMac})
2558 }
2559 }
2560 delete(classifierInfo, EthType)
2561
2562 onuID := NoneOnuID
2563 uniID := NoneUniID
2564 gemPortID := NoneGemPortID
2565
2566 flowStoreCookie := getFlowStoreCookie(ctx, classifierInfo, uint32(0))
2567 if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
2568 logger.Infow(ctx, "multicast-flow-exists-not-re-adding", log.Fields{"classifier-info": classifierInfo})
2569 return nil
2570 }
2571 flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
2572 if err != nil {
2573 return olterrors.NewErrNotFound("multicast-flow-id",
2574 log.Fields{
2575 "interface-id": networkInterfaceID,
2576 "onu-id": onuID,
2577 "uni-id": uniID,
2578 "gem-port-id": gemPortID,
2579 "cookie": flowStoreCookie},
2580 err)
2581 }
2582 classifierProto, err := makeOpenOltClassifierField(classifierInfo)
2583 if err != nil {
2584 return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifierInfo}, err)
2585 }
2586 groupID := actionInfo[GroupID].(uint32)
2587 multicastFlow := openoltpb2.Flow{
2588 FlowId: flowID,
2589 FlowType: Multicast,
2590 NetworkIntfId: int32(networkInterfaceID),
2591 GroupId: groupID,
2592 Classifier: classifierProto,
2593 Priority: int32(flow.Priority),
2594 Cookie: flow.Cookie}
2595
2596 if err := f.addFlowToDevice(ctx, flow, &multicastFlow); err != nil {
2597 return olterrors.NewErrFlowOp("add", flowID, log.Fields{"flow": multicastFlow}, err)
2598 }
2599 logger.Info(ctx, "multicast-flow-added-to-device-successfully")
2600 if group, _, err := f.GetFlowGroupFromKVStore(ctx, groupID, true); err == nil {
2601 //calling groupAdd to set group members after multicast flow creation
2602 if err := f.ModifyGroup(ctx, group); err != nil {
2603 return olterrors.NewErrGroupOp("modify", groupID, log.Fields{"group": group}, err)
2604 }
2605 //cached group can be removed now
2606 if err := f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, groupID, true); err != nil {
2607 logger.Warnw(ctx, "failed-to-remove-flow-group", log.Fields{"group-id": groupID, "error": err})
2608 }
2609 }
2610
2611 flowsToKVStore := f.getUpdatedFlowInfo(ctx, &multicastFlow, flowStoreCookie, MulticastFlow, flowID, flow.Id)
2612 if err = f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
2613 int32(onuID),
2614 int32(uniID),
2615 flowID, flowsToKVStore); err != nil {
2616 return olterrors.NewErrPersistence("update", "flow", flowID, log.Fields{"flow": multicastFlow}, err)
2617 }
2618 return nil
2619}
2620
2621//getNNIInterfaceIDOfMulticastFlow returns associated NNI interface id of the inPort criterion if exists; returns the first NNI interface of the device otherwise
2622func (f *OpenOltFlowMgr) getNNIInterfaceIDOfMulticastFlow(ctx context.Context, classifierInfo map[string]interface{}) (uint32, error) {
2623 if inPort, ok := classifierInfo[InPort]; ok {
2624 nniInterfaceID, err := IntfIDFromNniPortNum(ctx, inPort.(uint32))
2625 if err != nil {
2626 return 0, olterrors.NewErrInvalidValue(log.Fields{"nni-in-port-number": inPort}, err)
2627 }
2628 return nniInterfaceID, nil
2629 }
2630 nniPorts, e := f.resourceMgr.GetNNIFromKVStore(ctx)
2631 if e == nil && len(nniPorts) > 0 {
2632 return nniPorts[0], nil
2633 }
2634 return 0, olterrors.NewErrNotFound("nni-port", nil, e).Log()
2635}
2636
2637// AddGroup add or update the group
2638func (f *OpenOltFlowMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
2639 logger.Infow(ctx, "add-group", log.Fields{"group": group})
2640 if group == nil {
2641 return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
2642 }
2643
2644 groupToOlt := openoltpb2.Group{
2645 GroupId: group.Desc.GroupId,
2646 Command: openoltpb2.Group_SET_MEMBERS,
2647 Action: f.buildGroupAction(),
2648 }
2649
2650 logger.Debugw(ctx, "sending-group-to-device", log.Fields{"groupToOlt": groupToOlt})
2651 _, err := f.deviceHandler.Client.PerformGroupOperation(ctx, &groupToOlt)
2652 if err != nil {
2653 return olterrors.NewErrAdapter("add-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
2654 }
2655 if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, true); err != nil {
2656 return olterrors.NewErrPersistence("add", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
2657 }
2658 logger.Infow(ctx, "add-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
2659 return nil
2660}
2661
2662// DeleteGroup deletes a group from the device
2663func (f *OpenOltFlowMgr) DeleteGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
2664 logger.Debugw(ctx, "delete-group", log.Fields{"group": group})
2665 if group == nil {
2666 logger.Error(ctx, "unable-to-delete-group--invalid-argument--group-is-nil")
2667 return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
2668 }
2669
2670 groupToOlt := openoltpb2.Group{
2671 GroupId: group.Desc.GroupId,
2672 }
2673
2674 logger.Debugw(ctx, "deleting-group-from-device", log.Fields{"groupToOlt": groupToOlt})
2675 _, err := f.deviceHandler.Client.DeleteGroup(ctx, &groupToOlt)
2676 if err != nil {
2677 logger.Errorw(ctx, "delete-group-failed-on-dev", log.Fields{"groupToOlt": groupToOlt, "err": err})
2678 return olterrors.NewErrAdapter("delete-group-operation-failed", log.Fields{"groupToOlt": groupToOlt}, err)
2679 }
2680 if err := f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, group.Desc.GroupId, false); err != nil {
2681 return olterrors.NewErrPersistence("delete", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
2682 }
2683 logger.Debugw(ctx, "delete-group-operation-performed-on-the-device-successfully ", log.Fields{"groupToOlt": groupToOlt})
2684 return nil
2685}
2686
2687//buildGroupAction creates and returns a group action
2688func (f *OpenOltFlowMgr) buildGroupAction() *openoltpb2.Action {
2689 var actionCmd openoltpb2.ActionCmd
2690 var action openoltpb2.Action
2691 action.Cmd = &actionCmd
2692 action.Cmd.RemoveOuterTag = true
2693 return &action
2694}
2695
2696// ModifyGroup updates the group
2697func (f *OpenOltFlowMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) error {
2698 logger.Infow(ctx, "modify-group", log.Fields{"group": group})
2699 if group == nil || group.Desc == nil {
2700 return olterrors.NewErrInvalidValue(log.Fields{"group": group}, nil)
2701 }
2702
2703 newGroup := f.buildGroup(ctx, group.Desc.GroupId, group.Desc.Buckets)
2704 val, groupExists, err := f.GetFlowGroupFromKVStore(ctx, group.Desc.GroupId, false)
2705
2706 if err != nil {
2707 return olterrors.NewErrNotFound("flow-group-in-kv-store", log.Fields{"groupId": group.Desc.GroupId}, err)
2708 }
2709
2710 var current *openoltpb2.Group // represents the group on the device
2711 if groupExists {
2712 // group already exists
2713 current = f.buildGroup(ctx, group.Desc.GroupId, val.Desc.GetBuckets())
2714 logger.Debugw(ctx, "modify-group--group exists",
2715 log.Fields{
2716 "group on the device": val,
2717 "new": group})
2718 } else {
2719 current = f.buildGroup(ctx, group.Desc.GroupId, nil)
2720 }
2721
2722 logger.Debugw(ctx, "modify-group--comparing-current-and-new",
2723 log.Fields{
2724 "group on the device": current,
2725 "new": newGroup})
2726 membersToBeAdded := f.findDiff(current, newGroup)
2727 membersToBeRemoved := f.findDiff(newGroup, current)
2728
2729 logger.Infow(ctx, "modify-group--differences found", log.Fields{
2730 "membersToBeAdded": membersToBeAdded,
2731 "membersToBeRemoved": membersToBeRemoved,
2732 "groupId": group.Desc.GroupId})
2733
2734 groupToOlt := openoltpb2.Group{
2735 GroupId: group.Desc.GroupId,
2736 }
2737 var errAdd, errRemoved error
2738 if len(membersToBeAdded) > 0 {
2739 groupToOlt.Command = openoltpb2.Group_ADD_MEMBERS
2740 groupToOlt.Members = membersToBeAdded
2741 //execute addMembers
2742 errAdd = f.callGroupAddRemove(ctx, &groupToOlt)
2743 }
2744 if len(membersToBeRemoved) > 0 {
2745 groupToOlt.Command = openoltpb2.Group_REMOVE_MEMBERS
2746 groupToOlt.Members = membersToBeRemoved
2747 //execute removeMembers
2748 errRemoved = f.callGroupAddRemove(ctx, &groupToOlt)
2749 }
2750
2751 if errAdd == nil && errRemoved == nil {
2752 if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, false); err != nil {
2753 return olterrors.NewErrPersistence("add", "flow-group", group.Desc.GroupId, log.Fields{"group": group}, err)
2754 }
2755 logger.Infow(ctx, "modify-group-was-success--storing-group",
2756 log.Fields{
2757 "group": group,
2758 "existingGroup": current})
2759 } else {
2760 logger.Warnw(ctx, "one-of-the-group-add/remove-operations-failed--cannot-save-group-modifications",
2761 log.Fields{"group": group})
2762 if errAdd != nil {
2763 return errAdd
2764 }
2765 return errRemoved
2766 }
2767 return nil
2768}
2769
2770//callGroupAddRemove performs add/remove buckets operation for the indicated group
2771func (f *OpenOltFlowMgr) callGroupAddRemove(ctx context.Context, group *openoltpb2.Group) error {
2772 if err := f.performGroupOperation(ctx, group); err != nil {
2773 st, _ := status.FromError(err)
2774 //ignore already exists error code
2775 if st.Code() != codes.AlreadyExists {
2776 return olterrors.NewErrGroupOp("groupAddRemove", group.GroupId, log.Fields{"status": st}, err)
2777 }
2778 }
2779 return nil
2780}
2781
2782//findDiff compares group members and finds members which only exists in groups2
2783func (f *OpenOltFlowMgr) findDiff(group1 *openoltpb2.Group, group2 *openoltpb2.Group) []*openoltpb2.GroupMember {
2784 var members []*openoltpb2.GroupMember
2785 for _, bucket := range group2.Members {
2786 if !f.contains(group1.Members, bucket) {
2787 // bucket does not exist and must be added
2788 members = append(members, bucket)
2789 }
2790 }
2791 return members
2792}
2793
2794//contains returns true if the members list contains the given member; false otherwise
2795func (f *OpenOltFlowMgr) contains(members []*openoltpb2.GroupMember, member *openoltpb2.GroupMember) bool {
2796 for _, groupMember := range members {
2797 if groupMember.InterfaceId == member.InterfaceId {
2798 return true
2799 }
2800 }
2801 return false
2802}
2803
2804//performGroupOperation call performGroupOperation operation of openolt proto
2805func (f *OpenOltFlowMgr) performGroupOperation(ctx context.Context, group *openoltpb2.Group) error {
2806 logger.Debugw(ctx, "sending-group-to-device",
2807 log.Fields{
2808 "groupToOlt": group,
2809 "command": group.Command})
2810 _, err := f.deviceHandler.Client.PerformGroupOperation(log.WithSpanFromContext(context.Background(), ctx), group)
2811 if err != nil {
2812 return olterrors.NewErrAdapter("group-operation-failed", log.Fields{"groupToOlt": group}, err)
2813 }
2814 return nil
2815}
2816
2817//buildGroup build openoltpb2.Group from given group id and bucket list
2818func (f *OpenOltFlowMgr) buildGroup(ctx context.Context, groupID uint32, buckets []*ofp.OfpBucket) *openoltpb2.Group {
2819 group := openoltpb2.Group{
2820 GroupId: groupID}
2821 for _, ofBucket := range buckets {
2822 member := f.buildMember(ctx, ofBucket)
2823 if member != nil && !f.contains(group.Members, member) {
2824 group.Members = append(group.Members, member)
2825 }
2826 }
2827 return &group
2828}
2829
2830//buildMember builds openoltpb2.GroupMember from an OpenFlow bucket
2831func (f *OpenOltFlowMgr) buildMember(ctx context.Context, ofBucket *ofp.OfpBucket) *openoltpb2.GroupMember {
2832 var outPort uint32
2833 outPortFound := false
2834 for _, ofAction := range ofBucket.Actions {
2835 if ofAction.Type == ofp.OfpActionType_OFPAT_OUTPUT {
2836 outPort = ofAction.GetOutput().Port
2837 outPortFound = true
2838 }
2839 }
2840
2841 if !outPortFound {
2842 logger.Debugw(ctx, "bucket-skipped-since-no-out-port-found-in-it", log.Fields{"ofBucket": ofBucket})
2843 return nil
2844 }
2845 interfaceID := IntfIDFromUniPortNum(outPort)
2846 logger.Debugw(ctx, "got-associated-interface-id-of-the-port",
2847 log.Fields{
2848 "portNumber:": outPort,
2849 "interfaceId:": interfaceID})
2850 if groupInfo, ok := f.interfaceToMcastQueueMap[interfaceID]; ok {
2851 member := openoltpb2.GroupMember{
2852 InterfaceId: interfaceID,
2853 InterfaceType: openoltpb2.GroupMember_PON,
2854 GemPortId: groupInfo.gemPortID,
2855 Priority: groupInfo.servicePriority,
2856 }
2857 //add member to the group
2858 return &member
2859 }
2860 logger.Warnf(ctx, "bucket-skipped-since-interface-2-gem-mapping-cannot-be-found", log.Fields{"ofBucket": ofBucket})
2861 return nil
2862}
2863
2864//sendTPDownloadMsgToChild send payload
2865func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, TpID uint32) error {
2866
2867 onuDev, err := f.getOnuDevice(ctx, intfID, onuID)
2868 if err != nil {
2869 logger.Errorw(ctx, "couldnt-find-onu-child-device",
2870 log.Fields{
2871 "intf-id": intfID,
2872 "onu-id": onuID,
2873 "uni-id": uniID})
2874 return err
2875 }
2876 logger.Debugw(ctx, "got-child-device-from-olt-device-handler", log.Fields{"onu-id": onuDev.deviceID})
2877
2878 tpPath := f.getTPpath(ctx, intfID, uni, TpID)
2879 tpDownloadMsg := &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID, Path: tpPath}
2880 logger.Debugw(ctx, "sending-load-tech-profile-request-to-brcm-onu-adapter", log.Fields{"tpDownloadMsg": *tpDownloadMsg})
2881 sendErr := f.deviceHandler.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.Background(), ctx),
2882 tpDownloadMsg,
2883 ic.InterAdapterMessageType_TECH_PROFILE_DOWNLOAD_REQUEST,
2884 f.deviceHandler.device.Type,
2885 onuDev.deviceType,
2886 onuDev.deviceID,
2887 onuDev.proxyDeviceID, "")
2888 if sendErr != nil {
2889 return olterrors.NewErrCommunication("send-techprofile-download-request",
2890 log.Fields{
2891 "from-adapter": f.deviceHandler.device.Type,
2892 "to-adapter": onuDev.deviceType,
2893 "onu-id": onuDev.deviceID,
2894 "proxyDeviceID": onuDev.proxyDeviceID}, sendErr)
2895 }
2896 logger.Infow(ctx, "success-sending-load-tech-profile-request-to-brcm-onu-adapter", log.Fields{"tpDownloadMsg": *tpDownloadMsg})
2897 return nil
2898}
2899
2900//UpdateOnuInfo function adds onu info to cache and kvstore
2901func (f *OpenOltFlowMgr) UpdateOnuInfo(ctx context.Context, intfID uint32, onuID uint32, serialNum string) error {
2902
2903 f.onuGemInfoLock.Lock()
2904 defer f.onuGemInfoLock.Unlock()
2905
2906 onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
2907 f.onuGemInfo[intfID] = append(f.onuGemInfo[intfID], onu)
2908 if err := f.resourceMgr.AddOnuGemInfo(ctx, intfID, onu); err != nil {
2909 return err
2910 }
2911 logger.Infow(ctx, "updated-onuinfo",
2912 log.Fields{
2913 "intf-id": intfID,
2914 "onu-id": onuID,
2915 "serial-num": serialNum,
2916 "onu": onu,
2917 "device-id": f.deviceHandler.device.Id})
2918 return nil
2919}
2920
2921//addGemPortToOnuInfoMap function adds GEMport to ONU map
2922func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) {
2923
2924 f.onuGemInfoLock.Lock()
2925 defer f.onuGemInfoLock.Unlock()
2926
2927 logger.Infow(ctx, "adding-gem-to-onu-info-map",
2928 log.Fields{
2929 "gem-port-id": gemPort,
2930 "intf-id": intfID,
2931 "onu-id": onuID,
2932 "device-id": f.deviceHandler.device.Id,
2933 "onu-gem": f.onuGemInfo[intfID]})
2934 onugem := f.onuGemInfo[intfID]
2935 for idx, onu := range onugem {
2936 if onu.OnuID == onuID {
2937 // check if gem already exists , else update the cache and kvstore
2938 for _, gem := range onu.GemPorts {
2939 if gem == gemPort {
2940 logger.Debugw(ctx, "gem-already-in-cache-no-need-to-update-cache-and-kv-store",
2941 log.Fields{
2942 "gem": gemPort,
2943 "device-id": f.deviceHandler.device.Id})
2944 return
2945 }
2946 }
2947 onugem[idx].GemPorts = append(onugem[idx].GemPorts, gemPort)
2948 f.onuGemInfo[intfID] = onugem
2949 }
2950 }
2951 err := f.resourceMgr.AddGemToOnuGemInfo(ctx, intfID, onuID, gemPort)
2952 if err != nil {
2953 logger.Errorw(ctx, "failed-to-add-gem-to-onu",
2954 log.Fields{
2955 "intf-id": intfID,
2956 "onu-id": onuID,
2957 "gemPort": gemPort,
2958 "device-id": f.deviceHandler.device.Id})
2959 return
2960 }
2961 logger.Infow(ctx, "gem-added-to-onu-info-map",
2962 log.Fields{
2963 "gem-port-id": gemPort,
2964 "intf-id": intfID,
2965 "onu-id": onuID,
2966 "device-id": f.deviceHandler.device.Id,
2967 "onu-gem": f.onuGemInfo[intfID]})
2968}
2969
2970// This function Lookup maps by serialNumber or (intfId, gemPort)
2971
2972//getOnuIDfromGemPortMap Returns OnuID,nil if found or set 0,error if no onuId is found for serialNumber or (intfId, gemPort)
2973func (f *OpenOltFlowMgr) getOnuIDfromGemPortMap(ctx context.Context, intfID uint32, gemPortID uint32) (uint32, error) {
2974
2975 f.onuGemInfoLock.RLock()
2976 defer f.onuGemInfoLock.RUnlock()
2977
2978 logger.Infow(ctx, "getting-onu-id-from-gem-port-and-pon-port",
2979 log.Fields{
2980 "device-id": f.deviceHandler.device.Id,
2981 "onu-geminfo": f.onuGemInfo[intfID],
2982 "intf-id": intfID,
2983 "gemport-id": gemPortID})
2984
2985 onugem := f.onuGemInfo[intfID]
2986
2987 for _, onu := range onugem {
2988 for _, gem := range onu.GemPorts {
2989 if gem == gemPortID {
2990 return onu.OnuID, nil
2991 }
2992 }
2993 }
2994 logger.Errorw(ctx, "onu-id-from-gem-port-not-found", log.Fields{
2995 "gem-port-id": gemPortID,
2996 "interface-id": intfID,
2997 "all-gems-on-port": onugem,
2998 })
2999 return uint32(0), olterrors.NewErrNotFound("onu-id", log.Fields{
3000 "interface-id": intfID,
3001 "gem-port-id": gemPortID},
3002 nil)
3003}
3004
3005//GetLogicalPortFromPacketIn function computes logical port UNI/NNI port from packet-in indication and returns the same
3006func (f *OpenOltFlowMgr) GetLogicalPortFromPacketIn(ctx context.Context, packetIn *openoltpb2.PacketIndication) (uint32, error) {
3007 var logicalPortNum uint32
3008 var onuID uint32
3009 var err error
3010
3011 if packetIn.IntfType == "pon" {
3012 // packet indication does not have serial number , so sending as nil
3013 if onuID, err = f.getOnuIDfromGemPortMap(ctx, packetIn.IntfId, packetIn.GemportId); err != nil {
3014 // Called method is returning error with all data populated; just return the same
3015 return logicalPortNum, err
3016 }
3017 if packetIn.PortNo != 0 {
3018 logicalPortNum = packetIn.PortNo
3019 } else {
3020 uniID := uint32(0) // FIXME - multi-uni support
3021 logicalPortNum = MkUniPortNum(ctx, packetIn.IntfId, onuID, uniID)
3022 }
3023 // Store the gem port through which the packet_in came. Use the same gem port for packet_out
3024 f.UpdateGemPortForPktIn(ctx, packetIn.IntfId, onuID, logicalPortNum, packetIn.GemportId, packetIn.Pkt)
3025 } else if packetIn.IntfType == "nni" {
3026 logicalPortNum = IntfIDToPortNo(packetIn.IntfId, voltha.Port_ETHERNET_NNI)
3027 }
3028 logger.Infow(ctx, "retrieved-logicalport-from-packet-in",
3029 log.Fields{
3030 "logical-port-num": logicalPortNum,
3031 "intf-type": packetIn.IntfType,
3032 "packet": hex.EncodeToString(packetIn.Pkt),
3033 })
3034 return logicalPortNum, nil
3035}
3036
3037//GetPacketOutGemPortID returns gemPortId
3038func (f *OpenOltFlowMgr) GetPacketOutGemPortID(ctx context.Context, intfID uint32, onuID uint32, portNum uint32, packet []byte) (uint32, error) {
3039 var gemPortID uint32
3040
3041 ctag, priority, err := getCTagFromPacket(ctx, packet)
3042 if err != nil {
3043 return 0, err
3044 }
3045
3046 f.onuGemInfoLock.RLock()
3047 defer f.onuGemInfoLock.RUnlock()
3048 pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: portNum, VlanID: ctag, Priority: priority}
3049 var ok bool
3050 gemPortID, ok = f.packetInGemPort[pktInkey]
3051 if ok {
3052 logger.Debugw(ctx, "found-gemport-for-pktin-key",
3053 log.Fields{
3054 "pktinkey": pktInkey,
3055 "gem": gemPortID})
3056
3057 return gemPortID, nil
3058 }
3059 gemPortID, err = f.resourceMgr.GetGemPortFromOnuPktIn(ctx, pktInkey)
3060 if err == nil {
3061 if gemPortID != 0 {
3062 f.packetInGemPort[pktInkey] = gemPortID
3063 logger.Infow(ctx, "found-gem-port-from-kv-store-and-updating-cache-with-gemport",
3064 log.Fields{
3065 "pktinkey": pktInkey,
3066 "gem": gemPortID})
3067 return gemPortID, nil
3068 }
3069 }
3070 return uint32(0), olterrors.NewErrNotFound("gem-port",
3071 log.Fields{
3072 "pktinkey": pktInkey,
3073 "gem": gemPortID}, err)
3074
3075}
3076
3077// nolint: gocyclo
3078func installFlowOnAllGemports(ctx context.Context,
3079 f1 func(ctx context.Context, intfId uint32, onuId uint32, uniId uint32,
3080 portNo uint32, classifier map[string]interface{}, action map[string]interface{},
3081 logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32, tpID uint32) error,
3082 f2 func(ctx context.Context, intfId uint32, onuId uint32, uniId uint32, portNo uint32,
3083 classifier map[string]interface{}, action map[string]interface{},
3084 logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32, vlanId uint32,
3085 tpID uint32) error,
3086 args map[string]uint32,
3087 classifier map[string]interface{}, action map[string]interface{},
3088 logicalFlow *ofp.OfpFlowStats,
3089 gemPorts []uint32,
3090 TpInst interface{},
3091 FlowType string,
3092 direction string,
3093 tpID uint32,
3094 vlanID ...uint32) {
3095 logger.Debugw(ctx, "installing-flow-on-all-gem-ports",
3096 log.Fields{
3097 "FlowType": FlowType,
3098 "gemPorts": gemPorts,
3099 "vlan": vlanID})
3100
3101
3102
3103 switch TpInst := TpInst.(type) {
3104 case *tp.TechProfile:
3105 attributes := TpInst.DownstreamGemPortAttributeList
3106 if direction == Upstream {
3107 attributes = TpInst.UpstreamGemPortAttributeList
3108 }
3109
3110 for _, gemPortAttribute := range attributes {
3111 if direction == Downstream && strings.ToUpper(gemPortAttribute.IsMulticast) == "TRUE" {
3112 continue
3113 }
3114 gemPortID := gemPortAttribute.GemportID
3115 if allPbitsMarked(gemPortAttribute.PbitMap) {
3116 classifier[VlanPcp] = uint32(VlanPCPMask)
3117 if FlowType == DhcpFlow || FlowType == IgmpFlow || FlowType == HsiaFlow {
3118 if err := f1(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, tpID); err != nil {
3119 logger.Warn(ctx, err)
3120 }
3121 } else if FlowType == EapolFlow {
3122 if err := f2(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, vlanID[0], tpID); err != nil {
3123 logger.Warn(ctx, err)
3124 }
3125 }
3126 } else {
3127 for pos, pbitSet := range strings.TrimPrefix(gemPortAttribute.PbitMap, BinaryStringPrefix) {
3128 if pbitSet == BinaryBit1 {
3129 classifier[VlanPcp] = uint32(len(strings.TrimPrefix(gemPortAttribute.PbitMap, BinaryStringPrefix))) - 1 - uint32(pos)
3130 if FlowType == DhcpFlow || FlowType == IgmpFlow || FlowType == HsiaFlow {
3131 if err := f1(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, tpID); err != nil {
3132 logger.Warn(ctx, err)
3133 }
3134 } else if FlowType == EapolFlow {
3135 if err := f2(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, vlanID[0], tpID); err != nil {
3136 logger.Warn(ctx, err)
3137 }
3138 }
3139 }
3140 }
3141 }
3142 }
3143 case *tp.EponProfile:
3144 if direction == Upstream {
3145 attributes := TpInst.UpstreamQueueAttributeList
3146 for _, queueAttribute := range attributes {
3147 gemPortID := queueAttribute.GemportID
3148 if allPbitsMarked(queueAttribute.PbitMap) {
3149 classifier[VlanPcp] = uint32(VlanPCPMask)
3150 if FlowType == DhcpFlow || FlowType == IgmpFlow || FlowType == HsiaFlow {
3151 if err := f1(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, tpID); err != nil {
3152 logger.Warn(ctx, err)
3153 }
3154 } else if FlowType == EapolFlow {
3155 if err := f2(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, vlanID[0], tpID); err != nil {
3156 logger.Warn(ctx, err)
3157 }
3158 }
3159 } else {
3160 for pos, pbitSet := range strings.TrimPrefix(queueAttribute.PbitMap, BinaryStringPrefix) {
3161 if pbitSet == BinaryBit1 {
3162 classifier[VlanPcp] = uint32(len(strings.TrimPrefix(queueAttribute.PbitMap, BinaryStringPrefix))) - 1 - uint32(pos)
3163 if FlowType == DhcpFlow || FlowType == IgmpFlow || FlowType == HsiaFlow {
3164 if err := f1(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, tpID); err != nil {
3165 logger.Warn(ctx, err)
3166 }
3167 } else if FlowType == EapolFlow {
3168 if err := f2(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, vlanID[0], tpID); err != nil {
3169 logger.Warn(ctx, err)
3170 }
3171 }
3172 }
3173 }
3174 }
3175 }
3176 } else {
3177 attributes := TpInst.DownstreamQueueAttributeList
3178 for _, queueAttribute := range attributes {
3179 gemPortID := queueAttribute.GemportID
3180 if allPbitsMarked(queueAttribute.PbitMap) {
3181 classifier[VlanPcp] = uint32(VlanPCPMask)
3182 if FlowType == DhcpFlow || FlowType == IgmpFlow || FlowType == HsiaFlow {
3183 if err := f1(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, tpID); err != nil {
3184 logger.Warn(ctx, err)
3185 }
3186 } else if FlowType == EapolFlow {
3187 if err := f2(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, vlanID[0], tpID); err != nil {
3188 logger.Warn(ctx, err)
3189 }
3190 }
3191 } else {
3192 for pos, pbitSet := range strings.TrimPrefix(queueAttribute.PbitMap, BinaryStringPrefix) {
3193 if pbitSet == BinaryBit1 {
3194 classifier[VlanPcp] = uint32(len(strings.TrimPrefix(queueAttribute.PbitMap, BinaryStringPrefix))) - 1 - uint32(pos)
3195 if FlowType == DhcpFlow || FlowType == IgmpFlow || FlowType == HsiaFlow {
3196 if err := f1(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, tpID); err != nil {
3197 logger.Warn(ctx, err)
3198 }
3199 } else if FlowType == EapolFlow {
3200 if err := f2(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID, vlanID[0], tpID); err != nil {
3201 logger.Warn(ctx, err)
3202 }
3203 }
3204 }
3205 }
3206 }
3207 }
3208 }
3209 default:
3210 logger.Errorw(ctx, "unknown-tech", log.Fields{"tpInst": TpInst})
3211 }
3212}
3213
3214func allPbitsMarked(pbitMap string) bool {
3215 for pos, pBit := range pbitMap {
3216 if pos >= 2 && pBit != BinaryBit1 {
3217 return false
3218 }
3219 }
3220 return true
3221}
3222
3223func (f *OpenOltFlowMgr) addDHCPTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) error {
3224 logger.Debug(ctx, "adding-trap-dhcp-of-nni-flow")
3225 action := make(map[string]interface{})
3226 classifier[PacketTagType] = DoubleTag
3227 action[TrapToHost] = true
3228 /* We manage flowId resource pool on per PON port basis.
3229 Since this situation is tricky, as a hack, we pass the NNI port
3230 index (network_intf_id) as PON port Index for the flowId resource
3231 pool. Also, there is no ONU Id available for trapping DHCP packets
3232 on NNI port, use onu_id as -1 (invalid)
3233 ****************** CAVEAT *******************
3234 This logic works if the NNI Port Id falls within the same valid
3235 range of PON Port Ids. If this doesn't work for some OLT Vendor
3236 we need to have a re-look at this.
3237 *********************************************
3238 */
3239 onuID := -1
3240 uniID := -1
3241 gemPortID := -1
3242 allocID := -1
3243 networkInterfaceID, err := getNniIntfID(ctx, classifier, action)
3244 if err != nil {
3245 return olterrors.NewErrNotFound("nni-intreface-id",
3246 log.Fields{
3247 "classifier": classifier,
3248 "action": action},
3249 err)
3250 }
3251
3252 flowStoreCookie := getFlowStoreCookie(ctx, classifier, uint32(0))
3253 if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
3254 logger.Info(ctx, "flow-exists-not-re-adding")
3255 return nil
3256 }
3257 flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
3258 if err != nil {
3259 return olterrors.NewErrNotFound("dhcp-trap-nni-flow-id",
3260 log.Fields{
3261 "interface-id": networkInterfaceID,
3262 "onu-id": onuID,
3263 "uni-id": uniID,
3264 "gem-port-id": gemPortID,
3265 "cookie": flowStoreCookie},
3266 err)
3267 }
3268 classifierProto, err := makeOpenOltClassifierField(classifier)
3269 if err != nil {
3270 return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifier}, err)
3271 }
3272 logger.Debugw(ctx, "created-classifier-proto", log.Fields{"classifier": *classifierProto})
3273 actionProto, err := makeOpenOltActionField(action, classifier)
3274 if err != nil {
3275 return olterrors.NewErrInvalidValue(log.Fields{"action": action}, err)
3276 }
3277 logger.Debugw(ctx, "created-action-proto", log.Fields{"action": *actionProto})
3278 downstreamflow := openoltpb2.Flow{AccessIntfId: int32(-1), // AccessIntfId not required
3279 OnuId: int32(onuID), // OnuId not required
3280 UniId: int32(uniID), // UniId not used
3281 FlowId: flowID,
3282 FlowType: Downstream,
3283 AllocId: int32(allocID), // AllocId not used
3284 NetworkIntfId: int32(networkInterfaceID),
3285 GemportId: int32(gemPortID), // GemportId not used
3286 Classifier: classifierProto,
3287 Action: actionProto,
3288 Priority: int32(logicalFlow.Priority),
3289 Cookie: logicalFlow.Cookie,
3290 PortNo: portNo}
3291 if err := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); err != nil {
3292 return olterrors.NewErrFlowOp("add", flowID, log.Fields{"flow": downstreamflow}, err)
3293 }
3294 logger.Info(ctx, "dhcp-trap-on-nni-flow-added–to-device-successfully")
3295 flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
3296 if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
3297 int32(onuID),
3298 int32(uniID),
3299 flowID, flowsToKVStore); err != nil {
3300 return olterrors.NewErrPersistence("update", "flow", flowID, log.Fields{"flow": downstreamflow}, err)
3301 }
3302 return nil
3303}
3304
3305//getPacketTypeFromClassifiers finds and returns packet type of a flow by checking flow classifiers
3306func getPacketTypeFromClassifiers(classifierInfo map[string]interface{}) string {
3307 var packetType string
3308 ovid, ivid := false, false
3309 if vlanID, ok := classifierInfo[VlanVid].(uint32); ok {
3310 vid := vlanID & VlanvIDMask
3311 if vid != ReservedVlan {
3312 ovid = true
3313 }
3314 }
3315 if metadata, ok := classifierInfo[Metadata].(uint64); ok {
3316 vid := uint32(metadata)
3317 if vid != ReservedVlan {
3318 ivid = true
3319 }
3320 }
3321 if ovid && ivid {
3322 packetType = DoubleTag
3323 } else if !ovid && !ivid {
3324 packetType = Untagged
3325 } else {
3326 packetType = SingleTag
3327 }
3328 return packetType
3329}
3330
3331//addIgmpTrapFlowOnNNI adds a trap-to-host flow on NNI
3332func (f *OpenOltFlowMgr) addIgmpTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) error {
3333 logger.Infow(ctx, "adding-igmp-trap-of-nni-flow", log.Fields{"classifier-info": classifier})
3334 action := make(map[string]interface{})
3335 classifier[PacketTagType] = getPacketTypeFromClassifiers(classifier)
3336 action[TrapToHost] = true
3337 /* We manage flowId resource pool on per PON port basis.
3338 Since this situation is tricky, as a hack, we pass the NNI port
3339 index (network_intf_id) as PON port Index for the flowId resource
3340 pool. Also, there is no ONU Id available for trapping packets
3341 on NNI port, use onu_id as -1 (invalid)
3342 ****************** CAVEAT *******************
3343 This logic works if the NNI Port Id falls within the same valid
3344 range of PON Port Ids. If this doesn't work for some OLT Vendor
3345 we need to have a re-look at this.
3346 *********************************************
3347 */
3348 onuID := -1
3349 uniID := -1
3350 gemPortID := -1
3351 allocID := -1
3352 networkInterfaceID, err := getNniIntfID(ctx, classifier, action)
3353 if err != nil {
3354 return olterrors.NewErrNotFound("nni-interface-id", log.Fields{
3355 "classifier": classifier,
3356 "action": action},
3357 err)
3358 }
3359 flowStoreCookie := getFlowStoreCookie(ctx, classifier, uint32(0))
3360 if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
3361 logger.Info(ctx, "igmp-flow-exists-not-re-adding")
3362 return nil
3363 }
3364 flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
3365 if err != nil {
3366 return olterrors.NewErrNotFound("igmp-flow-id",
3367 log.Fields{
3368 "interface-id": networkInterfaceID,
3369 "onu-id": onuID,
3370 "uni-id": uniID,
3371 "gem-port-id": gemPortID,
3372 "cookie": flowStoreCookie},
3373 err)
3374 }
3375 classifierProto, err := makeOpenOltClassifierField(classifier)
3376 if err != nil {
3377 return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifier}, err)
3378 }
3379 logger.Debugw(ctx, "created-classifier-proto-for-the-igmp-flow", log.Fields{"classifier": *classifierProto})
3380 actionProto, err := makeOpenOltActionField(action, classifier)
3381 if err != nil {
3382 return olterrors.NewErrInvalidValue(log.Fields{"action": action}, err)
3383 }
3384 logger.Debugw(ctx, "created-action-proto-for-the-igmp-flow", log.Fields{"action": *actionProto})
3385 downstreamflow := openoltpb2.Flow{AccessIntfId: int32(-1), // AccessIntfId not required
3386 OnuId: int32(onuID), // OnuId not required
3387 UniId: int32(uniID), // UniId not used
3388 FlowId: flowID,
3389 FlowType: Downstream,
3390 AllocId: int32(allocID), // AllocId not used
3391 NetworkIntfId: int32(networkInterfaceID),
3392 GemportId: int32(gemPortID), // GemportId not used
3393 Classifier: classifierProto,
3394 Action: actionProto,
3395 Priority: int32(logicalFlow.Priority),
3396 Cookie: logicalFlow.Cookie,
3397 PortNo: portNo}
3398 if err := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); err != nil {
3399 return olterrors.NewErrFlowOp("add", flowID, log.Fields{"flow": downstreamflow}, err)
3400 }
3401 logger.Info(ctx, "igmp-trap-on-nni-flow-added-to-device-successfully")
3402 flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
3403 if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
3404 int32(onuID),
3405 int32(uniID),
3406 flowID, flowsToKVStore); err != nil {
3407 return olterrors.NewErrPersistence("update", "flow", flowID, log.Fields{"flow": downstreamflow}, err)
3408 }
3409 return nil
3410}
3411
3412func verifyMeterIDAndGetDirection(MeterID uint32, Dir tp_pb.Direction) (string, error) {
3413 if MeterID == 0 { // This should never happen
3414 return "", olterrors.NewErrInvalidValue(log.Fields{"meter-id": MeterID}, nil).Log()
3415 }
3416 if Dir == tp_pb.Direction_UPSTREAM {
3417 return "upstream", nil
3418 } else if Dir == tp_pb.Direction_DOWNSTREAM {
3419 return "downstream", nil
3420 }
3421 return "", nil
3422}
3423
3424// nolint: gocyclo
3425func (f *OpenOltFlowMgr) checkAndAddFlow(ctx context.Context, args map[string]uint32, classifierInfo map[string]interface{},
3426 actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpInst interface{}, gemPorts []uint32,
3427 tpID uint32, uni string) {
3428 var gemPort uint32
3429 intfID := args[IntfID]
3430 onuID := args[OnuID]
3431 uniID := args[UniID]
3432 portNo := args[PortNo]
3433 allocID := args[AllocID]
3434 if ipProto, ok := classifierInfo[IPProto]; ok {
3435 if ipProto.(uint32) == IPProtoDhcp {
3436 logger.Infow(ctx, "adding-dhcp-flow", log.Fields{
3437 "tp-id": tpID,
3438 "alloc-id": allocID,
3439 "intf-id": intfID,
3440 "onu-id": onuID,
3441 "uni-id": uniID,
3442 })
3443 if pcp, ok := classifierInfo[VlanPcp]; ok {
3444 gemPort = f.techprofile[intfID].GetGemportIDForPbit(ctx, TpInst,
3445 tp_pb.Direction_UPSTREAM,
3446 pcp.(uint32))
3447 //Adding DHCP upstream flow
3448
3449 if err := f.addDHCPTrapFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort, tpID); err != nil {
3450 logger.Warn(ctx, err)
3451 }
3452 } else {
3453 //Adding DHCP upstream flow to all gemports
3454 installFlowOnAllGemports(ctx, f.addDHCPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, TpInst, DhcpFlow, Upstream, tpID)
3455 }
3456
3457 } else if ipProto.(uint32) == IgmpProto {
3458 logger.Infow(ctx, "adding-us-igmp-flow",
3459 log.Fields{
3460 "intf-id": intfID,
3461 "onu-id": onuID,
3462 "uni-id": uniID,
3463 "classifier-info:": classifierInfo})
3464 if pcp, ok := classifierInfo[VlanPcp]; ok {
3465 gemPort = f.techprofile[intfID].GetGemportIDForPbit(ctx, TpInst,
3466 tp_pb.Direction_UPSTREAM,
3467 pcp.(uint32))
3468 if err := f.addIGMPTrapFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort, tpID); err != nil {
3469 logger.Warn(ctx, err)
3470 }
3471 } else {
3472 //Adding IGMP upstream flow to all gem ports
3473 installFlowOnAllGemports(ctx, f.addIGMPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, TpInst, IgmpFlow, Upstream, tpID)
3474 }
3475 } else {
3476 logger.Errorw(ctx, "invalid-classifier-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo})
3477 return
3478 }
3479 } else if ethType, ok := classifierInfo[EthType]; ok {
3480 if ethType.(uint32) == EapEthType {
3481 logger.Infow(ctx, "adding-eapol-flow", log.Fields{
3482 "intf-id": intfID,
3483 "onu-id": onuID,
3484 "uni-id": uniID,
3485 })
3486 var vlanID uint32
3487 if val, ok := classifierInfo[VlanVid]; ok {
3488 vlanID = (val.(uint32)) & VlanvIDMask
3489 } else {
3490 vlanID = DefaultMgmtVlan
3491 }
3492 if pcp, ok := classifierInfo[VlanPcp]; ok {
3493 gemPort = f.techprofile[intfID].GetGemportIDForPbit(ctx, TpInst,
3494 tp_pb.Direction_UPSTREAM,
3495 pcp.(uint32))
3496
3497 if err := f.addEAPOLFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort, vlanID, tpID); err != nil {
3498 logger.Warn(ctx, err)
3499 }
3500 } else {
3501 installFlowOnAllGemports(ctx, nil, f.addEAPOLFlow, args, classifierInfo, actionInfo, flow, gemPorts, TpInst, EapolFlow, Upstream, tpID, vlanID)
3502 }
3503 }
3504 } else if _, ok := actionInfo[PushVlan]; ok {
3505 logger.Infow(ctx, "adding-upstream-data-rule", log.Fields{
3506 "intf-id": intfID,
3507 "onu-id": onuID,
3508 "uni-id": uniID,
3509 })
3510 if pcp, ok := classifierInfo[VlanPcp]; ok {
3511 gemPort = f.techprofile[intfID].GetGemportIDForPbit(ctx, TpInst,
3512 tp_pb.Direction_UPSTREAM,
3513 pcp.(uint32))
3514 //Adding HSIA upstream flow
3515 if err := f.addUpstreamDataFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort, tpID); err != nil {
3516 logger.Warn(ctx, err)
3517 }
3518 } else {
3519 //Adding HSIA upstream flow to all gemports
3520 installFlowOnAllGemports(ctx, f.addUpstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, TpInst, HsiaFlow, Upstream, tpID)
3521 }
3522 } else if _, ok := actionInfo[PopVlan]; ok {
3523 logger.Infow(ctx, "adding-downstream-data-rule", log.Fields{
3524 "intf-id": intfID,
3525 "onu-id": onuID,
3526 "uni-id": uniID,
3527 })
3528 if pcp, ok := classifierInfo[VlanPcp]; ok {
3529 gemPort = f.techprofile[intfID].GetGemportIDForPbit(ctx, TpInst,
3530 tp_pb.Direction_DOWNSTREAM,
3531 pcp.(uint32))
3532 //Adding HSIA downstream flow
3533 if err := f.addDownstreamDataFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort, tpID); err != nil {
3534 logger.Warn(ctx, err)
3535 }
3536 } else {
3537 //Adding HSIA downstream flow to all gemports
3538 installFlowOnAllGemports(ctx, f.addDownstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, TpInst, HsiaFlow, Downstream, tpID)
3539 }
3540 } else {
3541 logger.Errorw(ctx, "invalid-flow-type-to-handle",
3542 log.Fields{
3543 "intf-id": intfID,
3544 "onu-id": onuID,
3545 "uni-id": uniID,
3546 "classifier": classifierInfo,
3547 "action": actionInfo,
3548 "flow": flow})
3549 return
3550 }
3551 if noticeDownloadRequest {
3552 go func() {
3553 if err := f.sendTPDownloadMsgToChild(ctx, intfID, onuID, uniID, uni, tpID); err != nil {
3554 logger.Warn(ctx, err)
3555 }
3556 }()
3557 }
3558}
3559
3560func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(ctx context.Context, gemPK gemPortKey) (bool, error) {
3561 if f.perGemPortLock.TryLock(gemPK) {
3562 flowIDList := f.flowsUsedByGemPort[gemPK]
3563 f.perGemPortLock.Unlock(gemPK)
3564 return len(flowIDList) > 1, nil
3565 }
3566 logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
3567 log.Fields{
3568 "device-id": f.deviceHandler.device.Id,
3569 "key": gemPK,
3570 })
3571 return false, olterrors.NewErrAdapter("failed-to-acquire-per-gem-port-lock", log.Fields{
3572 "device-id": f.deviceHandler.device.Id,
3573 "key": gemPK,
3574 }, nil)
3575}
3576
3577func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ctx context.Context, ponIntf uint32, onuID uint32, uniID uint32, tpID uint32, tpInst *tp.TechProfile, gemPortID uint32) (bool, uint32) {
3578 currentGemPorts := f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, ponIntf, onuID, uniID)
3579 tpGemPorts := tpInst.UpstreamGemPortAttributeList
3580 for _, currentGemPort := range currentGemPorts {
3581 for _, tpGemPort := range tpGemPorts {
3582 if (currentGemPort == tpGemPort.GemportID) && (currentGemPort != gemPortID) {
3583 return true, currentGemPort
3584 }
3585 }
3586 }
3587 if tpInst.InstanceCtrl.Onu == "single-instance" {
3588 // The TP information for the given TP ID, PON ID, ONU ID, UNI ID should be removed.
3589 if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, ponIntf, uint32(onuID), uint32(uniID), tpID); err != nil {
3590 logger.Warn(ctx, err)
3591 }
3592 if err := f.DeleteTechProfileInstance(ctx, ponIntf, uint32(onuID), uint32(uniID), "", tpID); err != nil {
3593 logger.Warn(ctx, err)
3594 }
3595
3596 // Although we cleaned up TP Instance for the given (PON ID, ONU ID, UNI ID), the TP might
3597 // still be used on other uni ports.
3598 // So, we need to check and make sure that no other gem port is referring to the given TP ID
3599 // on any other uni port.
3600 tpInstances := f.techprofile[ponIntf].FindAllTpInstances(ctx, tpID, ponIntf, onuID).([]tp.TechProfile)
3601 logger.Debugw(ctx, "got-single-instance-tp-instances", log.Fields{"tp-instances": tpInstances})
3602 for i := 0; i < len(tpInstances); i++ {
3603 tpI := tpInstances[i]
3604 tpGemPorts := tpI.UpstreamGemPortAttributeList
3605 for _, tpGemPort := range tpGemPorts {
3606 if tpGemPort.GemportID != gemPortID {
3607 logger.Debugw(ctx, "single-instance-tp-is-in-use-by-gem", log.Fields{"gemPort": tpGemPort.GemportID})
3608 return true, tpGemPort.GemportID
3609 }
3610 }
3611 }
3612 }
3613 logger.Debug(ctx, "tech-profile-is-not-in-use-by-any-gem")
3614 return false, 0
3615}
3616
3617func formulateClassifierInfoFromFlow(ctx context.Context, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) {
3618 for _, field := range flows.GetOfbFields(flow) {
3619 if field.Type == flows.ETH_TYPE {
3620 classifierInfo[EthType] = field.GetEthType()
3621 logger.Debug(ctx, "field-type-eth-type", log.Fields{"classifierInfo[ETH_TYPE]": classifierInfo[EthType].(uint32)})
3622 } else if field.Type == flows.ETH_DST {
3623 classifierInfo[EthDst] = field.GetEthDst()
3624 logger.Debug(ctx, "field-type-eth-type", log.Fields{"classifierInfo[ETH_DST]": classifierInfo[EthDst].([]uint8)})
3625 } else if field.Type == flows.IP_PROTO {
3626 classifierInfo[IPProto] = field.GetIpProto()
3627 logger.Debug(ctx, "field-type-ip-proto", log.Fields{"classifierInfo[IP_PROTO]": classifierInfo[IPProto].(uint32)})
3628 } else if field.Type == flows.IN_PORT {
3629 classifierInfo[InPort] = field.GetPort()
3630 logger.Debug(ctx, "field-type-in-port", log.Fields{"classifierInfo[IN_PORT]": classifierInfo[InPort].(uint32)})
3631 } else if field.Type == flows.VLAN_VID {
3632 classifierInfo[VlanVid] = field.GetVlanVid() & 0xfff
3633 logger.Debug(ctx, "field-type-vlan-vid", log.Fields{"classifierInfo[VLAN_VID]": classifierInfo[VlanVid].(uint32)})
3634 } else if field.Type == flows.VLAN_PCP {
3635 classifierInfo[VlanPcp] = field.GetVlanPcp()
3636 logger.Debug(ctx, "field-type-vlan-pcp", log.Fields{"classifierInfo[VLAN_PCP]": classifierInfo[VlanPcp].(uint32)})
3637 } else if field.Type == flows.UDP_DST {
3638 classifierInfo[UDPDst] = field.GetUdpDst()
3639 logger.Debug(ctx, "field-type-udp-dst", log.Fields{"classifierInfo[UDP_DST]": classifierInfo[UDPDst].(uint32)})
3640 } else if field.Type == flows.UDP_SRC {
3641 classifierInfo[UDPSrc] = field.GetUdpSrc()
3642 logger.Debug(ctx, "field-type-udp-src", log.Fields{"classifierInfo[UDP_SRC]": classifierInfo[UDPSrc].(uint32)})
3643 } else if field.Type == flows.IPV4_DST {
3644 classifierInfo[Ipv4Dst] = field.GetIpv4Dst()
3645 logger.Debug(ctx, "field-type-ipv4-dst", log.Fields{"classifierInfo[IPV4_DST]": classifierInfo[Ipv4Dst].(uint32)})
3646 } else if field.Type == flows.IPV4_SRC {
3647 classifierInfo[Ipv4Src] = field.GetIpv4Src()
3648 logger.Debug(ctx, "field-type-ipv4-src", log.Fields{"classifierInfo[IPV4_SRC]": classifierInfo[Ipv4Src].(uint32)})
3649 } else if field.Type == flows.METADATA {
3650 classifierInfo[Metadata] = field.GetTableMetadata()
3651 logger.Debug(ctx, "field-type-metadata", log.Fields{"classifierInfo[Metadata]": classifierInfo[Metadata].(uint64)})
3652 } else if field.Type == flows.TUNNEL_ID {
3653 classifierInfo[TunnelID] = field.GetTunnelId()
3654 logger.Debug(ctx, "field-type-tunnelId", log.Fields{"classifierInfo[TUNNEL_ID]": classifierInfo[TunnelID].(uint64)})
3655 } else {
3656 logger.Errorw(ctx, "un-supported-field-type", log.Fields{"type": field.Type})
3657 return
3658 }
3659 }
3660}
3661
3662func formulateActionInfoFromFlow(ctx context.Context, actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) error {
3663 for _, action := range flows.GetActions(flow) {
3664 if action.Type == flows.OUTPUT {
3665 if out := action.GetOutput(); out != nil {
3666 actionInfo[Output] = out.GetPort()
3667 logger.Debugw(ctx, "action-type-output", log.Fields{"out-port": actionInfo[Output].(uint32)})
3668 } else {
3669 return olterrors.NewErrInvalidValue(log.Fields{"output-port": nil}, nil)
3670 }
3671 } else if action.Type == flows.POP_VLAN {
3672 actionInfo[PopVlan] = true
3673 logger.Debugw(ctx, "action-type-pop-vlan", log.Fields{"in_port": classifierInfo[InPort].(uint32)})
3674 } else if action.Type == flows.PUSH_VLAN {
3675 if out := action.GetPush(); out != nil {
3676 if tpid := out.GetEthertype(); tpid != 0x8100 {
3677 logger.Errorw(ctx, "invalid ethertype in push action", log.Fields{"ethertype": actionInfo[PushVlan].(int32)})
3678 } else {
3679 actionInfo[PushVlan] = true
3680 actionInfo[TPID] = tpid
3681 logger.Debugw(ctx, "action-type-push-vlan",
3682 log.Fields{
3683 "push-tpid": actionInfo[TPID].(uint32),
3684 "in-port": classifierInfo[InPort].(uint32)})
3685 }
3686 }
3687 } else if action.Type == flows.SET_FIELD {
3688 if out := action.GetSetField(); out != nil {
3689 if field := out.GetField(); field != nil {
3690 if ofClass := field.GetOxmClass(); ofClass != ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
3691 return olterrors.NewErrInvalidValue(log.Fields{"openflow-class": ofClass}, nil)
3692 }
3693 /*logger.Debugw(ctx, "action-type-set-field",log.Fields{"field": field, "in_port": classifierInfo[IN_PORT].(uint32)})*/
3694 formulateSetFieldActionInfoFromFlow(ctx, field, actionInfo)
3695 }
3696 }
3697 } else if action.Type == flows.GROUP {
3698 formulateGroupActionInfoFromFlow(ctx, action, actionInfo)
3699 } else {
3700 return olterrors.NewErrInvalidValue(log.Fields{"action-type": action.Type}, nil)
3701 }
3702 }
3703 return nil
3704}
3705
3706func formulateSetFieldActionInfoFromFlow(ctx context.Context, field *ofp.OfpOxmField, actionInfo map[string]interface{}) {
3707 if ofbField := field.GetOfbField(); ofbField != nil {
3708 fieldtype := ofbField.GetType()
3709 if fieldtype == ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID {
3710 if vlan := ofbField.GetVlanVid(); vlan != 0 {
3711 actionInfo[VlanVid] = vlan & 0xfff
3712 logger.Debugw(ctx, "action-set-vlan-vid", log.Fields{"actionInfo[VLAN_VID]": actionInfo[VlanVid].(uint32)})
3713 } else {
3714 logger.Error(ctx, "no-invalid-vlan-id-in-set-vlan-vid-action")
3715 }
3716 } else if fieldtype == ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_PCP {
3717 pcp := ofbField.GetVlanPcp()
3718 actionInfo[VlanPcp] = pcp
3719 logger.Debugw(ctx, "action-set-vlan-pcp", log.Fields{"actionInfo[VLAN_PCP]": actionInfo[VlanPcp].(uint32)})
3720 } else {
3721 logger.Errorw(ctx, "unsupported-action-set-field-type", log.Fields{"type": fieldtype})
3722 }
3723 }
3724}
3725
3726func formulateGroupActionInfoFromFlow(ctx context.Context, action *ofp.OfpAction, actionInfo map[string]interface{}) {
3727 if action.GetGroup() == nil {
3728 logger.Warn(ctx, "no-group-entry-found-in-the-group-action")
3729 } else {
3730 actionInfo[GroupID] = action.GetGroup().GroupId
3731 logger.Debugw(ctx, "action-group-id", log.Fields{"actionInfo[GroupID]": actionInfo[GroupID].(uint32)})
3732 }
3733}
3734
3735func formulateControllerBoundTrapFlowInfo(ctx context.Context, actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) error {
3736 if isControllerFlow := IsControllerBoundFlow(actionInfo[Output].(uint32)); isControllerFlow {
3737 logger.Debug(ctx, "controller-bound-trap-flows--getting-inport-from-tunnelid")
3738 /* Get UNI port/ IN Port from tunnel ID field for upstream controller bound flows */
3739 if portType := IntfIDToPortTypeName(classifierInfo[InPort].(uint32)); portType == voltha.Port_PON_OLT {
3740 if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
3741 classifierInfo[InPort] = uniPort
3742 logger.Debugw(ctx, "upstream-pon-to-controller-flow--inport-in-tunnelid",
3743 log.Fields{
3744 "newinport": classifierInfo[InPort].(uint32),
3745 "outport": actionInfo[Output].(uint32)})
3746 } else {
3747 return olterrors.NewErrNotFound("child-in-port",
3748 log.Fields{
3749 "reason": "upstream-pon-to-controller-flow--no-inport-in-tunnelid",
3750 "flow": flow}, nil)
3751 }
3752 }
3753 } else {
3754 logger.Debug(ctx, "non-controller-flows--getting-uniport-from-tunnelid")
3755 // Downstream flow from NNI to PON port , Use tunnel ID as new OUT port / UNI port
3756 if portType := IntfIDToPortTypeName(actionInfo[Output].(uint32)); portType == voltha.Port_PON_OLT {
3757 if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
3758 actionInfo[Output] = uniPort
3759 logger.Debugw(ctx, "downstream-nni-to-pon-port-flow, outport-in-tunnelid",
3760 log.Fields{
3761 "newoutport": actionInfo[Output].(uint32),
3762 "outport": actionInfo[Output].(uint32)})
3763 } else {
3764 return olterrors.NewErrNotFound("out-port",
3765 log.Fields{
3766 "reason": "downstream-nni-to-pon-port-flow--no-outport-in-tunnelid",
3767 "flow": flow}, nil)
3768 }
3769 // Upstream flow from PON to NNI port , Use tunnel ID as new IN port / UNI port
3770 } else if portType := IntfIDToPortTypeName(classifierInfo[InPort].(uint32)); portType == voltha.Port_PON_OLT {
3771 if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
3772 classifierInfo[InPort] = uniPort
3773 logger.Debugw(ctx, "upstream-pon-to-nni-port-flow, inport-in-tunnelid",
3774 log.Fields{
3775 "newinport": actionInfo[Output].(uint32),
3776 "outport": actionInfo[Output].(uint32)})
3777 } else {
3778 return olterrors.NewErrNotFound("nni-port",
3779 log.Fields{
3780 "reason": "upstream-pon-to-nni-port-flow--no-inport-in-tunnelid",
3781 "in-port": classifierInfo[InPort].(uint32),
3782 "out-port": actionInfo[Output].(uint32),
3783 "flow": flow}, nil)
3784 }
3785 }
3786 }
3787 return nil
3788}
3789
3790func getTpIDFromFlow(ctx context.Context, flow *ofp.OfpFlowStats) (uint32, error) {
3791 /* Metadata 8 bytes:
3792 Most Significant 2 Bytes = Inner VLAN
3793 Next 2 Bytes = Tech Profile ID(TPID)
3794 Least Significant 4 Bytes = Port ID
3795 Flow Metadata carries Tech-Profile (TP) ID and is mandatory in all
3796 subscriber related flows.
3797 */
3798 metadata := flows.GetMetadataFromWriteMetadataAction(ctx, flow)
3799 if metadata == 0 {
3800 return 0, olterrors.NewErrNotFound("metadata", log.Fields{"flow": flow}, nil)
3801 }
3802 TpID := flows.GetTechProfileIDFromWriteMetaData(ctx, metadata)
3803 return uint32(TpID), nil
3804}
3805
3806func appendUnique(slice []uint32, item uint32) []uint32 {
3807 for _, sliceElement := range slice {
3808 if sliceElement == item {
3809 return slice
3810 }
3811 }
3812 return append(slice, item)
3813}
3814
3815// getNniIntfID gets nni intf id from the flow classifier/action
3816func getNniIntfID(ctx context.Context, classifier map[string]interface{}, action map[string]interface{}) (uint32, error) {
3817
3818 portType := IntfIDToPortTypeName(classifier[InPort].(uint32))
3819 if portType == voltha.Port_PON_OLT {
3820 intfID, err := IntfIDFromNniPortNum(ctx, action[Output].(uint32))
3821 if err != nil {
3822 logger.Debugw(ctx, "invalid-action-port-number",
3823 log.Fields{
3824 "port-number": action[Output].(uint32),
3825 "error": err})
3826 return uint32(0), err
3827 }
3828 logger.Infow(ctx, "output-nni-intfId-is", log.Fields{"intf-id": intfID})
3829 return intfID, nil
3830 } else if portType == voltha.Port_ETHERNET_NNI {
3831 intfID, err := IntfIDFromNniPortNum(ctx, classifier[InPort].(uint32))
3832 if err != nil {
3833 logger.Debugw(ctx, "invalid-classifier-port-number",
3834 log.Fields{
3835 "port-number": action[Output].(uint32),
3836 "error": err})
3837 return uint32(0), err
3838 }
3839 logger.Infow(ctx, "input-nni-intfId-is", log.Fields{"intf-id": intfID})
3840 return intfID, nil
3841 }
3842 return uint32(0), nil
3843}
3844
3845// UpdateGemPortForPktIn updates gemport for packet-in in to the cache and to the kv store as well.
3846func (f *OpenOltFlowMgr) UpdateGemPortForPktIn(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32, gemPort uint32, pkt []byte) {
3847 cTag, priority, err := getCTagFromPacket(ctx, pkt)
3848 if err != nil {
3849 logger.Errorw(ctx, "unable-to-update-gem-port-for-packet-in",
3850 log.Fields{"intfID": intfID, "onuID": onuID, "logicalPort": logicalPort, "gemPort": gemPort, "err": err})
3851 return
3852 }
3853 pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: logicalPort, VlanID: cTag, Priority: priority}
3854
3855 f.onuGemInfoLock.Lock()
3856 defer f.onuGemInfoLock.Unlock()
3857
3858 lookupGemPort, ok := f.packetInGemPort[pktInkey]
3859 if ok {
3860 if lookupGemPort == gemPort {
3861 logger.Infow(ctx, "pktin-key/value-found-in-cache--no-need-to-update-kv--assume-both-in-sync",
3862 log.Fields{
3863 "pktinkey": pktInkey,
3864 "gem": gemPort})
3865 return
3866 }
3867 }
3868 f.packetInGemPort[pktInkey] = gemPort
3869
3870 f.resourceMgr.UpdateGemPortForPktIn(ctx, pktInkey, gemPort)
3871 logger.Infow(ctx, "pktin-key-not-found-in-local-cache-value-is-different--updating-cache-and-kv-store",
3872 log.Fields{
3873 "pktinkey": pktInkey,
3874 "gem": gemPort})
3875}
3876
3877//getCTagFromPacket retrieves and returns c-tag and priority value from a packet.
3878func getCTagFromPacket(ctx context.Context, packet []byte) (uint16, uint8, error) {
3879 if packet == nil || len(packet) < 18 {
3880 logger.Error(ctx, "unable-get-c-tag-from-the-packet--invalid-packet-length ")
3881 return 0, 0, errors.New("invalid packet length")
3882 }
3883 outerEthType := (uint16(packet[12]) << 8) | uint16(packet[13])
3884 innerEthType := (uint16(packet[16]) << 8) | uint16(packet[17])
3885
3886 var index int8
3887 if outerEthType == 0x8100 {
3888 if innerEthType == 0x8100 {
3889 // q-in-q 802.1ad or 802.1q double tagged packet.
3890 // get the inner vlanId
3891 index = 18
3892 } else {
3893 index = 14
3894 }
3895 priority := (packet[index] >> 5) & 0x7
3896 //13 bits composes vlanId value
3897 vlan := ((uint16(packet[index]) << 8) & 0x0fff) | uint16(packet[index+1])
3898 return vlan, priority, nil
3899 }
3900 logger.Debugf(ctx, "No vlanId found in the packet. Returning zero as c-tag")
3901 return 0, 0, nil
3902}
3903
3904// AddUniPortToOnuInfo adds uni port to the onugem info both in cache and kvstore.
3905func (f *OpenOltFlowMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNum uint32) {
3906
3907 f.onuGemInfoLock.Lock()
3908 defer f.onuGemInfoLock.Unlock()
3909
3910 onugem := f.onuGemInfo[intfID]
3911 for idx, onu := range onugem {
3912 if onu.OnuID == onuID {
3913 for _, uni := range onu.UniPorts {
3914 if uni == portNum {
3915 logger.Infow(ctx, "uni-already-in-cache--no-need-to-update-cache-and-kv-store", log.Fields{"uni": portNum})
3916 return
3917 }
3918 }
3919 onugem[idx].UniPorts = append(onugem[idx].UniPorts, portNum)
3920 f.onuGemInfo[intfID] = onugem
3921 }
3922 }
3923 f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNum)
3924
3925}
3926
3927func (f *OpenOltFlowMgr) loadFlowIDlistForGem(ctx context.Context, intf uint32) {
3928 flowIDsList, err := f.resourceMgr.GetFlowIDsGemMapForInterface(ctx, intf)
3929 if err != nil {
3930 logger.Error(ctx, "failed-to-get-flowid-list-per-gem", log.Fields{"intf": intf})
3931 return
3932 }
3933 for gem, FlowIDs := range flowIDsList {
3934 gemPK := gemPortKey{intf, uint32(gem)}
3935 if f.perGemPortLock.TryLock(gemPK) {
3936 f.flowsUsedByGemPort[gemPK] = FlowIDs
3937 f.perGemPortLock.Unlock(gemPK)
3938 } else {
3939 logger.Error(ctx, "failed-to-acquire-per-gem-port-lock",
3940 log.Fields{
3941 "intf-id": intf,
3942 "device-id": f.deviceHandler.device.Id,
3943 "key": gemPK,
3944 })
3945 }
3946 }
3947}
3948
3949//loadInterfaceToMulticastQueueMap reads multicast queues per interface from the KV store
3950//and put them into interfaceToMcastQueueMap.
3951func (f *OpenOltFlowMgr) loadInterfaceToMulticastQueueMap(ctx context.Context) {
3952 storedMulticastQueueMap, err := f.resourceMgr.GetMcastQueuePerInterfaceMap(ctx)
3953 if err != nil {
3954 logger.Error(ctx, "failed-to-get-pon-interface-to-multicast-queue-map")
3955 return
3956 }
3957 for intf, queueInfo := range storedMulticastQueueMap {
3958 q := queueInfoBrief{
3959 gemPortID: queueInfo[0],
3960 servicePriority: queueInfo[1],
3961 }
3962 f.interfaceToMcastQueueMap[intf] = &q
3963 }
3964}
3965
3966//GetFlowGroupFromKVStore fetches and returns flow group from the KV store. Returns (nil, false, error) if any problem occurs during
3967//fetching the data. Returns (group, true, nil) if the group is fetched and returned successfully.
3968//Returns (nil, false, nil) if the group does not exists in the KV store.
3969func (f *OpenOltFlowMgr) GetFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
3970 exists, groupInfo, err := f.resourceMgr.GetFlowGroupFromKVStore(ctx, groupID, cached)
3971 if err != nil {
3972 return nil, false, olterrors.NewErrNotFound("flow-group", log.Fields{"group-id": groupID}, err)
3973 }
3974 if exists {
3975 return newGroup(groupInfo.GroupID, groupInfo.OutPorts), exists, nil
3976 }
3977 return nil, exists, nil
3978}
3979
3980func newGroup(groupID uint32, outPorts []uint32) *ofp.OfpGroupEntry {
3981 groupDesc := ofp.OfpGroupDesc{
3982 Type: ofp.OfpGroupType_OFPGT_ALL,
3983 GroupId: groupID,
3984 }
3985 groupEntry := ofp.OfpGroupEntry{
3986 Desc: &groupDesc,
3987 }
3988 for i := 0; i < len(outPorts); i++ {
3989 var acts []*ofp.OfpAction
3990 acts = append(acts, flows.Output(outPorts[i]))
3991 bucket := ofp.OfpBucket{
3992 Actions: acts,
3993 }
3994 groupDesc.Buckets = append(groupDesc.Buckets, &bucket)
3995 }
3996 return &groupEntry
3997}
3998
3999// L2oamCreateSchedulerQueues creates scheduler queues
4000func (f *OpenOltFlowMgr) L2oamCreateSchedulerQueues(ctx context.Context, sq schedQueue) error {
4001 if !autoAddFlow {
4002 return nil
4003 }
4004
4005 var meterConfig *ofp.OfpMeterConfig
4006 if sq.flowMetadata != nil {
4007 for _, meter := range sq.flowMetadata.Meters {
4008 if sq.meterID == meter.MeterId {
4009 meterConfig = meter
4010 logger.Debugw(ctx, "found-meter-config-from-flowmetadata",
4011 log.Fields{"meterConfig": meterConfig,
4012 "device-id": f.deviceHandler.device.Id})
4013 break
4014 }
4015 }
4016 } else {
4017 logger.Errorw(ctx, "flow-metadata-not-present-in-flow", log.Fields{"device-id": f.deviceHandler.device.Id})
4018 }
4019 if meterConfig == nil {
4020 return olterrors.NewErrNotFound("meterbands", log.Fields{
4021 "reason": "Could-not-get-meterbands-from-flowMetadata",
4022 "flow-metadata": sq.flowMetadata,
4023 "meter-id": sq.meterID,
4024 "device-id": f.deviceHandler.device.Id}, nil)
4025 } else if len(meterConfig.Bands) < MaxMeterBand {
4026 logger.Errorw(ctx, "invalid-number-of-bands-in-meter",
4027 log.Fields{"Bands": meterConfig.Bands,
4028 "meter-id": sq.meterID,
4029 "device-id": f.deviceHandler.device.Id})
4030 return olterrors.NewErrInvalidValue(log.Fields{
4031 "reason": "Invalid-number-of-bands-in-meter",
4032 "meterband-count": len(meterConfig.Bands),
4033 "metabands": meterConfig.Bands,
4034 "meter-id": sq.meterID,
4035 "device-id": f.deviceHandler.device.Id}, nil)
4036 }
4037 cir := meterConfig.Bands[0].Rate
4038 eir := meterConfig.Bands[1].Rate
4039 pir := cir + eir
4040 onuKey := f.deviceHandler.formOnuKey(sq.intfID, sq.onuID)
4041 onuDev, ok := f.deviceHandler.onus.Load(onuKey)
4042 if !ok {
4043 logger.Errorw(ctx, "not-found-onu-device",
4044 log.Fields{"intf-id": sq.intfID,
4045 "onu-id": sq.onuID})
4046 } else {
4047 onuDeviceID := (onuDev.(*OnuDevice)).deviceID
4048 if _, exists := mapCreateScheduler[onuDeviceID]; exists {
4049 logger.Debugw(ctx, "scheduler-exists",
4050 log.Fields{"onu-device-id": onuDeviceID})
4051 return nil
4052 }
4053 mapCreateScheduler[onuDeviceID] = true
4054 l2oamCmd := &L2oamCmd{
4055 Type: "add-flow",
4056 Cir: []byte{0x00, 0x00, 0x00, 0x00},
4057 Pir: []byte{0x00, 0x00, 0x00, 0x00},
4058 OnuDeviceID: onuDeviceID,
4059 }
4060 binary.BigEndian.PutUint32(l2oamCmd.Cir[0:4], cir)
4061 binary.BigEndian.PutUint32(l2oamCmd.Pir[0:4], pir)
4062 L2oamAddFlow(ctx, f.deviceHandler, l2oamCmd)
4063
4064 f.deviceHandler.L2oamAddFlowAndMount(ctx, onuDeviceID, []byte{0x0f, 0xfb}, []byte{0x00, 0x64})
4065 }
4066
4067 return nil
4068}
4069
4070// l2oamAddEapolToDevice adds EAPOL flow to device
4071/*
4072func (f *OpenOltFlowMgr) l2oamAddEapolToDevice(ctx context.Context, intfID uint32, onuID uint32) error {
4073 if !autoAddFlow {
4074 return nil
4075 }
4076
4077 onuKey := f.deviceHandler.formOnuKey(intfID, onuID)
4078 onuDev, ok := f.deviceHandler.onus.Load(onuKey)
4079 if !ok {
4080 logger.Errorw(ctx, "not-found-onu-device",
4081 log.Fields{"intf-id": intfID,
4082 "onu-id": onuID})
4083 } else {
4084 f.deviceHandler.L2oamAddFlowAndMount(ctx, (onuDev.(*OnuDevice)).deviceID, []byte{0x0f, 0xfb}, []byte{0x00, 0x64})
4085 }
4086
4087 return nil
4088}
4089*/
4090
4091func (f *OpenOltFlowMgr) l2oamAddFlowDevice(ctx context.Context, intfID uint32, onuID uint32, ovid uint32, ivid uint32) {
4092 logger.Debugw(ctx, "add-flow-to-device",
4093 log.Fields{"onu-id": onuID, "o-vid": ovid, "i-vid": ivid})
4094
4095 if !autoAddFlow {
4096 return
4097 }
4098
4099 onuKey := f.deviceHandler.formOnuKey(intfID, onuID)
4100 onuDev, ok := f.deviceHandler.onus.Load(onuKey)
4101 if !ok {
4102 logger.Errorw(ctx, "not-found-onu-device",
4103 log.Fields{"intf-id": intfID,
4104 "onu-id": onuID})
4105 } else {
4106 onuDeviceID := (onuDev.(*OnuDevice)).deviceID
4107 if _, exists := mapAddFlow[onuDeviceID]; exists {
4108 logger.Debugw(ctx, "flow is added already",
4109 log.Fields{"onu-device-id": onuDeviceID})
4110 return
4111 }
4112 mapAddFlow[onuDeviceID] = true
4113 ovidBytes := []byte{0x00, 0x00}
4114 binary.BigEndian.PutUint16(ovidBytes[:], uint16(ovid))
4115 ividBytes := []byte{0x00, 0x00}
4116 binary.BigEndian.PutUint16(ividBytes[:], uint16(ivid))
4117 f.deviceHandler.L2oamAddFlowAndMount(ctx, onuDeviceID, ovidBytes, ividBytes)
4118 }
4119}