blob: 6b295588655c4d17730484d7b3bbe368cd43520c [file] [log] [blame]
Takahiro Suzukid7bf8202020-12-17 20:21:59 +09001/*
2 * Copyright 2020-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
// Package core provides utilities for managing OLT devices, flows and statistics.
18package core
19
20import (
21 "context"
22 "encoding/binary"
23 "encoding/hex"
24 "fmt"
25 "io"
26 "net"
27 "strconv"
28 "strings"
29 "sync"
30 "time"
31
32 "github.com/cenkalti/backoff/v3"
33 "github.com/gogo/protobuf/proto"
34 "github.com/golang/protobuf/ptypes"
35 grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
36 grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
37 "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
38 "github.com/opencord/voltha-lib-go/v3/pkg/flows"
39 "github.com/opencord/voltha-lib-go/v3/pkg/log"
40 "github.com/opencord/voltha-lib-go/v3/pkg/pmmetrics"
41 "github.com/opencord/voltha-openolt-adapter/internal/pkg/core/l2oam"
42 "github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
43 rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
44 "github.com/opencord/voltha-openolt-adapter/pkg/mocks"
45 "github.com/opencord/voltha-protos/v3/go/common"
46 ic "github.com/opencord/voltha-protos/v3/go/inter_container"
47 of "github.com/opencord/voltha-protos/v3/go/openflow_13"
48 oop "github.com/opencord/voltha-protos/v3/go/openolt"
49 "github.com/opencord/voltha-protos/v3/go/voltha"
50 "github.com/opentracing/opentracing-go"
51 "google.golang.org/grpc"
52 "google.golang.org/grpc/codes"
53 "google.golang.org/grpc/status"
54)
55
// Constants for number of retries and for timeout
const (
	// MaxRetry is the retry-count limit (not referenced in this part of the file).
	MaxRetry = 10
	// MaxTimeOutInMs is a timeout value; the name indicates milliseconds.
	MaxTimeOutInMs = 500
	// InvalidPort is the all-ones 32-bit value; presumably a sentinel for
	// "no valid port" — confirm against callers.
	InvalidPort = 0xffffffff
)
62
// pendingFlowRemoveDataKey is the key of the pendingFlowRemoveDataPerSubscriber map.
// It identifies one entry by the (intfID, onuID, uniID) triple.
type pendingFlowRemoveDataKey struct {
	intfID uint32
	onuID  uint32
	uniID  uint32
}
69
// pendingFlowRemoveData is the value stored in the pendingFlowRemoveDataPerSubscriber map.
// It holds the number of pending flow removes and a signal channel used
// to indicate to the receiver when all flow removes have been handled.
type pendingFlowRemoveData struct {
	pendingFlowRemoveCount uint32
	allFlowsRemoved        chan struct{}
}
77
// DeviceHandler will interact with the OLT device.
type DeviceHandler struct {
	// device is the handler's local copy of the OLT device record; it is
	// replaced by updateLocalDevice, doStateDown and doStateConnected.
	device       *voltha.Device
	coreProxy    adapterif.CoreProxy
	AdapterProxy adapterif.AdapterProxy
	EventProxy   adapterif.EventProxy
	openOLT      *OpenOLT
	// exitChannel receives a value when stop() is called.
	exitChannel chan int
	// lockDevice is taken around updates of device and
	// isReadIndicationRoutineActive in the methods of this file.
	lockDevice sync.RWMutex
	// Client is the OpenOLT RPC client, set in postInit.
	Client        oop.OpenoltClient
	transitionMap *TransitionMap
	// clientCon is the gRPC connection dialed in doStateInit.
	clientCon   *grpc.ClientConn
	flowMgr     *OpenOltFlowMgr
	eventMgr    *OpenOltEventMgr
	resourceMgr *rsrcMgr.OpenOltResourceMgr

	// discOnus is reset to an empty map in doStateDown.
	discOnus  sync.Map
	onus      sync.Map
	portStats *OpenOltStatisticsMgr
	metrics   *pmmetrics.PmMetrics
	// stopCollector is read by startCollector to end metric collection.
	stopCollector      chan bool
	stopHeartbeatCheck chan bool
	// activePorts maps an interface ID to a bool stored by addPort
	// (true when the reported state is "up").
	activePorts sync.Map
	// stopIndications is read by readIndications to leave its receive loop.
	stopIndications chan bool
	// isReadIndicationRoutineActive is set while readIndications is in its
	// receive loop and cleared when readIndications returns.
	isReadIndicationRoutineActive bool

	pendingFlowRemoveDataPerSubscriber map[pendingFlowRemoveDataKey]pendingFlowRemoveData
}
106
// OnuDevice represents ONU related info
type OnuDevice struct {
	deviceID      string
	deviceType    string
	serialNumber  string
	onuID         uint32
	intfID        uint32
	proxyDeviceID string
	// losRaised is supplied by the caller of NewOnuDevice.
	losRaised bool
	// rdiRaised is not set by NewOnuDevice and therefore starts as false.
	rdiRaised bool
}
118
// pmNames lists the metric names handed to pmmetrics.Metrics when
// NewDeviceHandler builds the handler's PmMetrics.
var pmNames = []string{
	"rx_bytes",
	"rx_packets",
	"rx_mcast_packets",
	"rx_bcast_packets",
	"tx_bytes",
	"tx_packets",
	"tx_mcast_packets",
	"tx_bcast_packets",
}
129
// disableBBSim is set to true to disable the BBSim function.
// When true, in this file: doStateInit skips the gRPC dial, postInit installs
// mocks.MockOpenoltClient, populateDeviceInfo uses L2oamGetDeviceInfo, and
// readIndications returns right after opening the indication stream.
var disableBBSim bool = true
132
133//NewOnuDevice creates a new Onu Device
134func NewOnuDevice(devID, deviceTp, serialNum string, onuID, intfID uint32, proxyDevID string, losRaised bool) *OnuDevice {
135 var device OnuDevice
136 device.deviceID = devID
137 device.deviceType = deviceTp
138 device.serialNumber = serialNum
139 device.onuID = onuID
140 device.intfID = intfID
141 device.proxyDeviceID = proxyDevID
142 device.losRaised = losRaised
143 return &device
144}
145
146//NewDeviceHandler creates a new device handler
147func NewDeviceHandler(cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy, device *voltha.Device, adapter *OpenOLT) *DeviceHandler {
148 var dh DeviceHandler
149 dh.coreProxy = cp
150 dh.AdapterProxy = ap
151 dh.EventProxy = ep
152 cloned := (proto.Clone(device)).(*voltha.Device)
153 dh.device = cloned
154 dh.openOLT = adapter
155 dh.exitChannel = make(chan int, 1)
156 dh.lockDevice = sync.RWMutex{}
157 dh.stopCollector = make(chan bool, 2)
158 dh.stopHeartbeatCheck = make(chan bool, 2)
159 dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
160 dh.activePorts = sync.Map{}
161 dh.stopIndications = make(chan bool, 1)
162 dh.pendingFlowRemoveDataPerSubscriber = make(map[pendingFlowRemoveDataKey]pendingFlowRemoveData)
163
164 if dh.openOLT.config != nil {
165 NewL2oamHandle(context.Background(), dh.openOLT.config.InterfaceName, dh.openOLT.config.SrcMac)
166 }
167
168 return &dh
169}
170
171// start save the device to the data model
172func (dh *DeviceHandler) start(ctx context.Context) {
173 dh.lockDevice.Lock()
174 defer dh.lockDevice.Unlock()
175 logger.Debugw(ctx, "starting-device-agent", log.Fields{"device": dh.device})
176 logger.Debug(ctx, "device-agent-started")
177}
178
179// stop stops the device dh. Not much to do for now
180func (dh *DeviceHandler) stop(ctx context.Context) {
181 dh.lockDevice.Lock()
182 defer dh.lockDevice.Unlock()
183 logger.Debug(ctx, "stopping-device-agent")
184 dh.exitChannel <- 1
185 logger.Debug(ctx, "device-agent-stopped")
186}
187
// macifyIP derives a pseudo MAC address string from an IP address.
//
// The result has the form "00:00:xx:xx:xx:xx", where the last four octets are
// the IPv4 address bytes (for a 4-byte or IPv4-mapped 16-byte net.IP) or the
// trailing four bytes of a 16-byte IPv6 address, each as two lowercase hex
// digits. An empty string is returned when ip is empty or has a length that
// is neither 4 nor 16 bytes.
func macifyIP(ip net.IP) string {
	var tail net.IP
	switch len(ip) {
	case net.IPv4len:
		// 4-byte form: the whole slice is the address.
		tail = ip
	case net.IPv6len:
		// 16-byte form: use the last four bytes (the IPv4 part for mapped addresses).
		tail = ip[net.IPv6len-net.IPv4len:]
	default:
		return ""
	}
	// Format the bytes directly so padding does not depend on how fmt pads strings.
	return fmt.Sprintf("00:00:%02x:%02x:%02x:%02x", tail[0], tail[1], tail[2], tail[3])
}
198
199func generateMacFromHost(ctx context.Context, host string) (string, error) {
200 var genmac string
201 var addr net.IP
202 var ips []string
203 var err error
204
205 logger.Debugw(ctx, "generating-mac-from-host", log.Fields{"host": host})
206
207 if addr = net.ParseIP(host); addr == nil {
208 logger.Debugw(ctx, "looking-up-hostname", log.Fields{"host": host})
209
210 if ips, err = net.LookupHost(host); err == nil {
211 logger.Debugw(ctx, "dns-result-ips", log.Fields{"ips": ips})
212 if addr = net.ParseIP(ips[0]); addr == nil {
213 return "", olterrors.NewErrInvalidValue(log.Fields{"ip": ips[0]}, nil)
214 }
215 genmac = macifyIP(addr)
216 logger.Debugw(ctx, "using-ip-as-mac",
217 log.Fields{"host": ips[0],
218 "mac": genmac})
219 return genmac, nil
220 }
221 return "", olterrors.NewErrAdapter("cannot-resolve-hostname-to-ip", log.Fields{"host": host}, err)
222 }
223
224 genmac = macifyIP(addr)
225 logger.Debugw(ctx, "using-ip-as-mac",
226 log.Fields{"host": host,
227 "mac": genmac})
228 return genmac, nil
229}
230
// macAddressToUint32Array splits a colon-separated MAC string and parses each
// part as a hexadecimal number, returning one uint32 per part. If any part
// fails to parse, the fixed placeholder {1, 2, 3, 4, 5, 6} is returned instead.
func macAddressToUint32Array(mac string) []uint32 {
	parts := strings.Split(mac, ":")
	octets := make([]uint32, 0, len(parts))
	for _, part := range parts {
		parsed, err := strconv.ParseInt(part, 16, 32)
		if err != nil {
			// Unparseable input: fall back to the placeholder address.
			return []uint32{1, 2, 3, 4, 5, 6}
		}
		octets = append(octets, uint32(parsed))
	}
	return octets
}
244
245//GetportLabel returns the label for the NNI and the PON port based on port number and port type
246func GetportLabel(portNum uint32, portType voltha.Port_PortType) (string, error) {
247
248 switch portType {
249 case voltha.Port_ETHERNET_NNI:
250 return fmt.Sprintf("nni-%d", portNum), nil
251 case voltha.Port_PON_OLT:
252 return fmt.Sprintf("pon-%d", portNum), nil
253 }
254
255 return "", olterrors.NewErrInvalidValue(log.Fields{"port-type": portType}, nil)
256}
257
258func (dh *DeviceHandler) addPort(ctx context.Context, intfID uint32, portType voltha.Port_PortType, state string) error {
259 var operStatus common.OperStatus_Types
260 if state == "up" {
261 operStatus = voltha.OperStatus_ACTIVE
262 //populating the intfStatus map
263 dh.activePorts.Store(intfID, true)
264 } else {
265 operStatus = voltha.OperStatus_DISCOVERED
266 dh.activePorts.Store(intfID, false)
267 }
268 portNum := IntfIDToPortNo(intfID, portType)
269 label, err := GetportLabel(portNum, portType)
270 if err != nil {
271 return olterrors.NewErrNotFound("port-label", log.Fields{"port-number": portNum, "port-type": portType}, err)
272 }
273
274 if port, err := dh.coreProxy.GetDevicePort(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, portNum); err == nil && port.Type == portType {
275 logger.Debug(ctx, "port-already-exists-updating-oper-status-of-port")
276 if err := dh.coreProxy.PortStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, portType, portNum, operStatus); err != nil {
277 return olterrors.NewErrAdapter("failed-to-update-port-state", log.Fields{
278 "device-id": dh.device.Id,
279 "port-type": portType,
280 "port-number": portNum,
281 "oper-status": operStatus}, err).Log()
282 }
283 return nil
284 }
285 capacity := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
286 port := &voltha.Port{
287 PortNo: portNum,
288 Label: label,
289 Type: portType,
290 OperStatus: operStatus,
291 OfpPort: &of.OfpPort{
292 HwAddr: macAddressToUint32Array(dh.device.MacAddress),
293 Config: 0,
294 State: uint32(of.OfpPortState_OFPPS_LIVE),
295 Curr: capacity,
296 Advertised: capacity,
297 Peer: capacity,
298 CurrSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
299 MaxSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
300 },
301 }
302 logger.Debugw(ctx, "sending-port-update-to-core", log.Fields{"port": port})
303 if err := dh.coreProxy.PortCreated(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, port); err != nil {
304 return olterrors.NewErrAdapter("error-creating-port", log.Fields{
305 "device-id": dh.device.Id,
306 "port-type": portType}, err)
307 }
308 go dh.updateLocalDevice(ctx)
309 return nil
310}
311
312func (dh *DeviceHandler) updateLocalDevice(ctx context.Context) {
313 dh.lockDevice.Lock()
314 defer dh.lockDevice.Unlock()
315 device, err := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, dh.device.Id)
316 if err != nil || device == nil {
317 logger.Errorf(ctx, "device-not-found", log.Fields{"device-id": dh.device.Id}, err)
318 return
319 }
320 dh.device = device
321}
322
323// nolint: gocyclo
324// readIndications to read the indications from the OLT device
325func (dh *DeviceHandler) readIndications(ctx context.Context) error {
326 defer logger.Debugw(ctx, "indications-ended", log.Fields{"device-id": dh.device.Id})
327 defer func() {
328 dh.lockDevice.Lock()
329 dh.isReadIndicationRoutineActive = false
330 dh.lockDevice.Unlock()
331 }()
332 indications, err := dh.startOpenOltIndicationStream(ctx)
333 if err != nil {
334 return err
335 }
336 if disableBBSim {
337 return nil
338 }
339 /* get device state */
340 device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
341 if err != nil || device == nil {
342 /*TODO: needs to handle error scenarios */
343 return olterrors.NewErrNotFound("device", log.Fields{"device-id": dh.device.Id}, err)
344 }
345
346 indicationBackoff := backoff.NewExponentialBackOff()
347 indicationBackoff.MaxElapsedTime = 0
348 indicationBackoff.MaxInterval = 1 * time.Minute
349
350 dh.lockDevice.Lock()
351 dh.isReadIndicationRoutineActive = true
352 dh.lockDevice.Unlock()
353
354Loop:
355 for {
356 select {
357 case <-dh.stopIndications:
358 logger.Debugw(ctx, "stopping-collecting-indications-for-olt", log.Fields{"deviceID:": dh.device.Id})
359 break Loop
360 default:
361 indication, err := indications.Recv()
362 if err == io.EOF {
363 logger.Infow(ctx, "eof-for-indications",
364 log.Fields{"err": err,
365 "device-id": dh.device.Id})
366 // Use an exponential back off to prevent getting into a tight loop
367 duration := indicationBackoff.NextBackOff()
368 if duration == backoff.Stop {
369 // If we reach a maximum then warn and reset the backoff
370 // timer and keep attempting.
371 logger.Warnw(ctx, "maximum-indication-backoff-reached--resetting-backoff-timer",
372 log.Fields{"max-indication-backoff": indicationBackoff.MaxElapsedTime,
373 "device-id": dh.device.Id})
374 indicationBackoff.Reset()
375 }
376
377 // On failure process a backoff timer while watching for stopIndications
378 // events
379 backoff := time.NewTimer(indicationBackoff.NextBackOff())
380 select {
381 case <-dh.stopIndications:
382 logger.Debugw(ctx, "stopping-collecting-indications-for-olt", log.Fields{"deviceID:": dh.device.Id})
383 if !backoff.Stop() {
384 <-backoff.C
385 }
386 break Loop
387 case <-backoff.C:
388 // backoff expired continue
389 }
390 if indications, err = dh.startOpenOltIndicationStream(ctx); err != nil {
391 return err
392 }
393 continue
394 }
395 if err != nil {
396 logger.Errorw(ctx, "read-indication-error",
397 log.Fields{"err": err,
398 "device-id": dh.device.Id})
399 if device.AdminState == voltha.AdminState_DELETED {
400 logger.Debug(ctx, "device-deleted--stopping-the-read-indication-thread")
401 break Loop
402 }
403 // Close the stream, and re-initialize it
404 if err = indications.CloseSend(); err != nil {
405 // Ok to ignore here, because we landed here due to a problem on the stream
406 // In all probability, the closeSend call may fail
407 logger.Debugw(ctx, "error-closing-send stream--error-ignored",
408 log.Fields{"err": err,
409 "device-id": dh.device.Id})
410 }
411 if indications, err = dh.startOpenOltIndicationStream(ctx); err != nil {
412 return err
413 }
414 // once we re-initialized the indication stream, continue to read indications
415 continue
416 }
417 // Reset backoff if we have a successful receive
418 indicationBackoff.Reset()
419 // When OLT is admin down, ignore all indications.
420 if device.AdminState == voltha.AdminState_DISABLED && !isIndicationAllowedDuringOltAdminDown(indication) {
421 logger.Debugw(ctx, "olt-is-admin-down, ignore indication",
422 log.Fields{"indication": indication,
423 "device-id": dh.device.Id})
424 continue
425 }
426 dh.handleIndication(ctx, indication)
427 }
428 }
429 _ = indications.CloseSend() // Ok to ignore error, as we stopping the readIndication anyway
430
431 return nil
432}
433
434func (dh *DeviceHandler) startOpenOltIndicationStream(ctx context.Context) (oop.Openolt_EnableIndicationClient, error) {
435 L2oamEnableIndication(ctx, dh)
436
437 indications, err := dh.Client.EnableIndication(ctx, new(oop.Empty))
438 if err != nil {
439 return nil, olterrors.NewErrCommunication("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err).Log()
440 }
441 if indications == nil {
442 return nil, olterrors.NewErrInvalidValue(log.Fields{"indications": nil, "device-id": dh.device.Id}, nil).Log()
443 }
444
445 return indications, nil
446}
447
448// isIndicationAllowedDuringOltAdminDown returns true if the indication is allowed during OLT Admin down, else false
449func isIndicationAllowedDuringOltAdminDown(indication *oop.Indication) bool {
450 switch indication.Data.(type) {
451 case *oop.Indication_OltInd, *oop.Indication_IntfInd, *oop.Indication_IntfOperInd:
452 return true
453
454 default:
455 return false
456 }
457}
458
459func (dh *DeviceHandler) handleOltIndication(ctx context.Context, oltIndication *oop.OltIndication) error {
460 raisedTs := time.Now().UnixNano()
461 if oltIndication.OperState == "up" && dh.transitionMap.currentDeviceState != deviceStateUp {
462 dh.transitionMap.Handle(ctx, DeviceUpInd)
463 } else if oltIndication.OperState == "down" {
464 dh.transitionMap.Handle(ctx, DeviceDownInd)
465 }
466 if err := dh.eventMgr.oltUpDownIndication(ctx, oltIndication, dh.device.Id, raisedTs); err != nil {
467 return olterrors.NewErrAdapter("failed-indication", log.Fields{
468 "device_id": dh.device.Id,
469 "indication": oltIndication,
470 "timestamp": raisedTs}, err)
471 }
472 return nil
473}
474
475// nolint: gocyclo
476func (dh *DeviceHandler) handleIndication(ctx context.Context, indication *oop.Indication) {
477 raisedTs := time.Now().UnixNano()
478 switch indication.Data.(type) {
479 case *oop.Indication_OltInd:
480 span, ctx := log.CreateChildSpan(ctx, "olt-indication", log.Fields{"device-id": dh.device.Id})
481 defer span.Finish()
482
483 if err := dh.handleOltIndication(ctx, indication.GetOltInd()); err != nil {
484 _ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{"type": "olt", "device-id": dh.device.Id}, err).Log()
485 }
486 case *oop.Indication_IntfInd:
487 span, ctx := log.CreateChildSpan(ctx, "interface-indication", log.Fields{"device-id": dh.device.Id})
488 defer span.Finish()
489
490 intfInd := indication.GetIntfInd()
491 go func() {
492 if err := dh.addPort(ctx, intfInd.GetIntfId(), voltha.Port_PON_OLT, intfInd.GetOperState()); err != nil {
493 _ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{"type": "interface", "device-id": dh.device.Id}, err).Log()
494 }
495 }()
496 logger.Infow(ctx, "received-interface-indication", log.Fields{"InterfaceInd": intfInd, "device-id": dh.device.Id})
497 case *oop.Indication_IntfOperInd:
498 span, ctx := log.CreateChildSpan(ctx, "interface-oper-indication", log.Fields{"device-id": dh.device.Id})
499 defer span.Finish()
500
501 intfOperInd := indication.GetIntfOperInd()
502 if intfOperInd.GetType() == "nni" {
503 go func() {
504 if err := dh.addPort(ctx, intfOperInd.GetIntfId(), voltha.Port_ETHERNET_NNI, intfOperInd.GetOperState()); err != nil {
505 _ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{"type": "interface-oper-nni", "device-id": dh.device.Id}, err).Log()
506 }
507 }()
508 if err := dh.resourceMgr.AddNNIToKVStore(ctx, intfOperInd.GetIntfId()); err != nil {
509 logger.Warn(ctx, err)
510 }
511 } else if intfOperInd.GetType() == "pon" {
512 // TODO: Check what needs to be handled here for When PON PORT down, ONU will be down
513 // Handle pon port update
514 go func() {
515 if err := dh.addPort(ctx, intfOperInd.GetIntfId(), voltha.Port_PON_OLT, intfOperInd.GetOperState()); err != nil {
516 _ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{"type": "interface-oper-pon", "device-id": dh.device.Id}, err).Log()
517 }
518 }()
519 go dh.eventMgr.oltIntfOperIndication(ctx, indication.GetIntfOperInd(), dh.device.Id, raisedTs)
520 }
521 logger.Infow(ctx, "received-interface-oper-indication",
522 log.Fields{"interfaceOperInd": intfOperInd,
523 "device-id": dh.device.Id})
524 case *oop.Indication_OnuDiscInd:
525 span, ctx := log.CreateChildSpan(ctx, "onu-discovery-indication", log.Fields{"device-id": dh.device.Id})
526 defer span.Finish()
527
528 onuDiscInd := indication.GetOnuDiscInd()
529 logger.Infow(ctx, "received-onu-discovery-indication", log.Fields{"OnuDiscInd": onuDiscInd, "device-id": dh.device.Id})
530 sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
531 go func() {
532 if err := dh.onuDiscIndication(ctx, onuDiscInd, sn); err != nil {
533 _ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{"type": "onu-discovery", "device-id": dh.device.Id}, err).Log()
534 }
535 }()
536 case *oop.Indication_OnuInd:
537 span, ctx := log.CreateChildSpan(ctx, "onu-indication", log.Fields{"device-id": dh.device.Id})
538 defer span.Finish()
539
540 onuInd := indication.GetOnuInd()
541 logger.Infow(ctx, "received-onu-indication", log.Fields{"OnuInd": onuInd, "device-id": dh.device.Id})
542 go func() {
543 if err := dh.onuIndication(ctx, onuInd); err != nil {
544 _ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{"type": "onu", "device-id": dh.device.Id}, err).Log()
545 }
546 }()
547 case *oop.Indication_OmciInd:
548 span, ctx := log.CreateChildSpan(ctx, "omci-indication", log.Fields{"device-id": dh.device.Id})
549 defer span.Finish()
550
551 omciInd := indication.GetOmciInd()
552 logger.Debugw(ctx, "received-omci-indication", log.Fields{"intf-id": omciInd.IntfId, "onu-id": omciInd.OnuId, "device-id": dh.device.Id})
553 go func() {
554 if err := dh.omciIndication(ctx, omciInd); err != nil {
555 _ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{"type": "omci", "device-id": dh.device.Id}, err).Log()
556 }
557 }()
558 case *oop.Indication_PktInd:
559 span, ctx := log.CreateChildSpan(ctx, "packet-indication", log.Fields{"device-id": dh.device.Id})
560 defer span.Finish()
561
562 pktInd := indication.GetPktInd()
563 logger.Debugw(ctx, "received-packet-indication", log.Fields{
564 "intf-type": pktInd.IntfId,
565 "intf-id": pktInd.IntfId,
566 "gem-port-id": pktInd.GemportId,
567 "port-no": pktInd.PortNo,
568 "device-id": dh.device.Id,
569 })
570
571 if logger.V(log.DebugLevel) {
572 logger.Debugw(ctx, "received-packet-indication-packet", log.Fields{
573 "intf-type": pktInd.IntfId,
574 "intf-id": pktInd.IntfId,
575 "gem-port-id": pktInd.GemportId,
576 "port-no": pktInd.PortNo,
577 "packet": hex.EncodeToString(pktInd.Pkt),
578 "device-id": dh.device.Id,
579 })
580 }
581
582 go func() {
583 if err := dh.handlePacketIndication(ctx, pktInd); err != nil {
584 _ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{"type": "packet", "device-id": dh.device.Id}, err).Log()
585 }
586 }()
587 case *oop.Indication_PortStats:
588 span, ctx := log.CreateChildSpan(ctx, "port-statistics-indication", log.Fields{"device-id": dh.device.Id})
589 defer span.Finish()
590
591 portStats := indication.GetPortStats()
592 go dh.portStats.PortStatisticsIndication(ctx, portStats, dh.resourceMgr.DevInfo.GetPonPorts())
593 case *oop.Indication_FlowStats:
594 span, ctx := log.CreateChildSpan(ctx, "flow-stats-indication", log.Fields{"device-id": dh.device.Id})
595 defer span.Finish()
596
597 flowStats := indication.GetFlowStats()
598 logger.Infow(ctx, "received-flow-stats", log.Fields{"FlowStats": flowStats, "device-id": dh.device.Id})
599 case *oop.Indication_AlarmInd:
600 span, ctx := log.CreateChildSpan(ctx, "alarm-indication", log.Fields{"device-id": dh.device.Id})
601 defer span.Finish()
602
603 alarmInd := indication.GetAlarmInd()
604 logger.Infow(ctx, "received-alarm-indication", log.Fields{"AlarmInd": alarmInd, "device-id": dh.device.Id})
605 go dh.eventMgr.ProcessEvents(ctx, alarmInd, dh.device.Id, raisedTs)
606 }
607}
608
609// doStateUp handle the olt up indication and update to voltha core
610func (dh *DeviceHandler) doStateUp(ctx context.Context) error {
611 go startCollector(ctx, dh)
612
613 if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_REACHABLE,
614 voltha.OperStatus_ACTIVE); err != nil {
615 return olterrors.NewErrAdapter("device-update-failed", log.Fields{"device-id": dh.device.Id}, err)
616 }
617 return nil
618}
619
620// doStateDown handle the olt down indication
621func (dh *DeviceHandler) doStateDown(ctx context.Context) error {
622 dh.lockDevice.Lock()
623 defer dh.lockDevice.Unlock()
624 logger.Debugw(ctx, "do-state-down-start", log.Fields{"device-id": dh.device.Id})
625
626 device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
627 if err != nil || device == nil {
628 /*TODO: needs to handle error scenarios */
629 return olterrors.NewErrNotFound("device", log.Fields{"device-id": dh.device.Id}, err)
630 }
631
632 cloned := proto.Clone(device).(*voltha.Device)
633
634 cloned.OperStatus = voltha.OperStatus_UNKNOWN
635 dh.device = cloned
636
637 if err = dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
638 return olterrors.NewErrAdapter("state-update-failed", log.Fields{"device-id": device.Id}, err)
639 }
640
641 onuDevices, err := dh.coreProxy.GetChildDevices(ctx, dh.device.Id)
642 if err != nil {
643 return olterrors.NewErrAdapter("child-device-fetch-failed", log.Fields{"device-id": dh.device.Id}, err)
644 }
645 for _, onuDevice := range onuDevices.Items {
646
647 // Update onu state as down in onu adapter
648 onuInd := oop.OnuIndication{}
649 onuInd.OperState = "down"
650 err := dh.AdapterProxy.SendInterAdapterMessage(ctx, &onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
651 "openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
652 if err != nil {
653 _ = olterrors.NewErrCommunication("inter-adapter-send-failed", log.Fields{
654 "source": "openolt",
655 "onu-indicator": onuInd,
656 "device-type": onuDevice.Type,
657 "device-id": onuDevice.Id}, err).LogAt(log.ErrorLevel)
658 //Do not return here and continue to process other ONUs
659 }
660 }
661 /* Discovered ONUs entries need to be cleared , since after OLT
662 is up, it starts sending discovery indications again*/
663 dh.discOnus = sync.Map{}
664 logger.Debugw(ctx, "do-state-down-end", log.Fields{"device-id": device.Id})
665 return nil
666}
667
668// doStateInit dial the grpc before going to init state
669func (dh *DeviceHandler) doStateInit(ctx context.Context) error {
670 if disableBBSim {
671 return nil
672 }
673 var err error
674 dh.clientCon, err = grpc.Dial(dh.device.GetHostAndPort(),
675 grpc.WithInsecure(),
676 grpc.WithBlock(),
677 grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
678 grpc_opentracing.StreamClientInterceptor(grpc_opentracing.WithTracer(opentracing.GlobalTracer())),
679 )),
680 grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
681 grpc_opentracing.UnaryClientInterceptor(grpc_opentracing.WithTracer(opentracing.GlobalTracer())),
682 )))
683
684 if err != nil {
685 return olterrors.NewErrCommunication("dial-failure", log.Fields{
686 "device-id": dh.device.Id,
687 "host-and-port": dh.device.GetHostAndPort()}, err)
688 }
689 return nil
690}
691
692// postInit create olt client instance to invoke RPC on the olt device
693func (dh *DeviceHandler) postInit(ctx context.Context) error {
694 if disableBBSim {
695 dh.Client = &mocks.MockOpenoltClient{}
696 } else {
697 dh.Client = oop.NewOpenoltClient(dh.clientCon)
698 }
699 dh.transitionMap.Handle(ctx, GrpcConnected)
700 return nil
701}
702
703// doStateConnected get the device info and update to voltha core
704func (dh *DeviceHandler) doStateConnected(ctx context.Context) error {
705 var err error
706 logger.Debugw(ctx, "olt-device-connected", log.Fields{"device-id": dh.device.Id})
707
708 device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
709 if err != nil || device == nil {
710 /*TODO: needs to handle error scenarios */
711 return olterrors.NewErrAdapter("device-fetch-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
712 }
713 if device.AdminState == voltha.AdminState_DISABLED {
714 logger.Debugln(ctx, "do-state-connected--device-admin-state-down")
715
716 cloned := proto.Clone(device).(*voltha.Device)
717 cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
718 cloned.OperStatus = voltha.OperStatus_UNKNOWN
719 dh.device = cloned
720 if err = dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
721 return olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
722 }
723
724 // Since the device was disabled before the OLT was rebooted, enforce the OLT to be Disabled after re-connection.
725 _, err = dh.Client.DisableOlt(ctx, new(oop.Empty))
726 if err != nil {
727 return olterrors.NewErrAdapter("olt-disable-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
728 }
729 // We should still go ahead an initialize various device handler modules so that when OLT is re-enabled, we have
730 // all the modules initialized and ready to handle incoming ONUs.
731
732 err = dh.initializeDeviceHandlerModules(ctx)
733 if err != nil {
734 return olterrors.NewErrAdapter("device-handler-initialization-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
735 }
736
737 // Start reading indications
738 go func() {
739 if err = dh.readIndications(ctx); err != nil {
740 _ = olterrors.NewErrAdapter("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
741 }
742 }()
743 return nil
744 }
745
746 ports, err := dh.coreProxy.ListDevicePorts(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id)
747 if err != nil {
748 /*TODO: needs to handle error scenarios */
749 return olterrors.NewErrAdapter("fetch-ports-failed", log.Fields{"device-id": dh.device.Id}, err)
750 }
751 dh.populateActivePorts(ctx, ports)
752 if err := dh.disableAdminDownPorts(ctx, ports); err != nil {
753 return olterrors.NewErrAdapter("port-status-update-failed", log.Fields{"ports": ports}, err)
754 }
755
756 if err := dh.initializeDeviceHandlerModules(ctx); err != nil {
757 return olterrors.NewErrAdapter("device-handler-initialization-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
758 }
759
760 go func() {
761 if err := dh.readIndications(ctx); err != nil {
762 _ = olterrors.NewErrAdapter("read-indications-failure", log.Fields{"device-id": dh.device.Id}, err).Log()
763 }
764 }()
765 go dh.updateLocalDevice(ctx)
766
767 if device.PmConfigs != nil {
768 dh.UpdatePmConfig(ctx, device.PmConfigs)
769 }
770 return nil
771}
772
773func (dh *DeviceHandler) initializeDeviceHandlerModules(ctx context.Context) error {
774 deviceInfo, err := dh.populateDeviceInfo(ctx)
775
776 if err != nil {
777 return olterrors.NewErrAdapter("populate-device-info-failed", log.Fields{"device-id": dh.device.Id}, err)
778 }
779 if dh.resourceMgr = rsrcMgr.NewResourceMgr(ctx, dh.device.Id, dh.openOLT.KVStoreAddress, dh.openOLT.KVStoreType, dh.device.Type, deviceInfo); dh.resourceMgr == nil {
780 return olterrors.ErrResourceManagerInstantiating
781 }
782
783 if dh.flowMgr = NewFlowManager(ctx, dh, dh.resourceMgr); dh.flowMgr == nil {
784 return olterrors.ErrResourceManagerInstantiating
785
786 }
787 /* TODO: Instantiate Alarm , stats , BW managers */
788 /* Instantiating Event Manager to handle Alarms and KPIs */
789 dh.eventMgr = NewEventMgr(dh.EventProxy, dh)
790
791 dh.portStats = NewOpenOltStatsMgr(ctx, dh)
792
793 return nil
794
795}
796
797func (dh *DeviceHandler) populateDeviceInfo(ctx context.Context) (*oop.DeviceInfo, error) {
798 var err error
799 var deviceInfo *oop.DeviceInfo
800
801 if disableBBSim {
802 deviceInfo, err = L2oamGetDeviceInfo(log.WithSpanFromContext(context.Background(), ctx), dh)
803 } else {
804 deviceInfo, err = dh.Client.GetDeviceInfo(log.WithSpanFromContext(context.Background(), ctx), new(oop.Empty))
805 }
806
807 if err != nil {
808 return nil, olterrors.NewErrPersistence("get", "device", 0, nil, err)
809 }
810 if deviceInfo == nil {
811 return nil, olterrors.NewErrInvalidValue(log.Fields{"device": nil}, nil)
812 }
813
814 logger.Debugw(ctx, "fetched-device-info", log.Fields{"deviceInfo": deviceInfo, "device-id": dh.device.Id})
815 dh.device.Root = true
816 dh.device.Vendor = deviceInfo.Vendor
817 dh.device.Model = deviceInfo.Model
818 dh.device.SerialNumber = deviceInfo.DeviceSerialNumber
819 dh.device.HardwareVersion = deviceInfo.HardwareVersion
820 dh.device.FirmwareVersion = deviceInfo.FirmwareVersion
821
822 if deviceInfo.DeviceId == "" {
823 logger.Warnw(ctx, "no-device-id-provided-using-host", log.Fields{"hostport": dh.device.GetHostAndPort()})
824 host := strings.Split(dh.device.GetHostAndPort(), ":")[0]
825 genmac, err := generateMacFromHost(ctx, host)
826 if err != nil {
827 return nil, olterrors.NewErrAdapter("failed-to-generate-mac-host", log.Fields{"host": host}, err)
828 }
829 logger.Debugw(ctx, "using-host-for-mac-address", log.Fields{"host": host, "mac": genmac})
830 dh.device.MacAddress = genmac
831 } else {
832 dh.device.MacAddress = deviceInfo.DeviceId
833 }
834
835 if err := dh.coreProxy.DeviceUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.device); err != nil {
836 return nil, olterrors.NewErrAdapter("device-update-failed", log.Fields{"device-id": dh.device.Id}, err)
837 }
838
839 return deviceInfo, nil
840}
841
842func startCollector(ctx context.Context, dh *DeviceHandler) {
843 logger.Debugf(ctx, "starting-collector")
844 for {
845 select {
846 case <-dh.stopCollector:
847 logger.Debugw(ctx, "stopping-collector-for-olt", log.Fields{"deviceID:": dh.device.Id})
848 return
849 case <-time.After(time.Duration(dh.metrics.ToPmConfigs().DefaultFreq) * time.Second):
850
851 ports, err := dh.coreProxy.ListDevicePorts(log.WithSpanFromContext(context.Background(), ctx), dh.device.Id)
852 if err != nil {
853 logger.Warnw(ctx, "failed-to-list-ports", log.Fields{"device-id": dh.device.Id, "error": err})
854 continue
855 }
856 for _, port := range ports {
857 // NNI Stats
858 if port.Type == voltha.Port_ETHERNET_NNI {
859 intfID := PortNoToIntfID(port.PortNo, voltha.Port_ETHERNET_NNI)
860 cmnni := dh.portStats.collectNNIMetrics(intfID)
861 logger.Debugw(ctx, "collect-nni-metrics", log.Fields{"metrics": cmnni})
862 go dh.portStats.publishMetrics(ctx, cmnni, port, dh.device.Id, dh.device.Type)
863 logger.Debugw(ctx, "publish-nni-metrics", log.Fields{"nni-port": port.Label})
864 }
865 // PON Stats
866 if port.Type == voltha.Port_PON_OLT {
867 intfID := PortNoToIntfID(port.PortNo, voltha.Port_PON_OLT)
868 if val, ok := dh.activePorts.Load(intfID); ok && val == true {
869 cmpon := dh.portStats.collectPONMetrics(intfID)
870 logger.Debugw(ctx, "collect-pon-metrics", log.Fields{"metrics": cmpon})
871 go dh.portStats.publishMetrics(ctx, cmpon, port, dh.device.Id, dh.device.Type)
872 }
873 logger.Debugw(ctx, "publish-pon-metrics", log.Fields{"pon-port": port.Label})
874 }
875 }
876 }
877 }
878}
879
880//AdoptDevice adopts the OLT device
881func (dh *DeviceHandler) AdoptDevice(ctx context.Context, device *voltha.Device) {
882 dh.transitionMap = NewTransitionMap(dh)
883 logger.Infow(ctx, "adopt-device", log.Fields{"device-id": device.Id, "Address": device.GetHostAndPort()})
884 dh.transitionMap.Handle(ctx, DeviceInit)
885
886 if err := dh.coreProxy.DevicePMConfigUpdate(ctx, dh.metrics.ToPmConfigs()); err != nil {
887 _ = olterrors.NewErrAdapter("error-updating-performance-metrics", log.Fields{"device-id": device.Id}, err).LogAt(log.ErrorLevel)
888 }
889
890 go startHeartbeatCheck(ctx, dh)
891}
892
893//GetOfpDeviceInfo Gets the Ofp information of the given device
894func (dh *DeviceHandler) GetOfpDeviceInfo(device *voltha.Device) (*ic.SwitchCapability, error) {
895 return &ic.SwitchCapability{
896 Desc: &of.OfpDesc{
897 MfrDesc: "VOLTHA Project",
898 HwDesc: "open_pon",
899 SwDesc: "open_pon",
900 SerialNum: dh.device.SerialNumber,
901 },
902 SwitchFeatures: &of.OfpSwitchFeatures{
903 NBuffers: 256,
904 NTables: 2,
905 Capabilities: uint32(of.OfpCapabilities_OFPC_FLOW_STATS |
906 of.OfpCapabilities_OFPC_TABLE_STATS |
907 of.OfpCapabilities_OFPC_PORT_STATS |
908 of.OfpCapabilities_OFPC_GROUP_STATS),
909 },
910 }, nil
911}
912
913func (dh *DeviceHandler) omciIndication(ctx context.Context, omciInd *oop.OmciIndication) error {
914 logger.Debugw(ctx, "omci-indication", log.Fields{"intf-id": omciInd.IntfId, "onu-id": omciInd.OnuId, "device-id": dh.device.Id})
915 var deviceType string
916 var deviceID string
917 var proxyDeviceID string
918
919 transid := extractOmciTransactionID(omciInd.Pkt)
920 if logger.V(log.DebugLevel) {
921 logger.Debugw(ctx, "recv-omci-msg", log.Fields{"intf-id": omciInd.IntfId, "onu-id": omciInd.OnuId, "device-id": dh.device.Id,
922 "omci-transaction-id": transid, "omci-msg": hex.EncodeToString(omciInd.Pkt)})
923 }
924
925 onuKey := dh.formOnuKey(omciInd.IntfId, omciInd.OnuId)
926
927 if onuInCache, ok := dh.onus.Load(onuKey); !ok {
928
929 logger.Debugw(ctx, "omci-indication-for-a-device-not-in-cache.", log.Fields{"intf-id": omciInd.IntfId, "onu-id": omciInd.OnuId, "device-id": dh.device.Id})
930 ponPort := IntfIDToPortNo(omciInd.GetIntfId(), voltha.Port_PON_OLT)
931 kwargs := make(map[string]interface{})
932 kwargs["onu_id"] = omciInd.OnuId
933 kwargs["parent_port_no"] = ponPort
934
935 onuDevice, err := dh.coreProxy.GetChildDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, kwargs)
936 if err != nil {
937 return olterrors.NewErrNotFound("onu", log.Fields{
938 "intf-id": omciInd.IntfId,
939 "onu-id": omciInd.OnuId}, err)
940 }
941 deviceType = onuDevice.Type
942 deviceID = onuDevice.Id
943 proxyDeviceID = onuDevice.ProxyAddress.DeviceId
944 //if not exist in cache, then add to cache.
945 dh.onus.Store(onuKey, NewOnuDevice(deviceID, deviceType, onuDevice.SerialNumber, omciInd.OnuId, omciInd.IntfId, proxyDeviceID, false))
946 } else {
947 //found in cache
948 logger.Debugw(ctx, "omci-indication-for-a-device-in-cache.", log.Fields{"intf-id": omciInd.IntfId, "onu-id": omciInd.OnuId, "device-id": dh.device.Id})
949 deviceType = onuInCache.(*OnuDevice).deviceType
950 deviceID = onuInCache.(*OnuDevice).deviceID
951 proxyDeviceID = onuInCache.(*OnuDevice).proxyDeviceID
952 }
953
954 omciMsg := &ic.InterAdapterOmciMessage{Message: omciInd.Pkt}
955 if err := dh.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.Background(), ctx), omciMsg,
956 ic.InterAdapterMessageType_OMCI_REQUEST, dh.device.Type, deviceType,
957 deviceID, proxyDeviceID, ""); err != nil {
958 return olterrors.NewErrCommunication("omci-request", log.Fields{
959 "source": dh.device.Type,
960 "destination": deviceType,
961 "onu-id": deviceID,
962 "proxy-device-id": proxyDeviceID}, err)
963 }
964 return nil
965}
966
967//ProcessInterAdapterMessage sends the proxied messages to the target device
968// If the proxy address is not found in the unmarshalled message, it first fetches the onu device for which the message
969// is meant, and then send the unmarshalled omci message to this onu
970func (dh *DeviceHandler) ProcessInterAdapterMessage(ctx context.Context, msg *ic.InterAdapterMessage) error {
971 logger.Debugw(ctx, "process-inter-adapter-message", log.Fields{"msgID": msg.Header.Id})
972 if msg.Header.Type == ic.InterAdapterMessageType_OMCI_REQUEST {
973 msgID := msg.Header.Id
974 fromTopic := msg.Header.FromTopic
975 toTopic := msg.Header.ToTopic
976 toDeviceID := msg.Header.ToDeviceId
977 proxyDeviceID := msg.Header.ProxyDeviceId
978
979 logger.Debugw(ctx, "omci-request-message-header", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
980
981 msgBody := msg.GetBody()
982
983 omciMsg := &ic.InterAdapterOmciMessage{}
984 if err := ptypes.UnmarshalAny(msgBody, omciMsg); err != nil {
985 return olterrors.NewErrAdapter("cannot-unmarshal-omci-msg-body", log.Fields{"msgbody": msgBody}, err)
986 }
987
988 if omciMsg.GetProxyAddress() == nil {
989 onuDevice, err := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, toDeviceID)
990 if err != nil {
991 return olterrors.NewErrNotFound("onu", log.Fields{
992 "device-id": dh.device.Id,
993 "onu-device-id": toDeviceID}, err)
994 }
995 logger.Debugw(ctx, "device-retrieved-from-core", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
996 if err := dh.sendProxiedMessage(ctx, onuDevice, omciMsg); err != nil {
997 return olterrors.NewErrCommunication("send-failed", log.Fields{
998 "device-id": dh.device.Id,
999 "onu-device-id": toDeviceID}, err)
1000 }
1001 } else {
1002 logger.Debugw(ctx, "proxy-address-found-in-omci-message", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
1003 if err := dh.sendProxiedMessage(ctx, nil, omciMsg); err != nil {
1004 return olterrors.NewErrCommunication("send-failed", log.Fields{
1005 "device-id": dh.device.Id,
1006 "onu-device-id": toDeviceID}, err)
1007 }
1008 }
1009 } else if msg.Header.Type == ic.InterAdapterMessageType_ONU_IND_REQUEST {
1010 logger.Debugw(ctx, "got-message-from-onu", log.Fields{"message": msg.Header.Id})
1011 if err := dh.receivedMsgFromOnu(ctx, msg); err != nil {
1012 return err
1013 }
1014 } else {
1015 return olterrors.NewErrInvalidValue(log.Fields{"inter-adapter-message-type": msg.Header.Type}, nil)
1016 }
1017 return nil
1018}
1019
1020func (dh *DeviceHandler) sendProxiedMessage(ctx context.Context, onuDevice *voltha.Device, omciMsg *ic.InterAdapterOmciMessage) error {
1021 var intfID uint32
1022 var onuID uint32
1023 var connectStatus common.ConnectStatus_Types
1024 if onuDevice != nil {
1025 intfID = onuDevice.ProxyAddress.GetChannelId()
1026 onuID = onuDevice.ProxyAddress.GetOnuId()
1027 connectStatus = onuDevice.ConnectStatus
1028 } else {
1029 intfID = omciMsg.GetProxyAddress().GetChannelId()
1030 onuID = omciMsg.GetProxyAddress().GetOnuId()
1031 connectStatus = omciMsg.GetConnectStatus()
1032 }
1033 if connectStatus != voltha.ConnectStatus_REACHABLE {
1034 logger.Debugw(ctx, "onu-not-reachable--cannot-send-omci", log.Fields{"intf-id": intfID, "onu-id": onuID})
1035
1036 return olterrors.NewErrCommunication("unreachable", log.Fields{
1037 "intf-id": intfID,
1038 "onu-id": onuID}, nil)
1039 }
1040
1041 var omciMessage *oop.OmciMsg
1042 hexPkt := make([]byte, hex.EncodedLen(len(omciMsg.Message)))
1043 hex.Encode(hexPkt, omciMsg.Message)
1044 omciMessage = &oop.OmciMsg{IntfId: intfID, OnuId: onuID, Pkt: hexPkt}
1045
1046 transid := extractOmciTransactionID(omciMsg.Message)
1047 logger.Debugw(ctx, "sent-omci-msg", log.Fields{"intf-id": intfID, "onu-id": onuID,
1048 "omciTransactionID": transid, "omciMsg": string(omciMessage.Pkt)})
1049
1050 _, err := dh.Client.OmciMsgOut(log.WithSpanFromContext(context.Background(), ctx), omciMessage)
1051 if err != nil {
1052 return olterrors.NewErrCommunication("omci-send-failed", log.Fields{
1053 "intf-id": intfID,
1054 "onu-id": onuID,
1055 "message": omciMessage}, err)
1056 }
1057
1058 if disableBBSim {
1059 if err = dh.fakeOmciIndication(ctx, intfID, onuID, omciMsg.Message); err != nil {
1060 return nil
1061 }
1062 }
1063 return nil
1064}
1065
1066func (dh *DeviceHandler) activateONU(ctx context.Context, intfID uint32, onuID int64, serialNum *oop.SerialNumber, serialNumber string) error {
1067 logger.Debugw(ctx, "activate-onu", log.Fields{"intf-id": intfID, "onu-id": onuID, "serialNum": serialNum, "serialNumber": serialNumber, "device-id": dh.device.Id})
1068 if err := dh.flowMgr.UpdateOnuInfo(ctx, intfID, uint32(onuID), serialNumber); err != nil {
1069 return olterrors.NewErrAdapter("onu-activate-failed", log.Fields{"onu": onuID, "intf-id": intfID}, err)
1070 }
1071 var pir uint32 = 1000000
1072 Onu := oop.Onu{IntfId: intfID, OnuId: uint32(onuID), SerialNumber: serialNum, Pir: pir}
1073 if _, err := dh.Client.ActivateOnu(ctx, &Onu); err != nil {
1074 st, _ := status.FromError(err)
1075 if st.Code() == codes.AlreadyExists {
1076 logger.Debugw(ctx, "onu-activation-in-progress", log.Fields{"SerialNumber": serialNumber, "onu-id": onuID, "device-id": dh.device.Id})
1077
1078 } else {
1079 return olterrors.NewErrAdapter("onu-activate-failed", log.Fields{"onu": Onu, "device-id": dh.device.Id}, err)
1080 }
1081 } else {
1082 logger.Infow(ctx, "activated-onu", log.Fields{"SerialNumber": serialNumber, "device-id": dh.device.Id})
1083 }
1084 return nil
1085}
1086
1087func (dh *DeviceHandler) onuDiscIndication(ctx context.Context, onuDiscInd *oop.OnuDiscIndication, sn string) error {
1088 channelID := onuDiscInd.GetIntfId()
1089 parentPortNo := IntfIDToPortNo(onuDiscInd.GetIntfId(), voltha.Port_PON_OLT)
1090
1091 logger.Infow(ctx, "new-discovery-indication", log.Fields{"sn": sn})
1092
1093 kwargs := make(map[string]interface{})
1094 if sn != "" {
1095 kwargs["serial_number"] = sn
1096 } else {
1097 return olterrors.NewErrInvalidValue(log.Fields{"serial-number": sn}, nil)
1098 }
1099
1100 var alarmInd oop.OnuAlarmIndication
1101 raisedTs := time.Now().UnixNano()
1102 if _, loaded := dh.discOnus.LoadOrStore(sn, true); loaded {
1103
1104 /* When PON cable disconnected and connected back from OLT, it was expected OnuAlarmIndication
1105 with "los_status: off" should be raised but BAL does not raise this Alarm hence manually sending
1106 OnuLosClear event on receiving OnuDiscoveryIndication for the Onu after checking whether
1107 OnuLosRaise event sent for it */
1108 dh.onus.Range(func(Onukey interface{}, onuInCache interface{}) bool {
1109 if onuInCache.(*OnuDevice).serialNumber == sn && onuInCache.(*OnuDevice).losRaised {
1110 if onuDiscInd.GetIntfId() != onuInCache.(*OnuDevice).intfID {
1111 logger.Warnw(ctx, "onu-is-on-a-different-intf-id-now", log.Fields{
1112 "previousIntfId": onuInCache.(*OnuDevice).intfID,
1113 "currentIntfId": onuDiscInd.GetIntfId()})
1114 // TODO:: Should we need to ignore raising OnuLosClear event
1115 // when onu connected to different PON?
1116 }
1117 alarmInd.IntfId = onuInCache.(*OnuDevice).intfID
1118 alarmInd.OnuId = onuInCache.(*OnuDevice).onuID
1119 alarmInd.LosStatus = statusCheckOff
1120 go func() {
1121 if err := dh.eventMgr.onuAlarmIndication(ctx, &alarmInd, onuInCache.(*OnuDevice).deviceID, raisedTs); err != nil {
1122 logger.Debugw(ctx, "indication-failed", log.Fields{"error": err})
1123 }
1124 }()
1125 }
1126 return true
1127 })
1128
1129 logger.Warnw(ctx, "onu-sn-is-already-being-processed", log.Fields{"sn": sn})
1130 return nil
1131 }
1132
1133 var onuID uint32
1134
1135 onuDevice, err := dh.coreProxy.GetChildDevice(ctx, dh.device.Id, kwargs)
1136
1137 if err != nil {
1138 logger.Debugw(ctx, "core-proxy-get-child-device-failed", log.Fields{"parentDevice": dh.device.Id, "err": err, "sn": sn})
1139 if e, ok := status.FromError(err); ok {
1140 logger.Debugw(ctx, "core-proxy-get-child-device-failed-with-code", log.Fields{"errCode": e.Code(), "sn": sn})
1141 switch e.Code() {
1142 case codes.Internal:
1143 // this probably means NOT FOUND, so just create a new device
1144 onuDevice = nil
1145 case codes.DeadlineExceeded:
1146 // if the call times out, cleanup and exit
1147 dh.discOnus.Delete(sn)
1148 return olterrors.NewErrTimeout("get-child-device", log.Fields{"device-id": dh.device.Id}, err)
1149 }
1150 }
1151 }
1152
1153 if onuDevice == nil {
1154 // NOTE this should happen a single time, and only if GetChildDevice returns NotFound
1155 logger.Debugw(ctx, "creating-new-onu", log.Fields{"sn": sn})
1156 // we need to create a new ChildDevice
1157 ponintfid := onuDiscInd.GetIntfId()
1158 dh.lockDevice.Lock()
1159 onuID, err = dh.resourceMgr.GetONUID(ctx, ponintfid)
1160 dh.lockDevice.Unlock()
1161
1162 logger.Infow(ctx, "creating-new-onu-got-onu-id", log.Fields{"sn": sn, "onuId": onuID})
1163
1164 if err != nil {
1165 // if we can't create an ID in resource manager,
1166 // cleanup and exit
1167 dh.discOnus.Delete(sn)
1168 return olterrors.NewErrAdapter("resource-manager-get-onu-id-failed", log.Fields{
1169 "pon-intf-id": ponintfid,
1170 "serial-number": sn}, err)
1171 }
1172
1173 if onuDevice, err = dh.coreProxy.ChildDeviceDetected(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, int(parentPortNo),
1174 "", int(channelID), string(onuDiscInd.SerialNumber.GetVendorId()), sn, int64(onuID)); err != nil {
1175 dh.discOnus.Delete(sn)
1176 dh.resourceMgr.FreeonuID(ctx, ponintfid, []uint32{onuID}) // NOTE I'm not sure this method is actually cleaning up the right thing
1177 return olterrors.NewErrAdapter("core-proxy-child-device-detected-failed", log.Fields{
1178 "pon-intf-id": ponintfid,
1179 "serial-number": sn}, err)
1180 }
1181 if err := dh.eventMgr.OnuDiscoveryIndication(ctx, onuDiscInd, dh.device.Id, onuDevice.Id, onuID, sn, time.Now().UnixNano()); err != nil {
1182 logger.Warnw(ctx, "discovery-indication-failed", log.Fields{"error": err})
1183 }
1184 logger.Infow(ctx, "onu-child-device-added",
1185 log.Fields{"onuDevice": onuDevice,
1186 "sn": sn,
1187 "onu-id": onuID,
1188 "device-id": dh.device.Id})
1189 }
1190
1191 onuID = onuDevice.ProxyAddress.OnuId
1192 logger.Debugw(ctx, "onu-discovery-indication-key-create",
1193 log.Fields{"onu-id": onuID,
1194 "intfId": onuDiscInd.GetIntfId(),
1195 "sn": sn})
1196 onuKey := dh.formOnuKey(onuDiscInd.GetIntfId(), onuID)
1197
1198 onuDev := NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuID, onuDiscInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId, false)
1199 dh.onus.Store(onuKey, onuDev)
1200 logger.Debugw(ctx, "new-onu-device-discovered",
1201 log.Fields{"onu": onuDev,
1202 "sn": sn})
1203
1204 if err := dh.coreProxy.DeviceStateUpdate(ctx, onuDevice.Id, common.ConnectStatus_REACHABLE, common.OperStatus_DISCOVERED); err != nil {
1205 return olterrors.NewErrAdapter("failed-to-update-device-state", log.Fields{
1206 "device-id": onuDevice.Id,
1207 "serial-number": sn}, err)
1208 }
1209 logger.Infow(ctx, "onu-discovered-reachable", log.Fields{"device-id": onuDevice.Id, "sn": sn})
1210 if err := dh.activateONU(ctx, onuDiscInd.IntfId, int64(onuID), onuDiscInd.SerialNumber, sn); err != nil {
1211 return olterrors.NewErrAdapter("onu-activation-failed", log.Fields{
1212 "device-id": onuDevice.Id,
1213 "serial-number": sn}, err)
1214 }
1215 return nil
1216}
1217func (dh *DeviceHandler) onuDiscIndication2(ctx context.Context, onuDiscInd *oop.OnuDiscIndication, sn string, macAddress string) error {
1218 channelID := onuDiscInd.GetIntfId()
1219 parentPortNo := IntfIDToPortNo(onuDiscInd.GetIntfId(), voltha.Port_PON_OLT)
1220
1221 logger.Infow(ctx, "new-discovery-indication", log.Fields{"sn": sn})
1222
1223 kwargs := make(map[string]interface{})
1224 if sn != "" {
1225 kwargs["serial_number"] = sn
1226 } else {
1227 return olterrors.NewErrInvalidValue(log.Fields{"serial-number": sn}, nil)
1228 }
1229
1230 var alarmInd oop.OnuAlarmIndication
1231 raisedTs := time.Now().UnixNano()
1232 if _, loaded := dh.discOnus.LoadOrStore(sn, true); loaded {
1233
1234 /* When PON cable disconnected and connected back from OLT, it was expected OnuAlarmIndication
1235 with "los_status: off" should be raised but BAL does not raise this Alarm hence manually sending
1236 OnuLosClear event on receiving OnuDiscoveryIndication for the Onu after checking whether
1237 OnuLosRaise event sent for it */
1238 dh.onus.Range(func(Onukey interface{}, onuInCache interface{}) bool {
1239 if onuInCache.(*OnuDevice).serialNumber == sn && onuInCache.(*OnuDevice).losRaised {
1240 if onuDiscInd.GetIntfId() != onuInCache.(*OnuDevice).intfID {
1241 logger.Warnw(ctx, "onu-is-on-a-different-intf-id-now", log.Fields{
1242 "previousIntfId": onuInCache.(*OnuDevice).intfID,
1243 "currentIntfId": onuDiscInd.GetIntfId()})
1244 // TODO:: Should we need to ignore raising OnuLosClear event
1245 // when onu connected to different PON?
1246 }
1247 alarmInd.IntfId = onuInCache.(*OnuDevice).intfID
1248 alarmInd.OnuId = onuInCache.(*OnuDevice).onuID
1249 alarmInd.LosStatus = statusCheckOff
1250 go func() {
1251 if err := dh.eventMgr.onuAlarmIndication(ctx, &alarmInd, onuInCache.(*OnuDevice).deviceID, raisedTs); err != nil {
1252 logger.Debugw(ctx, "indication-failed", log.Fields{"error": err})
1253 }
1254 }()
1255 }
1256 return true
1257 })
1258
1259 logger.Warnw(ctx, "onu-sn-is-already-being-processed", log.Fields{"sn": sn})
1260 return nil
1261 }
1262
1263 var onuID uint32
1264
1265 onuDevice, err := dh.coreProxy.GetChildDevice(ctx, dh.device.Id, kwargs)
1266
1267 if err != nil {
1268 logger.Debugw(ctx, "core-proxy-get-child-device-failed", log.Fields{"parentDevice": dh.device.Id, "err": err, "sn": sn})
1269 if e, ok := status.FromError(err); ok {
1270 logger.Debugw(ctx, "core-proxy-get-child-device-failed-with-code", log.Fields{"errCode": e.Code(), "sn": sn})
1271 switch e.Code() {
1272 case codes.Internal:
1273 // this probably means NOT FOUND, so just create a new device
1274 onuDevice = nil
1275 case codes.DeadlineExceeded:
1276 // if the call times out, cleanup and exit
1277 dh.discOnus.Delete(sn)
1278 return olterrors.NewErrTimeout("get-child-device", log.Fields{"device-id": dh.device.Id}, err)
1279 }
1280 }
1281 }
1282
1283 if onuDevice == nil {
1284 // NOTE this should happen a single time, and only if GetChildDevice returns NotFound
1285 logger.Debugw(ctx, "creating-new-onu", log.Fields{"sn": sn})
1286 // we need to create a new ChildDevice
1287 ponintfid := onuDiscInd.GetIntfId()
1288 dh.lockDevice.Lock()
1289 onuID, err = dh.resourceMgr.GetONUID(ctx, ponintfid)
1290 dh.lockDevice.Unlock()
1291
1292 logger.Infow(ctx, "creating-new-onu-got-onu-id", log.Fields{"sn": sn, "onuId": onuID})
1293
1294 if err != nil {
1295 // if we can't create an ID in resource manager,
1296 // cleanup and exit
1297 dh.discOnus.Delete(sn)
1298 return olterrors.NewErrAdapter("resource-manager-get-onu-id-failed", log.Fields{
1299 "pon-intf-id": ponintfid,
1300 "serial-number": sn}, err)
1301 }
1302
1303 if onuDevice, err = dh.coreProxy.ChildDeviceDetected(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, int(parentPortNo),
1304 "", int(channelID), string(onuDiscInd.SerialNumber.GetVendorId()), sn, int64(onuID)); err != nil {
1305 dh.discOnus.Delete(sn)
1306 dh.resourceMgr.FreeonuID(ctx, ponintfid, []uint32{onuID}) // NOTE I'm not sure this method is actually cleaning up the right thing
1307 return olterrors.NewErrAdapter("core-proxy-child-device-detected-failed", log.Fields{
1308 "pon-intf-id": ponintfid,
1309 "serial-number": sn}, err)
1310 }
1311 if err := dh.eventMgr.OnuDiscoveryIndication(ctx, onuDiscInd, dh.device.Id, onuDevice.Id, onuID, sn, time.Now().UnixNano()); err != nil {
1312 logger.Warnw(ctx, "discovery-indication-failed", log.Fields{"error": err})
1313 }
1314 logger.Infow(ctx, "onu-child-device-added",
1315 log.Fields{"onuDevice": onuDevice,
1316 "sn": sn,
1317 "onu-id": onuID,
1318 "device-id": dh.device.Id})
1319 }
1320
1321 onuID = onuDevice.ProxyAddress.OnuId
1322 logger.Debugw(ctx, "onu-discovery-indication-key-create",
1323 log.Fields{"onu-id": onuID,
1324 "intfId": onuDiscInd.GetIntfId(),
1325 "sn": sn})
1326 onuKey := dh.formOnuKey(onuDiscInd.GetIntfId(), onuID)
1327
1328 onuDev := NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuID, onuDiscInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId, false)
1329 dh.onus.Store(onuKey, onuDev)
1330 logger.Debugw(ctx, "new-onu-device-discovered",
1331 log.Fields{"onu": onuDev,
1332 "sn": sn})
1333
1334 if err := dh.coreProxy.DeviceStateUpdate(ctx, onuDevice.Id, common.ConnectStatus_REACHABLE, common.OperStatus_DISCOVERED); err != nil {
1335 return olterrors.NewErrAdapter("failed-to-update-device-state", log.Fields{
1336 "device-id": onuDevice.Id,
1337 "serial-number": sn}, err)
1338 }
1339 logger.Infow(ctx, "onu-discovered-reachable", log.Fields{"device-id": onuDevice.Id, "sn": sn})
1340 if err := dh.activateONU(ctx, onuDiscInd.IntfId, int64(onuID), onuDiscInd.SerialNumber, sn); err != nil {
1341 return olterrors.NewErrAdapter("onu-activation-failed", log.Fields{
1342 "device-id": onuDevice.Id,
1343 "serial-number": sn}, err)
1344 }
1345
1346 device := FindL2oamDevice(macAddress)
1347 if device != nil {
1348 logger.Debug(ctx, fmt.Sprintf("onuDiscIndication2() onu.initialize() called. deviceId=%s, onuId=%d", onuDevice.Id, onuID))
1349 onu := device.(*L2oamOnuDevice)
1350 onu.update(macAddress, onuDevice.Id, onuID)
1351 onu.setActiveState(ctx, true)
1352 }
1353 onuDevice.MacAddress = macAddress
1354 onuDevice.Vendor = string(onuDiscInd.SerialNumber.VendorSpecific)
1355 if err := dh.coreProxy.DeviceUpdate(log.WithSpanFromContext(context.TODO(), ctx), onuDevice); err != nil {
1356 logger.Error(ctx, fmt.Sprintf("onuDiscIndication2() DeviceUpdate() failed. macAddress=%s, deviceId=%s", macAddress, onuDevice.Id))
1357 }
1358
1359 return nil
1360}
1361func (dh *DeviceHandler) onuIndication(ctx context.Context, onuInd *oop.OnuIndication) error {
1362 serialNumber := dh.stringifySerialNumber(onuInd.SerialNumber)
1363
1364 kwargs := make(map[string]interface{})
1365 ponPort := IntfIDToPortNo(onuInd.GetIntfId(), voltha.Port_PON_OLT)
1366 var onuDevice *voltha.Device
1367 var err error
1368 foundInCache := false
1369 logger.Debugw(ctx, "onu-indication-key-create",
1370 log.Fields{"onuId": onuInd.OnuId,
1371 "intfId": onuInd.GetIntfId(),
1372 "device-id": dh.device.Id})
1373 onuKey := dh.formOnuKey(onuInd.GetIntfId(), onuInd.OnuId)
1374
1375 errFields := log.Fields{"device-id": dh.device.Id}
1376
1377 if onuInCache, ok := dh.onus.Load(onuKey); ok {
1378
1379 //If ONU id is discovered before then use GetDevice to get onuDevice because it is cheaper.
1380 foundInCache = true
1381 errFields["onu-id"] = onuInCache.(*OnuDevice).deviceID
1382 onuDevice, err = dh.coreProxy.GetDevice(ctx, dh.device.Id, onuInCache.(*OnuDevice).deviceID)
1383 } else {
1384 //If ONU not found in adapter cache then we have to use GetChildDevice to get onuDevice
1385 if serialNumber != "" {
1386 kwargs["serial_number"] = serialNumber
1387 errFields["serial-number"] = serialNumber
1388 } else {
1389 kwargs["onu_id"] = onuInd.OnuId
1390 kwargs["parent_port_no"] = ponPort
1391 errFields["onu-id"] = onuInd.OnuId
1392 errFields["parent-port-no"] = ponPort
1393 }
1394 onuDevice, err = dh.coreProxy.GetChildDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, kwargs)
1395 }
1396
1397 if err != nil || onuDevice == nil {
1398 return olterrors.NewErrNotFound("onu-device", errFields, err)
1399 }
1400
1401 if onuDevice.ParentPortNo != ponPort {
1402 logger.Warnw(ctx, "onu-is-on-a-different-intf-id-now", log.Fields{
1403 "previousIntfId": onuDevice.ParentPortNo,
1404 "currentIntfId": ponPort})
1405 }
1406
1407 if onuDevice.ProxyAddress.OnuId != onuInd.OnuId {
1408 logger.Warnw(ctx, "onu-id-mismatch-possible-if-voltha-and-olt-rebooted", log.Fields{
1409 "expected-onu-id": onuDevice.ProxyAddress.OnuId,
1410 "received-onu-id": onuInd.OnuId,
1411 "device-id": dh.device.Id})
1412 }
1413 if !foundInCache {
1414 onuKey := dh.formOnuKey(onuInd.GetIntfId(), onuInd.GetOnuId())
1415
1416 dh.onus.Store(onuKey, NewOnuDevice(onuDevice.Id, onuDevice.Type, onuDevice.SerialNumber, onuInd.GetOnuId(), onuInd.GetIntfId(), onuDevice.ProxyAddress.DeviceId, false))
1417
1418 }
1419 if err := dh.updateOnuStates(ctx, onuDevice, onuInd); err != nil {
1420 return olterrors.NewErrCommunication("state-update-failed", errFields, err)
1421 }
1422 return nil
1423}
1424
1425func (dh *DeviceHandler) updateOnuStates(ctx context.Context, onuDevice *voltha.Device, onuInd *oop.OnuIndication) error {
1426 logger.Debugw(ctx, "onu-indication-for-state", log.Fields{"onuIndication": onuInd, "device-id": onuDevice.Id, "operStatus": onuDevice.OperStatus, "adminStatus": onuDevice.AdminState})
1427 if onuInd.AdminState == "down" || onuInd.OperState == "down" {
1428 // The ONU has gone admin_state "down" or oper_state "down" - we expect the ONU to send discovery again
1429 // The ONU admin_state is "up" while "oper_state" is down in cases where ONU activation fails. In this case
1430 // the ONU sends Discovery again.
1431 dh.discOnus.Delete(onuDevice.SerialNumber)
1432 // Tests have shown that we sometimes get OperState as NOT down even if AdminState is down, forcing it
1433 if onuInd.OperState != "down" {
1434 logger.Warnw(ctx, "onu-admin-state-down", log.Fields{"operState": onuInd.OperState})
1435 onuInd.OperState = "down"
1436 }
1437 }
1438
1439 switch onuInd.OperState {
1440 case "down":
1441 logger.Debugw(ctx, "sending-interadapter-onu-indication", log.Fields{"onuIndication": onuInd, "device-id": onuDevice.Id, "operStatus": onuDevice.OperStatus, "adminStatus": onuDevice.AdminState})
1442 // TODO NEW CORE do not hardcode adapter name. Handler needs Adapter reference
1443 err := dh.AdapterProxy.SendInterAdapterMessage(ctx, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
1444 "openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
1445 if err != nil {
1446 return olterrors.NewErrCommunication("inter-adapter-send-failed", log.Fields{
1447 "onu-indicator": onuInd,
1448 "source": "openolt",
1449 "device-type": onuDevice.Type,
1450 "device-id": onuDevice.Id}, err)
1451 }
1452 case "up":
1453 logger.Debugw(ctx, "sending-interadapter-onu-indication", log.Fields{"onuIndication": onuInd, "device-id": onuDevice.Id, "operStatus": onuDevice.OperStatus, "adminStatus": onuDevice.AdminState})
1454 // TODO NEW CORE do not hardcode adapter name. Handler needs Adapter reference
1455 err := dh.AdapterProxy.SendInterAdapterMessage(ctx, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
1456 "openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
1457 if err != nil {
1458 return olterrors.NewErrCommunication("inter-adapter-send-failed", log.Fields{
1459 "onu-indicator": onuInd,
1460 "source": "openolt",
1461 "device-type": onuDevice.Type,
1462 "device-id": onuDevice.Id}, err)
1463 }
1464 default:
1465 return olterrors.NewErrInvalidValue(log.Fields{"oper-state": onuInd.OperState}, nil)
1466 }
1467 return nil
1468}
1469
1470func (dh *DeviceHandler) stringifySerialNumber(serialNum *oop.SerialNumber) string {
1471 if serialNum != nil {
1472 return string(serialNum.VendorId) + dh.stringifyVendorSpecific(serialNum.VendorSpecific)
1473 }
1474 return ""
1475}
1476func (dh *DeviceHandler) deStringifySerialNumber(serialNum string) (*oop.SerialNumber, error) {
1477 decodedStr, err := hex.DecodeString(serialNum[4:])
1478 if err != nil {
1479 return nil, err
1480 }
1481 return &oop.SerialNumber{
1482 VendorId: []byte(serialNum[:4]),
1483 VendorSpecific: []byte(decodedStr),
1484 }, nil
1485}
1486
1487func (dh *DeviceHandler) stringifyVendorSpecific(vendorSpecific []byte) string {
1488 tmp := fmt.Sprintf("%x", (uint32(vendorSpecific[0])>>4)&0x0f) +
1489 fmt.Sprintf("%x", uint32(vendorSpecific[0]&0x0f)) +
1490 fmt.Sprintf("%x", (uint32(vendorSpecific[1])>>4)&0x0f) +
1491 fmt.Sprintf("%x", (uint32(vendorSpecific[1]))&0x0f) +
1492 fmt.Sprintf("%x", (uint32(vendorSpecific[2])>>4)&0x0f) +
1493 fmt.Sprintf("%x", (uint32(vendorSpecific[2]))&0x0f) +
1494 fmt.Sprintf("%x", (uint32(vendorSpecific[3])>>4)&0x0f) +
1495 fmt.Sprintf("%x", (uint32(vendorSpecific[3]))&0x0f)
1496 return tmp
1497}
1498
// UpdateFlowsBulk updates the flows in bulk. It is not implemented: it always
// returns olterrors.ErrNotImplemented.
func (dh *DeviceHandler) UpdateFlowsBulk() error {
	return olterrors.ErrNotImplemented
}
1503
1504//GetChildDevice returns the child device for given parent port and onu id
1505func (dh *DeviceHandler) GetChildDevice(ctx context.Context, parentPort, onuID uint32) (*voltha.Device, error) {
1506 logger.Debugw(ctx, "getchilddevice",
1507 log.Fields{"pon-port": parentPort,
1508 "onu-id": onuID,
1509 "device-id": dh.device.Id})
1510 kwargs := make(map[string]interface{})
1511 kwargs["onu_id"] = onuID
1512 kwargs["parent_port_no"] = parentPort
1513 onuDevice, err := dh.coreProxy.GetChildDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, kwargs)
1514 if err != nil {
1515 return nil, olterrors.NewErrNotFound("onu-device", log.Fields{
1516 "intf-id": parentPort,
1517 "onu-id": onuID}, err)
1518 }
1519 logger.Debugw(ctx, "successfully-received-child-device-from-core", log.Fields{"child-device-id": onuDevice.Id, "child-device-sn": onuDevice.SerialNumber})
1520 return onuDevice, nil
1521}
1522
1523// SendPacketInToCore sends packet-in to core
1524// For this, it calls SendPacketIn of the core-proxy which uses a device specific topic to send the request.
1525// The adapter handling the device creates a device specific topic
1526func (dh *DeviceHandler) SendPacketInToCore(ctx context.Context, logicalPort uint32, packetPayload []byte) error {
1527 if logger.V(log.DebugLevel) {
1528 logger.Debugw(ctx, "send-packet-in-to-core", log.Fields{
1529 "port": logicalPort,
1530 "packet": hex.EncodeToString(packetPayload),
1531 "device-id": dh.device.Id,
1532 })
1533 }
1534 if err := dh.coreProxy.SendPacketIn(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, logicalPort, packetPayload); err != nil {
1535 return olterrors.NewErrCommunication("packet-send-failed", log.Fields{
1536 "source": "adapter",
1537 "destination": "core",
1538 "device-id": dh.device.Id,
1539 "logical-port": logicalPort,
1540 "packet": hex.EncodeToString(packetPayload)}, err)
1541 }
1542 if logger.V(log.DebugLevel) {
1543 logger.Debugw(ctx, "sent-packet-in-to-core-successfully", log.Fields{
1544 "packet": hex.EncodeToString(packetPayload),
1545 "device-id": dh.device.Id,
1546 })
1547 }
1548 return nil
1549}
1550
1551// UpdatePmConfig updates the pm metrics.
1552func (dh *DeviceHandler) UpdatePmConfig(ctx context.Context, pmConfigs *voltha.PmConfigs) {
1553 logger.Infow(ctx, "update-pm-configs", log.Fields{"device-id": dh.device.Id, "pm-configs": pmConfigs})
1554
1555 if pmConfigs.DefaultFreq != dh.metrics.ToPmConfigs().DefaultFreq {
1556 dh.metrics.UpdateFrequency(pmConfigs.DefaultFreq)
1557 logger.Debugf(ctx, "frequency-updated")
1558 }
1559
1560 if !pmConfigs.Grouped {
1561 metrics := dh.metrics.GetSubscriberMetrics()
1562 for _, m := range pmConfigs.Metrics {
1563 metrics[m.Name].Enabled = m.Enabled
1564
1565 }
1566 }
1567}
1568
1569//UpdateFlowsIncrementally updates the device flow
1570func (dh *DeviceHandler) UpdateFlowsIncrementally(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
1571 logger.Debugw(ctx, "received-incremental-flowupdate-in-device-handler", log.Fields{"device-id": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
1572
1573 var errorsList []error
1574
1575 if flows != nil {
1576 for _, flow := range flows.ToRemove.Items {
1577 dh.incrementActiveFlowRemoveCount(ctx, flow)
1578
1579 logger.Debugw(ctx, "removing-flow",
1580 log.Fields{"device-id": device.Id,
1581 "flowToRemove": flow})
1582 err := dh.flowMgr.RemoveFlow(ctx, flow)
1583 if err != nil {
1584 errorsList = append(errorsList, err)
1585 }
1586
1587 dh.decrementActiveFlowRemoveCount(ctx, flow)
1588 }
1589
1590 for _, flow := range flows.ToAdd.Items {
1591 logger.Debugw(ctx, "adding-flow",
1592 log.Fields{"device-id": device.Id,
1593 "flowToAdd": flow})
1594 // If there are active Flow Remove in progress for a given subscriber, wait until it completes
1595 dh.waitForFlowRemoveToFinish(ctx, flow)
1596 err := dh.flowMgr.AddFlow(ctx, flow, flowMetadata)
1597 if err != nil {
1598 errorsList = append(errorsList, err)
1599 }
1600 }
1601 }
1602
1603 if groups != nil {
1604 for _, group := range groups.ToAdd.Items {
1605 err := dh.flowMgr.AddGroup(ctx, group)
1606 if err != nil {
1607 errorsList = append(errorsList, err)
1608 }
1609 }
1610 for _, group := range groups.ToUpdate.Items {
1611 err := dh.flowMgr.ModifyGroup(ctx, group)
1612 if err != nil {
1613 errorsList = append(errorsList, err)
1614 }
1615 }
1616 for _, group := range groups.ToRemove.Items {
1617 err := dh.flowMgr.DeleteGroup(ctx, group)
1618 if err != nil {
1619 errorsList = append(errorsList, err)
1620 }
1621 }
1622 }
1623 if len(errorsList) > 0 {
1624 return fmt.Errorf("errors-installing-flows-groups, errors:%v", errorsList)
1625 }
1626 logger.Debugw(ctx, "updated-flows-incrementally-successfully", log.Fields{"device-id": dh.device.Id})
1627 return nil
1628}
1629
1630//DisableDevice disables the given device
1631//It marks the following for the given device:
1632//Device-Handler Admin-State : down
1633//Device Port-State: UNKNOWN
1634//Device Oper-State: UNKNOWN
1635func (dh *DeviceHandler) DisableDevice(ctx context.Context, device *voltha.Device) error {
1636 /* On device disable ,admin state update has to be done prior sending request to agent since
1637 the indication thread may processes invalid indications of ONU and OLT*/
1638 if dh.Client != nil {
1639 if _, err := dh.Client.DisableOlt(log.WithSpanFromContext(context.Background(), ctx), new(oop.Empty)); err != nil {
1640 if e, ok := status.FromError(err); ok && e.Code() == codes.Internal {
1641 return olterrors.NewErrAdapter("olt-disable-failed", log.Fields{"device-id": device.Id}, err)
1642 }
1643 }
1644 }
1645 L2oamDisableOlt(ctx, dh)
1646 logger.Debugw(ctx, "olt-disabled", log.Fields{"device-id": device.Id})
1647 /* Discovered ONUs entries need to be cleared , since on device disable the child devices goes to
1648 UNREACHABLE state which needs to be configured again*/
1649
1650 dh.discOnus = sync.Map{}
1651 dh.onus = sync.Map{}
1652
1653 dh.stopCollector <- true
1654
1655 go dh.notifyChildDevices(ctx, "unreachable")
1656 cloned := proto.Clone(device).(*voltha.Device)
1657 dh.device = cloned
1658 if err := dh.coreProxy.PortsStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), cloned.Id, ^uint32(1<<voltha.Port_PON_OLT), voltha.OperStatus_UNKNOWN); err != nil {
1659 return olterrors.NewErrAdapter("ports-state-update-failed", log.Fields{"device-id": device.Id}, err)
1660 }
1661 logger.Debugw(ctx, "disable-device-end", log.Fields{"device-id": device.Id})
1662 return nil
1663}
1664
1665func (dh *DeviceHandler) notifyChildDevices(ctx context.Context, state string) {
1666 onuInd := oop.OnuIndication{}
1667 onuInd.OperState = state
1668 onuDevices, err := dh.coreProxy.GetChildDevices(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id)
1669 if err != nil {
1670 logger.Errorw(ctx, "failed-to-get-child-devices-information", log.Fields{"device-id": dh.device.Id, "error": err})
1671 }
1672 if onuDevices != nil {
1673 for _, onuDevice := range onuDevices.Items {
1674 err := dh.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.TODO(), ctx), &onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
1675 "openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
1676 if err != nil {
1677 logger.Errorw(ctx, "failed-to-send-inter-adapter-message", log.Fields{"OnuInd": onuInd,
1678 "From Adapter": "openolt", "DeviceType": onuDevice.Type, "device-id": onuDevice.Id})
1679 }
1680
1681 }
1682 }
1683
1684}
1685
1686//ReenableDevice re-enables the olt device after disable
1687//It marks the following for the given device:
1688//Device-Handler Admin-State : up
1689//Device Port-State: ACTIVE
1690//Device Oper-State: ACTIVE
1691func (dh *DeviceHandler) ReenableDevice(ctx context.Context, device *voltha.Device) error {
1692 if _, err := dh.Client.ReenableOlt(log.WithSpanFromContext(context.Background(), ctx), new(oop.Empty)); err != nil {
1693 if e, ok := status.FromError(err); ok && e.Code() == codes.Internal {
1694 return olterrors.NewErrAdapter("olt-reenable-failed", log.Fields{"device-id": dh.device.Id}, err)
1695 }
1696 }
1697 logger.Debug(ctx, "olt-reenabled")
1698
1699
1700 ports, err := dh.coreProxy.ListDevicePorts(ctx, device.Id)
1701 if err != nil {
1702 return olterrors.NewErrAdapter("list-ports-failed", log.Fields{"device": device.Id}, err)
1703 }
1704 if err := dh.disableAdminDownPorts(ctx, ports); err != nil {
1705 return olterrors.NewErrAdapter("port-status-update-failed-after-olt-reenable", log.Fields{"device": device}, err)
1706 }
1707 device.OperStatus = voltha.OperStatus_ACTIVE
1708 dh.device = device
1709
1710 if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), device.Id, device.ConnectStatus, device.OperStatus); err != nil {
1711 return olterrors.NewErrAdapter("state-update-failed", log.Fields{
1712 "device-id": device.Id,
1713 "connect-status": device.ConnectStatus,
1714 "oper-status": device.OperStatus}, err)
1715 }
1716
1717 logger.Debugw(ctx, "reenabledevice-end", log.Fields{"device-id": device.Id})
1718
1719 return nil
1720}
1721
1722func (dh *DeviceHandler) clearUNIData(ctx context.Context, onu *rsrcMgr.OnuGemInfo) error {
1723 var uniID uint32
1724 var err error
1725 for _, port := range onu.UniPorts {
1726 uniID = UniIDFromPortNum(uint32(port))
1727 logger.Debugw(ctx, "clearing-resource-data-for-uni-port", log.Fields{"port": port, "uni-id": uniID})
1728 /* Delete tech-profile instance from the KV store */
1729 if err = dh.flowMgr.DeleteTechProfileInstances(ctx, onu.IntfID, onu.OnuID, uniID, onu.SerialNumber); err != nil {
1730 logger.Debugw(ctx, "failed-to-remove-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
1731 }
1732 logger.Debugw(ctx, "deleted-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
1733 flowIDs := dh.resourceMgr.GetCurrentFlowIDsForOnu(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID))
1734 for _, flowID := range flowIDs {
1735 dh.resourceMgr.FreeFlowID(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID), flowID)
1736 }
1737 tpIDList := dh.resourceMgr.GetTechProfileIDForOnu(ctx, onu.IntfID, onu.OnuID, uniID)
1738 for _, tpID := range tpIDList {
1739 if err = dh.resourceMgr.RemoveMeterIDForOnu(ctx, "upstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
1740 logger.Debugw(ctx, "failed-to-remove-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.OnuID})
1741 }
1742 logger.Debugw(ctx, "removed-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.OnuID})
1743 if err = dh.resourceMgr.RemoveMeterIDForOnu(ctx, "downstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
1744 logger.Debugw(ctx, "failed-to-remove-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.OnuID})
1745 }
1746 logger.Debugw(ctx, "removed-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.OnuID})
1747 }
1748 dh.resourceMgr.FreePONResourcesForONU(ctx, onu.IntfID, onu.OnuID, uniID)
1749 if err = dh.resourceMgr.RemoveTechProfileIDsForOnu(ctx, onu.IntfID, onu.OnuID, uniID); err != nil {
1750 logger.Debugw(ctx, "failed-to-remove-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
1751 }
1752 logger.Debugw(ctx, "removed-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
1753 if err = dh.resourceMgr.DelGemPortPktInOfAllServices(ctx, onu.IntfID, onu.OnuID, uint32(port)); err != nil {
1754 logger.Debugw(ctx, "failed-to-remove-gemport-pkt-in", log.Fields{"intfid": onu.IntfID, "onuid": onu.OnuID, "uniId": uniID})
1755 }
1756 }
1757 return nil
1758}
1759
1760func (dh *DeviceHandler) clearNNIData(ctx context.Context) error {
1761 nniUniID := -1
1762 nniOnuID := -1
1763
1764 if dh.resourceMgr == nil {
1765 return olterrors.NewErrNotFound("resource-manager", log.Fields{"device-id": dh.device.Id}, nil)
1766 }
1767 nni, err := dh.resourceMgr.GetNNIFromKVStore(ctx)
1768 if err != nil {
1769 return olterrors.NewErrPersistence("get", "nni", 0, nil, err)
1770 }
1771 logger.Debugw(ctx, "nni-", log.Fields{"nni": nni})
1772 for _, nniIntfID := range nni {
1773 flowIDs := dh.resourceMgr.GetCurrentFlowIDsForOnu(ctx, uint32(nniIntfID), int32(nniOnuID), int32(nniUniID))
1774 logger.Debugw(ctx, "current-flow-ids-for-nni", log.Fields{"flow-ids": flowIDs})
1775 for _, flowID := range flowIDs {
1776 dh.resourceMgr.FreeFlowID(ctx, uint32(nniIntfID), -1, -1, uint32(flowID))
1777 }
1778 dh.resourceMgr.RemoveResourceMap(ctx, nniIntfID, int32(nniOnuID), int32(nniUniID))
1779 }
1780 if err = dh.resourceMgr.DelNNiFromKVStore(ctx); err != nil {
1781 return olterrors.NewErrPersistence("clear", "nni", 0, nil, err)
1782 }
1783 return nil
1784}
1785
1786// DeleteDevice deletes the device instance from openolt handler array. Also clears allocated resource manager resources. Also reboots the OLT hardware!
1787func (dh *DeviceHandler) DeleteDevice(ctx context.Context, device *voltha.Device) error {
1788 logger.Debug(ctx, "function-entry-delete-device")
1789 /* Clear the KV store data associated with the all the UNI ports
1790 This clears up flow data and also resource map data for various
1791 other pon resources like alloc_id and gemport_id
1792 */
1793 L2oamDeleteOlt(ctx, dh)
1794 go dh.cleanupDeviceResources(ctx)
1795 logger.Debug(ctx, "removed-device-from-Resource-manager-KV-store")
1796 dh.stopCollector <- true
1797 dh.stopHeartbeatCheck <- true
1798 if dh.Client != nil {
1799 if _, err := dh.Client.Reboot(ctx, new(oop.Empty)); err != nil {
1800 return olterrors.NewErrAdapter("olt-reboot-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
1801 }
1802 }
1803 cloned := proto.Clone(device).(*voltha.Device)
1804 cloned.OperStatus = voltha.OperStatus_UNKNOWN
1805 cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
1806 if err := dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
1807 return olterrors.NewErrAdapter("device-state-update-failed", log.Fields{
1808 "device-id": device.Id,
1809 "connect-status": cloned.ConnectStatus,
1810 "oper-status": cloned.OperStatus}, err).Log()
1811 }
1812 return nil
1813}
1814func (dh *DeviceHandler) cleanupDeviceResources(ctx context.Context) {
1815
1816 if dh.resourceMgr != nil {
1817 noOfPonPorts := dh.resourceMgr.DevInfo.GetPonPorts()
1818 var ponPort uint32
1819 for ponPort = 0; ponPort < noOfPonPorts; ponPort++ {
1820 var onuGemData []rsrcMgr.OnuGemInfo
1821 err := dh.resourceMgr.ResourceMgrs[ponPort].GetOnuGemInfo(ctx, ponPort, &onuGemData)
1822 if err != nil {
1823 _ = olterrors.NewErrNotFound("onu", log.Fields{
1824 "device-id": dh.device.Id,
1825 "pon-port": ponPort}, err).Log()
1826 }
1827 for _, onu := range onuGemData {
1828 onuID := make([]uint32, 1)
1829 logger.Debugw(ctx, "onu-data", log.Fields{"onu": onu})
1830 if err = dh.clearUNIData(ctx, &onu); err != nil {
1831 logger.Errorw(ctx, "failed-to-clear-data-for-onu", log.Fields{"onu-device": onu})
1832 }
1833 // Clear flowids for gem cache.
1834 for _, gem := range onu.GemPorts {
1835 dh.resourceMgr.DeleteFlowIDsForGem(ctx, ponPort, gem)
1836 }
1837 onuID[0] = onu.OnuID
1838 dh.resourceMgr.FreeonuID(ctx, ponPort, onuID)
1839 }
1840 dh.resourceMgr.DeleteIntfIDGempMapPath(ctx, ponPort)
1841 onuGemData = nil
1842 err = dh.resourceMgr.DelOnuGemInfoForIntf(ctx, ponPort)
1843 if err != nil {
1844 logger.Errorw(ctx, "failed-to-update-onugem-info", log.Fields{"intfid": ponPort, "onugeminfo": onuGemData})
1845 }
1846 }
1847 /* Clear the flows from KV store associated with NNI port.
1848 There are mostly trap rules from NNI port (like LLDP)
1849 */
1850 if err := dh.clearNNIData(ctx); err != nil {
1851 logger.Errorw(ctx, "failed-to-clear-data-for-NNI-port", log.Fields{"device-id": dh.device.Id})
1852 }
1853
1854 /* Clear the resource pool for each PON port in the background */
1855 go func() {
1856 if err := dh.resourceMgr.Delete(ctx); err != nil {
1857 logger.Debug(ctx, err)
1858 }
1859 }()
1860 }
1861
1862 /*Delete ONU map for the device*/
1863 dh.onus.Range(func(key interface{}, value interface{}) bool {
1864 dh.onus.Delete(key)
1865 return true
1866 })
1867
1868 /*Delete discovered ONU map for the device*/
1869 dh.discOnus.Range(func(key interface{}, value interface{}) bool {
1870 dh.discOnus.Delete(key)
1871 return true
1872 })
1873}
1874
1875//RebootDevice reboots the given device
1876func (dh *DeviceHandler) RebootDevice(ctx context.Context, device *voltha.Device) error {
1877 L2oamRebootDevice(ctx, dh, device)
1878 logger.Debugw(ctx, "rebooted-device-successfully", log.Fields{"device-id": device.Id})
1879 return nil
1880}
1881
1882func (dh *DeviceHandler) handlePacketIndication(ctx context.Context, packetIn *oop.PacketIndication) error {
1883 if logger.V(log.DebugLevel) {
1884 logger.Debugw(ctx, "received-packet-in", log.Fields{
1885 "packet-indication": *packetIn,
1886 "device-id": dh.device.Id,
1887 "packet": hex.EncodeToString(packetIn.Pkt),
1888 })
1889 }
1890 logicalPortNum, err := dh.flowMgr.GetLogicalPortFromPacketIn(ctx, packetIn)
1891 if err != nil {
1892 return olterrors.NewErrNotFound("logical-port", log.Fields{"packet": hex.EncodeToString(packetIn.Pkt)}, err)
1893 }
1894 if logger.V(log.DebugLevel) {
1895 logger.Debugw(ctx, "sending-packet-in-to-core", log.Fields{
1896 "logical-port-num": logicalPortNum,
1897 "device-id": dh.device.Id,
1898 "packet": hex.EncodeToString(packetIn.Pkt),
1899 })
1900 }
1901
1902 if err := dh.coreProxy.SendPacketIn(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, logicalPortNum, packetIn.Pkt); err != nil {
1903 return olterrors.NewErrCommunication("send-packet-in", log.Fields{
1904 "destination": "core",
1905 "source": dh.device.Type,
1906 "device-id": dh.device.Id,
1907 "packet": hex.EncodeToString(packetIn.Pkt),
1908 }, err)
1909 }
1910
1911 if logger.V(log.DebugLevel) {
1912 logger.Debugw(ctx, "success-sending-packet-in-to-core!", log.Fields{
1913 "packet": hex.EncodeToString(packetIn.Pkt),
1914 "device-id": dh.device.Id,
1915 })
1916 }
1917 return nil
1918}
1919
1920// PacketOut sends packet-out from VOLTHA to OLT on the egress port provided
1921func (dh *DeviceHandler) PacketOut(ctx context.Context, egressPortNo int, packet *of.OfpPacketOut) error {
1922 if logger.V(log.DebugLevel) {
1923 logger.Debugw(ctx, "incoming-packet-out", log.Fields{
1924 "device-id": dh.device.Id,
1925 "egress-port-no": egressPortNo,
1926 "pkt-length": len(packet.Data),
1927 "packet": hex.EncodeToString(packet.Data),
1928 })
1929 }
1930
1931 if disableBBSim {
1932 if vlanEnable {
1933 byteString := string(packet.Data)
1934 searchByteString := string([]byte{0x88, 0x8e, 0x03, 0x00, 0x00, 0x04, 0x03})
1935 //if strings.Index(byteString, searchByteString) != -1 {
1936 if strings.Contains(byteString, searchByteString) {
1937 device := FindL2oamDevice(hex.EncodeToString(packet.Data[:6]))
1938 if onu, ok := device.(*L2oamOnuDevice); ok {
1939 onu.Base.EapFlag = true
1940 }
1941 }
1942
1943 // add s-vlan tag
1944 sendPacket := make([]byte, len(packet.Data)+4)
1945 copy(sendPacket, packet.Data[:12])
1946 copy(sendPacket[12:], svlanTagAuth)
1947 copy(sendPacket[16:], packet.Data[12:])
1948 GetL2oamHandle().send(sendPacket)
1949 } else {
1950 GetL2oamHandle().send(packet.Data)
1951 }
1952 return nil
1953 }
1954
1955 egressPortType := IntfIDToPortTypeName(uint32(egressPortNo))
1956 if egressPortType == voltha.Port_ETHERNET_UNI {
1957 outerEthType := (uint16(packet.Data[12]) << 8) | uint16(packet.Data[13])
1958 innerEthType := (uint16(packet.Data[16]) << 8) | uint16(packet.Data[17])
1959 if outerEthType == 0x8942 || outerEthType == 0x88cc {
1960 // Do not packet-out lldp packets on uni port.
1961 // ONOS has no clue about uni/nni ports, it just packets out on all
1962 // available ports on the Logical Switch. It should not be interested
1963 // in the UNI links.
1964 logger.Debugw(ctx, "dropping-lldp-packet-out-on-uni", log.Fields{
1965 "device-id": dh.device.Id,
1966 })
1967 return nil
1968 }
1969 if outerEthType == 0x88a8 || outerEthType == 0x8100 {
1970 if innerEthType == 0x8100 {
1971 // q-in-q 802.1ad or 802.1q double tagged packet.
1972 // slice out the outer tag.
1973 packet.Data = append(packet.Data[:12], packet.Data[16:]...)
1974 if logger.V(log.DebugLevel) {
1975 logger.Debugw(ctx, "packet-now-single-tagged", log.Fields{
1976 "packet-data": hex.EncodeToString(packet.Data),
1977 "device-id": dh.device.Id,
1978 })
1979 }
1980 }
1981 }
1982 intfID := IntfIDFromUniPortNum(uint32(egressPortNo))
1983 onuID := OnuIDFromPortNum(uint32(egressPortNo))
1984 uniID := UniIDFromPortNum(uint32(egressPortNo))
1985
1986 gemPortID, err := dh.flowMgr.GetPacketOutGemPortID(ctx, intfID, onuID, uint32(egressPortNo), packet.Data)
1987 if err != nil {
1988 // In this case the openolt agent will receive the gemPortID as 0.
1989 // The agent tries to retrieve the gemPortID in this case.
1990 // This may not always succeed at the agent and packetOut may fail.
1991 logger.Errorw(ctx, "failed-to-retrieve-gemport-id-for-packet-out", log.Fields{
1992 "intf-id": intfID,
1993 "onu-id": onuID,
1994 "uni-id": uniID,
1995 "packet": hex.EncodeToString(packet.Data),
1996 "device-id": dh.device.Id,
1997 })
1998 }
1999
2000 onuPkt := oop.OnuPacket{IntfId: intfID, OnuId: onuID, PortNo: uint32(egressPortNo), GemportId: gemPortID, Pkt: packet.Data}
2001
2002 if logger.V(log.DebugLevel) {
2003 logger.Debugw(ctx, "sending-packet-to-onu", log.Fields{
2004 "egress-port-no": egressPortNo,
2005 "intf-id": intfID,
2006 "onu-id": onuID,
2007 "uni-id": uniID,
2008 "gem-port-id": gemPortID,
2009 "packet": hex.EncodeToString(packet.Data),
2010 "device-id": dh.device.Id,
2011 })
2012 }
2013
2014 if _, err := dh.Client.OnuPacketOut(ctx, &onuPkt); err != nil {
2015 return olterrors.NewErrCommunication("packet-out-send", log.Fields{
2016 "source": "adapter",
2017 "destination": "onu",
2018 "egress-port-number": egressPortNo,
2019 "intf-id": intfID,
2020 "oni-id": onuID,
2021 "uni-id": uniID,
2022 "gem-port-id": gemPortID,
2023 "packet": hex.EncodeToString(packet.Data),
2024 "device-id": dh.device.Id,
2025 }, err)
2026 }
2027 } else if egressPortType == voltha.Port_ETHERNET_NNI {
2028 nniIntfID, err := IntfIDFromNniPortNum(ctx, uint32(egressPortNo))
2029 if err != nil {
2030 return olterrors.NewErrInvalidValue(log.Fields{
2031 "egress-nni-port": egressPortNo,
2032 "device-id": dh.device.Id,
2033 }, err)
2034 }
2035 uplinkPkt := oop.UplinkPacket{IntfId: nniIntfID, Pkt: packet.Data}
2036
2037 if logger.V(log.DebugLevel) {
2038 logger.Debugw(ctx, "sending-packet-to-nni", log.Fields{
2039 "uplink-pkt": uplinkPkt,
2040 "packet": hex.EncodeToString(packet.Data),
2041 "device-id": dh.device.Id,
2042 })
2043 }
2044
2045 if _, err := dh.Client.UplinkPacketOut(ctx, &uplinkPkt); err != nil {
2046 return olterrors.NewErrCommunication("packet-out-to-nni", log.Fields{
2047 "packet": hex.EncodeToString(packet.Data),
2048 "device-id": dh.device.Id,
2049 }, err)
2050 }
2051 } else {
2052 logger.Warnw(ctx, "packet-out-to-this-interface-type-not-implemented", log.Fields{
2053 "egress-port-no": egressPortNo,
2054 "egressPortType": egressPortType,
2055 "packet": hex.EncodeToString(packet.Data),
2056 "device-id": dh.device.Id,
2057 })
2058 }
2059 return nil
2060}
2061
2062func (dh *DeviceHandler) formOnuKey(intfID, onuID uint32) string {
2063 return "" + strconv.Itoa(int(intfID)) + "." + strconv.Itoa(int(onuID))
2064}
2065
2066func startHeartbeatCheck(ctx context.Context, dh *DeviceHandler) {
2067
2068 var timerCheck *time.Timer
2069
2070 for {
2071 heartbeatTimer := time.NewTimer(dh.openOLT.HeartbeatCheckInterval)
2072 select {
2073 case <-heartbeatTimer.C:
2074 ctxWithTimeout, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.openOLT.GrpcTimeoutInterval)
2075 if heartBeat, err := dh.Client.HeartbeatCheck(ctxWithTimeout, new(oop.Empty)); err != nil {
2076 logger.Warnw(ctx, "hearbeat-failed", log.Fields{"device-id": dh.device.Id})
2077 if timerCheck == nil {
2078 // start a after func, when expired will update the state to the core
2079 timerCheck = time.AfterFunc(dh.openOLT.HeartbeatFailReportInterval, func() { dh.updateStateUnreachable(ctx) })
2080 }
2081 } else {
2082 if timerCheck != nil {
2083 if timerCheck.Stop() {
2084 logger.Debugw(ctx, "got-hearbeat-within-timeout", log.Fields{"device-id": dh.device.Id})
2085 }
2086 timerCheck = nil
2087 }
2088 logger.Debugw(ctx, "hearbeat",
2089 log.Fields{"signature": heartBeat,
2090 "device-id": dh.device.Id})
2091 }
2092 cancel()
2093 case <-dh.stopHeartbeatCheck:
2094 logger.Debugw(ctx, "stopping-heart-beat-check", log.Fields{"device-id": dh.device.Id})
2095 return
2096 }
2097 }
2098}
2099
2100func (dh *DeviceHandler) updateStateUnreachable(ctx context.Context) {
2101 device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
2102 if err != nil || device == nil {
2103 _ = olterrors.NewErrNotFound("device", log.Fields{"device-id": dh.device.Id}, err).Log()
2104 }
2105
2106 if device.ConnectStatus == voltha.ConnectStatus_REACHABLE {
2107 if err = dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN); err != nil {
2108 _ = olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
2109 }
2110 if err = dh.coreProxy.PortsStateUpdate(ctx, dh.device.Id, 0, voltha.OperStatus_UNKNOWN); err != nil {
2111 _ = olterrors.NewErrAdapter("port-update-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
2112 }
2113 go dh.cleanupDeviceResources(ctx)
2114
2115 dh.lockDevice.RLock()
2116 // Stop the read indication only if it the routine is active
2117 // The read indication would have already stopped due to failure on the gRPC stream following OLT going unreachable
2118 // Sending message on the 'stopIndication' channel again will cause the readIndication routine to immediately stop
2119 // on next execution of the readIndication routine.
2120 if dh.isReadIndicationRoutineActive {
2121 dh.stopIndications <- true
2122 }
2123 dh.lockDevice.RUnlock()
2124
2125 dh.transitionMap.Handle(ctx, DeviceInit)
2126
2127 }
2128}
2129
2130// EnablePort to enable Pon interface
2131func (dh *DeviceHandler) EnablePort(ctx context.Context, port *voltha.Port) error {
2132 logger.Debugw(ctx, "enable-port", log.Fields{"Device": dh.device, "port": port})
2133 return dh.modifyPhyPort(ctx, port, true)
2134}
2135
2136// DisablePort to disable pon interface
2137func (dh *DeviceHandler) DisablePort(ctx context.Context, port *voltha.Port) error {
2138 logger.Debugw(ctx, "disable-port", log.Fields{"Device": dh.device, "port": port})
2139 return dh.modifyPhyPort(ctx, port, false)
2140}
2141
2142//modifyPhyPort is common function to enable and disable the port. parm :enablePort, true to enablePort and false to disablePort.
2143func (dh *DeviceHandler) modifyPhyPort(ctx context.Context, port *voltha.Port, enablePort bool) error {
2144 logger.Infow(ctx, "modifyPhyPort", log.Fields{"port": port, "Enable": enablePort, "device-id": dh.device.Id})
2145 if port.GetType() == voltha.Port_ETHERNET_NNI {
2146 // Bug is opened for VOL-2505 to support NNI disable feature.
2147 logger.Infow(ctx, "voltha-supports-single-nni-hence-disable-of-nni-not-allowed",
2148 log.Fields{"device": dh.device, "port": port})
2149 return olterrors.NewErrAdapter("illegal-port-request", log.Fields{
2150 "port-type": port.GetType,
2151 "enable-state": enablePort}, nil)
2152 }
2153 ponID := PortNoToIntfID(port.GetPortNo(), voltha.Port_PON_OLT)
2154 ponIntf := &oop.Interface{IntfId: ponID}
2155 var operStatus voltha.OperStatus_Types
2156 if enablePort {
2157 operStatus = voltha.OperStatus_ACTIVE
2158 out, err := dh.Client.EnablePonIf(ctx, ponIntf)
2159
2160 if err != nil {
2161 return olterrors.NewErrAdapter("pon-port-enable-failed", log.Fields{
2162 "device-id": dh.device.Id,
2163 "port": port}, err)
2164 }
2165 // updating interface local cache for collecting stats
2166 dh.activePorts.Store(ponID, true)
2167 logger.Infow(ctx, "enabled-pon-port", log.Fields{"out": out, "device-id": dh.device, "Port": port})
2168 } else {
2169 operStatus = voltha.OperStatus_UNKNOWN
2170 out, err := dh.Client.DisablePonIf(ctx, ponIntf)
2171 if err != nil {
2172 return olterrors.NewErrAdapter("pon-port-disable-failed", log.Fields{
2173 "device-id": dh.device.Id,
2174 "port": port}, err)
2175 }
2176 // updating interface local cache for collecting stats
2177 dh.activePorts.Store(ponID, false)
2178 logger.Infow(ctx, "disabled-pon-port", log.Fields{"out": out, "device-id": dh.device, "Port": port})
2179 }
2180 if err := dh.coreProxy.PortStateUpdate(ctx, dh.device.Id, voltha.Port_PON_OLT, port.PortNo, operStatus); err != nil {
2181 return olterrors.NewErrAdapter("port-state-update-failed", log.Fields{
2182 "device-id": dh.device.Id,
2183 "port": port.PortNo}, err)
2184 }
2185 return nil
2186}
2187
2188//disableAdminDownPorts disables the ports, if the corresponding port Adminstate is disabled on reboot and Renable device.
2189func (dh *DeviceHandler) disableAdminDownPorts(ctx context.Context, ports []*voltha.Port) error {
2190 for _, port := range ports {
2191 if port.AdminState == common.AdminState_DISABLED {
2192 if err := dh.DisablePort(ctx, port); err != nil {
2193 return olterrors.NewErrAdapter("port-disable-failed", log.Fields{
2194 "device-id": dh.device.Id,
2195 "port": port}, err)
2196 }
2197 }
2198 }
2199 return nil
2200}
2201
2202//populateActivePorts to populate activePorts map
2203func (dh *DeviceHandler) populateActivePorts(ctx context.Context, ports []*voltha.Port) {
2204 logger.Infow(ctx, "populateActivePorts", log.Fields{"device-id": dh.device.Id})
2205 for _, port := range ports {
2206 if port.Type == voltha.Port_ETHERNET_NNI {
2207 if port.OperStatus == voltha.OperStatus_ACTIVE {
2208 dh.activePorts.Store(PortNoToIntfID(port.PortNo, voltha.Port_ETHERNET_NNI), true)
2209 } else {
2210 dh.activePorts.Store(PortNoToIntfID(port.PortNo, voltha.Port_ETHERNET_NNI), false)
2211 }
2212 }
2213 if port.Type == voltha.Port_PON_OLT {
2214 if port.OperStatus == voltha.OperStatus_ACTIVE {
2215 dh.activePorts.Store(PortNoToIntfID(port.PortNo, voltha.Port_PON_OLT), true)
2216 } else {
2217 dh.activePorts.Store(PortNoToIntfID(port.PortNo, voltha.Port_PON_OLT), false)
2218 }
2219 }
2220 }
2221}
2222
2223// ChildDeviceLost deletes ONU and clears pon resources related to it.
2224func (dh *DeviceHandler) ChildDeviceLost(ctx context.Context, pPortNo uint32, onuID uint32) error {
2225 logger.Debugw(ctx, "child-device-lost", log.Fields{"pdeviceID": dh.device.Id})
2226 intfID := PortNoToIntfID(pPortNo, voltha.Port_PON_OLT)
2227 onuKey := dh.formOnuKey(intfID, onuID)
2228 onuDevice, ok := dh.onus.Load(onuKey)
2229 if !ok {
2230 return olterrors.NewErrAdapter("failed-to-load-onu-details",
2231 log.Fields{
2232 "device-id": dh.device.Id,
2233 "onu-id": onuID,
2234 "intf-id": intfID}, nil).Log()
2235 }
2236 var sn *oop.SerialNumber
2237 var err error
2238 if sn, err = dh.deStringifySerialNumber(onuDevice.(*OnuDevice).serialNumber); err != nil {
2239 return olterrors.NewErrAdapter("failed-to-destringify-serial-number",
2240 log.Fields{
2241 "devicer-id": dh.device.Id,
2242 "serial-number": onuDevice.(*OnuDevice).serialNumber}, err).Log()
2243 }
2244
2245 for uniID := 0; uniID < MaxUnisPerOnu; uniID++ {
2246 var flowRemoveData pendingFlowRemoveData
2247 key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uint32(uniID)}
2248 dh.lockDevice.RLock()
2249 if flowRemoveData, ok = dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
2250 dh.lockDevice.RUnlock()
2251 continue
2252 }
2253 dh.lockDevice.RUnlock()
2254
2255 logger.Debugw(ctx, "wait-for-flow-remove-complete-before-processing-child-device-lost",
2256 log.Fields{"int-id": intfID, "onu-id": onuID, "uni-id": uniID})
2257 // Wait for all flow removes to finish first
2258 <-flowRemoveData.allFlowsRemoved
2259 logger.Debugw(ctx, "flow-removes-complete-for-subscriber",
2260 log.Fields{"int-id": intfID, "onu-id": onuID, "uni-id": uniID})
2261 }
2262
2263 onu := &oop.Onu{IntfId: intfID, OnuId: onuID, SerialNumber: sn}
2264 if _, err := dh.Client.DeleteOnu(log.WithSpanFromContext(context.Background(), ctx), onu); err != nil {
2265 return olterrors.NewErrAdapter("failed-to-delete-onu", log.Fields{
2266 "device-id": dh.device.Id,
2267 "onu-id": onuID}, err).Log()
2268 }
2269 var onuGemData []rsrcMgr.OnuGemInfo
2270 if onuMgr, ok := dh.resourceMgr.ResourceMgrs[intfID]; !ok {
2271 logger.Warnw(ctx, "failed-to-get-resource-manager-for-interface-Id", log.Fields{
2272 "device-id": dh.device.Id,
2273 "intf-id": intfID})
2274 } else {
2275 if err := onuMgr.GetOnuGemInfo(ctx, intfID, &onuGemData); err != nil {
2276 logger.Warnw(ctx, "failed-to-get-onu-info-for-pon-port", log.Fields{
2277 "device-id": dh.device.Id,
2278 "intf-id": intfID,
2279 "error": err})
2280 } else {
2281 for i, onu := range onuGemData {
2282 if onu.OnuID == onuID && onu.SerialNumber == onuDevice.(*OnuDevice).serialNumber {
2283 logger.Debugw(ctx, "onu-data", log.Fields{"onu": onu})
2284 if err := dh.clearUNIData(ctx, &onu); err != nil {
2285 logger.Warnw(ctx, "failed-to-clear-uni-data-for-onu", log.Fields{
2286 "device-id": dh.device.Id,
2287 "onu-device": onu,
2288 "error": err})
2289 }
2290 // Clear flowids for gem cache.
2291 for _, gem := range onu.GemPorts {
2292 dh.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, gem)
2293 }
2294 onuGemData = append(onuGemData[:i], onuGemData[i+1:]...)
2295 err := onuMgr.AddOnuGemInfo(ctx, intfID, onuGemData)
2296 if err != nil {
2297 logger.Warnw(ctx, "persistence-update-onu-gem-info-failed", log.Fields{
2298 "intf-id": intfID,
2299 "onu-device": onu,
2300 "onu-gem": onuGemData,
2301 "error": err})
2302 //Not returning error on cleanup.
2303 }
2304 logger.Debugw(ctx, "removed-onu-gem-info", log.Fields{"intf": intfID, "onu-device": onu, "onugem": onuGemData})
2305 dh.resourceMgr.FreeonuID(ctx, intfID, []uint32{onu.OnuID})
2306 break
2307 }
2308 }
2309 }
2310 }
2311 dh.onus.Delete(onuKey)
2312 dh.discOnus.Delete(onuDevice.(*OnuDevice).serialNumber)
2313 return nil
2314}
2315
2316func getInPortFromFlow(flow *of.OfpFlowStats) uint32 {
2317 for _, field := range flows.GetOfbFields(flow) {
2318 if field.Type == flows.IN_PORT {
2319 return field.GetPort()
2320 }
2321 }
2322 return InvalidPort
2323}
2324
2325func getOutPortFromFlow(flow *of.OfpFlowStats) uint32 {
2326 for _, action := range flows.GetActions(flow) {
2327 if action.Type == flows.OUTPUT {
2328 if out := action.GetOutput(); out != nil {
2329 return out.GetPort()
2330 }
2331 }
2332 }
2333 return InvalidPort
2334}
2335
2336func (dh *DeviceHandler) incrementActiveFlowRemoveCount(ctx context.Context, flow *of.OfpFlowStats) {
2337 inPort, outPort := getPorts(flow)
2338 logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
2339 if inPort != InvalidPort && outPort != InvalidPort {
2340 _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
2341 key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
2342 logger.Debugw(ctx, "increment-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
2343
2344 dh.lockDevice.Lock()
2345 defer dh.lockDevice.Unlock()
2346 flowRemoveData, ok := dh.pendingFlowRemoveDataPerSubscriber[key]
2347 if !ok {
2348 flowRemoveData = pendingFlowRemoveData{
2349 pendingFlowRemoveCount: 0,
2350 allFlowsRemoved: make(chan struct{}),
2351 }
2352 }
2353 flowRemoveData.pendingFlowRemoveCount++
2354 dh.pendingFlowRemoveDataPerSubscriber[key] = flowRemoveData
2355
2356 logger.Debugw(ctx, "current-flow-remove-count窶妬ncrement",
2357 log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
2358 "currCnt": dh.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
2359 }
2360}
2361
2362func (dh *DeviceHandler) decrementActiveFlowRemoveCount(ctx context.Context, flow *of.OfpFlowStats) {
2363 inPort, outPort := getPorts(flow)
2364 logger.Debugw(ctx, "decrement-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
2365 if inPort != InvalidPort && outPort != InvalidPort {
2366 _, intfID, onuID, uniID := ExtractAccessFromFlow(uint32(inPort), uint32(outPort))
2367 key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
2368 logger.Debugw(ctx, "decrement-flow-remove-count-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
2369
2370 dh.lockDevice.Lock()
2371 defer dh.lockDevice.Unlock()
2372 if val, ok := dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
2373 logger.Fatalf(ctx, "flow-remove-key-not-found", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
2374 } else {
2375 if val.pendingFlowRemoveCount > 0 {
2376 val.pendingFlowRemoveCount--
2377 }
2378 logger.Debugw(ctx, "current-flow-remove-count-after-decrement",
2379 log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID,
2380 "currCnt": dh.pendingFlowRemoveDataPerSubscriber[key].pendingFlowRemoveCount})
2381 // If all flow removes have finished, then close the channel to signal the receiver
2382 // to go ahead with flow adds.
2383 if val.pendingFlowRemoveCount == 0 {
2384 close(val.allFlowsRemoved)
2385 delete(dh.pendingFlowRemoveDataPerSubscriber, key)
2386 return
2387 }
2388 dh.pendingFlowRemoveDataPerSubscriber[key] = val
2389 }
2390 }
2391}
2392
2393func (dh *DeviceHandler) waitForFlowRemoveToFinish(ctx context.Context, flow *of.OfpFlowStats) {
2394 var flowRemoveData pendingFlowRemoveData
2395 var ok bool
2396 inPort, outPort := getPorts(flow)
2397 logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
2398 if inPort != InvalidPort && outPort != InvalidPort {
2399 _, intfID, onuID, uniID := ExtractAccessFromFlow(inPort, outPort)
2400 key := pendingFlowRemoveDataKey{intfID: intfID, onuID: onuID, uniID: uniID}
2401 logger.Debugw(ctx, "wait-for-flow-remove-to-finish-for-subscriber", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
2402
2403 dh.lockDevice.RLock()
2404 if flowRemoveData, ok = dh.pendingFlowRemoveDataPerSubscriber[key]; !ok {
2405 logger.Debugw(ctx, "no-pending-flow-to-remove", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
2406 dh.lockDevice.RUnlock()
2407 return
2408 }
2409 dh.lockDevice.RUnlock()
2410
2411 // Wait for all flow removes to finish first
2412 <-flowRemoveData.allFlowsRemoved
2413
2414 logger.Debugw(ctx, "all-flows-cleared--handling-flow-add-now", log.Fields{"intf-id": intfID, "onu-id": onuID, "uni-id": uniID})
2415 }
2416}
2417
2418func getPorts(flow *of.OfpFlowStats) (uint32, uint32) {
2419 inPort := getInPortFromFlow(flow)
2420 outPort := getOutPortFromFlow(flow)
2421
2422 if inPort == InvalidPort || outPort == InvalidPort {
2423 return inPort, outPort
2424 }
2425
2426 if isControllerFlow := IsControllerBoundFlow(outPort); isControllerFlow {
2427 /* Get UNI port/ IN Port from tunnel ID field for upstream controller bound flows */
2428 if portType := IntfIDToPortTypeName(inPort); portType == voltha.Port_PON_OLT {
2429 if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
2430 return uniPort, outPort
2431 }
2432 }
2433 } else {
2434 // Downstream flow from NNI to PON port , Use tunnel ID as new OUT port / UNI port
2435 if portType := IntfIDToPortTypeName(outPort); portType == voltha.Port_PON_OLT {
2436 if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
2437 return inPort, uniPort
2438 }
2439 // Upstream flow from PON to NNI port , Use tunnel ID as new IN port / UNI port
2440 } else if portType := IntfIDToPortTypeName(inPort); portType == voltha.Port_PON_OLT {
2441 if uniPort := flows.GetChildPortFromTunnelId(flow); uniPort != 0 {
2442 return uniPort, outPort
2443 }
2444 }
2445 }
2446
2447 return InvalidPort, InvalidPort
2448}
2449
// extractOmciTransactionID returns the first two bytes of omciPkt decoded as
// a big-endian uint16. Packets of three bytes or fewer yield 0.
func extractOmciTransactionID(omciPkt []byte) uint16 {
	if len(omciPkt) <= 3 {
		return 0
	}
	return binary.BigEndian.Uint16(omciPkt[:2])
}
2458
2459// StoreOnuDevice stores the onu parameters to the local cache.
2460func (dh *DeviceHandler) StoreOnuDevice(onuDevice *OnuDevice) {
2461 onuKey := dh.formOnuKey(onuDevice.intfID, onuDevice.onuID)
2462 dh.onus.Store(onuKey, onuDevice)
2463}
2464
2465func (dh *DeviceHandler) getExtValue(ctx context.Context, device *voltha.Device, value voltha.ValueType_Type) (*voltha.ReturnValues, error) {
2466 var err error
2467 var sn *oop.SerialNumber
2468 var ID uint32
2469 resp := new(voltha.ReturnValues)
2470 valueparam := new(oop.ValueParam)
2471 ctx = log.WithSpanFromContext(context.Background(), ctx)
2472 logger.Infow(ctx, "getExtValue", log.Fields{"onu-id": device.Id, "pon-intf": device.ParentPortNo})
2473 if sn, err = dh.deStringifySerialNumber(device.SerialNumber); err != nil {
2474 return nil, err
2475 }
2476 ID = device.ProxyAddress.GetOnuId()
2477 Onu := oop.Onu{IntfId: device.ParentPortNo, OnuId: ID, SerialNumber: sn}
2478 valueparam.Onu = &Onu
2479 valueparam.Value = value
2480
2481 resp.Unsupported = uint32(value)
2482 _ = ctx
2483
2484 /*
2485 resp, err = dh.Client.GetValue(ctx, valueparam)
2486 if err != nil {
2487 logger.Errorw("error-while-getValue", log.Fields{"DeviceID": dh.device, "onu-id": onuid, "error": err})
2488 return nil, err
2489 }
2490 */
2491
2492 logger.Infow(ctx, "get-ext-value", log.Fields{"resp": resp, "device-id": dh.device, "onu-id": device.Id, "pon-intf": device.ParentPortNo})
2493 return resp, nil
2494}
2495
2496// func (dh *DeviceHandler) L2oamCmdRequest2(ctx context.Context, device *voltha.Device, request *voltha.SimulateAlarmRequest) error {
2497// logger.Infow(ctx, "L2oamRequest", log.Fields{"device-id": device.Id})
2498
2499// switch request.Drift {
2500// case 0: // add-flow
2501// L2oamAfterKeepAlive(ctx, dh)
2502// L2oamAddFlow(ctx, dh)
2503
2504// case 1: // add-flow-dev
2505// tomiObjContext := L2oamAddFlowToDeviceDS(ctx, dh)
2506// if tomiObjContext != nil {
2507// L2oamAddFlowToDeviceUS(ctx, dh, tomiObjContext)
2508// }
2509// }
2510
2511// return nil
2512// }
2513
2514// L2oamCmdRequest executes some commands for L2OAM
2515func (dh *DeviceHandler) L2oamCmdRequest(ctx context.Context, device *voltha.Device, request *voltha.SimulateAlarmRequest) error {
2516 logger.Infow(ctx, "L2oamCmdRequest()", log.Fields{"device-id": device.Id})
2517
2518 cmd := parseL2oamCmd(request)
2519 logger.Info(ctx, "parseL2oamCmd() request=%v, cmd=%v", request, cmd)
2520
2521 switch cmd.Type {
2522 case "add-flow":
2523 //L2oamAfterKeepAlive(ctx, dh)
2524 L2oamAddFlow(ctx, dh, cmd)
2525
2526 case "add-flow-dev":
2527 err := L2oamAddFlowToDeviceAll(ctx, dh, cmd)
2528 if err == nil {
2529 onu := FindL2oamDeviceByDeviceID(cmd.OnuDeviceID)
2530 if onu == nil {
2531 logger.Debug(ctx, fmt.Sprintf("L2oamCmdRequest() FindL2oamDevice() onu not found. deviceId=%s", cmd.OnuDeviceID))
2532 } else {
2533 // start ONU mount sequence
2534 onu.startMountSequence(context.Background(), l2oam.OnuPkgType, cmd)
2535 }
2536 }
2537 default:
2538 logger.Error(ctx, "L2oamCmdRequest() cmd.Type error. request=%v, cmd=%v", request, cmd)
2539 }
2540
2541 return nil
2542}
2543
// L2oamCmd holds the arguments of an L2OAM command as produced by
// parseL2oamCmd. All byte-slice fields carry the raw bytes obtained by
// hex-decoding the corresponding "key=value" argument.
type L2oamCmd struct {
	// Type is the command name: "add-flow", "add-flow-dev" or "unknown".
	Type string
	// Cir and Pir come from the "cir" and "pir" arguments
	// (presumably committed / peak information rate - confirm units).
	Cir, Pir []byte
	// Tpid and Vid come from the "tpid" and "vid" arguments
	// (presumably the outer VLAN tag - confirm).
	Tpid, Vid []byte
	// Itpid and Ivid come from the "itpid" and "ivid" arguments
	// (presumably the inner VLAN tag - confirm).
	Itpid, Ivid []byte
	// OnuDeviceID comes from the "onudeviceid" argument.
	OnuDeviceID string
}
2555
2556func parseL2oamCmd(request *voltha.SimulateAlarmRequest) *L2oamCmd {
2557 l2oamCmd := &L2oamCmd{
2558 Type: "unknown",
2559 Cir: []byte{0x00, 0x00, 0x03, 0xe8},
2560 Pir: []byte{0x00, 0x98, 0x96, 0x80},
2561 Tpid: []byte{0x88, 0xa8},
2562 Vid: []byte{0x00, 0x64},
2563 Itpid: []byte{0x81, 0x00},
2564 Ivid: []byte{0x00, 0x0a},
2565 OnuDeviceID: "",
2566 }
2567
2568 if request.Indicator == "" {
2569 return l2oamCmd
2570 }
2571 cmdStr := strings.Replace(request.Indicator, "\"", "", -1)
2572 if request.Drift == 0 {
2573 l2oamCmd.Type = "add-flow"
2574 } else {
2575 l2oamCmd.Type = "add-flow-dev"
2576 }
2577
2578 cmds := strings.Split(cmdStr, ",")
2579 for _, cmd := range cmds {
2580 kv := strings.Split(cmd, "=")
2581 key := kv[0]
2582 value := kv[1]
2583 switch key {
2584 case "cir":
2585 bytes, _ := hex.DecodeString(value)
2586 l2oamCmd.Cir = bytes
2587 case "pir":
2588 bytes, _ := hex.DecodeString(value)
2589 l2oamCmd.Pir = bytes
2590 case "tpid":
2591 bytes, _ := hex.DecodeString(value)
2592 l2oamCmd.Tpid = bytes
2593 case "vid":
2594 bytes, _ := hex.DecodeString(value)
2595 l2oamCmd.Vid = bytes
2596 case "itpid":
2597 bytes, _ := hex.DecodeString(value)
2598 l2oamCmd.Itpid = bytes
2599 case "ivid":
2600 bytes, _ := hex.DecodeString(value)
2601 l2oamCmd.Ivid = bytes
2602 case "onudeviceid":
2603 l2oamCmd.OnuDeviceID = value
2604 default:
2605 }
2606 }
2607 return l2oamCmd
2608}