blob: 376df16670070c14bb3230072ba43952dc4d24c3 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package graph
import (
"errors"
"fmt"
"github.com/gyuho/goraph"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-protos/go/voltha"
"strconv"
"strings"
"sync"
)
func init() {
log.AddPackage(log.JSON, log.WarnLevel, nil)
}
type RouteHop struct {
DeviceID string
Ingress uint32
Egress uint32
}
type OFPortLink struct {
Ingress uint32
Egress uint32
}
type ofPortLinkToPath struct {
link OFPortLink
path []RouteHop
}
type GetDeviceFunc func(id string) (*voltha.Device, error)
type DeviceGraph struct {
logicalDeviceId string
GGraph goraph.Graph
getDeviceFromModel GetDeviceFunc
logicalPorts []*voltha.LogicalPort
rootPortsString map[string]uint32
nonRootPortsString map[string]uint32
RootPorts map[uint32]uint32
rootPortsLock sync.RWMutex
Routes map[OFPortLink][]RouteHop
graphBuildLock sync.RWMutex
boundaryPorts map[string]uint32
boundaryPortsLock sync.RWMutex
cachedDevices map[string]*voltha.Device
cachedDevicesLock sync.RWMutex
devicesAdded map[string]string
portsAdded map[string]string
}
func NewDeviceGraph(logicalDeviceId string, getDevice GetDeviceFunc) *DeviceGraph {
var dg DeviceGraph
dg.logicalDeviceId = logicalDeviceId
dg.GGraph = goraph.NewGraph()
dg.getDeviceFromModel = getDevice
dg.graphBuildLock = sync.RWMutex{}
dg.cachedDevicesLock = sync.RWMutex{}
dg.rootPortsLock = sync.RWMutex{}
dg.devicesAdded = make(map[string]string)
dg.portsAdded = make(map[string]string)
dg.rootPortsString = make(map[string]uint32)
dg.nonRootPortsString = make(map[string]uint32)
dg.RootPorts = make(map[uint32]uint32)
dg.boundaryPorts = make(map[string]uint32)
dg.Routes = make(map[OFPortLink][]RouteHop)
dg.cachedDevices = make(map[string]*voltha.Device)
log.Debug("new device graph created ...")
return &dg
}
//IsRootPort returns true if the port is a root port on a logical device
func (dg *DeviceGraph) IsRootPort(port uint32) bool {
dg.rootPortsLock.RLock()
defer dg.rootPortsLock.RUnlock()
_, exist := dg.RootPorts[port]
return exist
}
//GetDeviceNodeIds retrieves all the nodes in the device graph
func (dg *DeviceGraph) GetDeviceNodeIds() map[string]string {
dg.graphBuildLock.RLock()
defer dg.graphBuildLock.RUnlock()
nodeIds := make(map[string]string)
nodesMap := dg.GGraph.GetNodes()
for id, node := range nodesMap {
if len(strings.Split(node.String(), ":")) != 2 { // not port node
nodeIds[id.String()] = id.String()
}
}
return nodeIds
}
//ComputeRoutes creates a device graph from the logical ports and then calculates all the routes
//between the logical ports. This will clear up the graph and routes if there were any.
func (dg *DeviceGraph) ComputeRoutes(lps []*voltha.LogicalPort) {
if dg == nil {
return
}
dg.graphBuildLock.Lock()
defer dg.graphBuildLock.Unlock()
// Clear the graph
dg.reset()
dg.logicalPorts = lps
// Set the root, non-root ports and boundary ports
for _, lp := range lps {
portId := concatDeviceIdPortId(lp.DeviceId, lp.DevicePortNo)
if lp.RootPort {
dg.rootPortsString[portId] = lp.OfpPort.PortNo
dg.RootPorts[lp.OfpPort.PortNo] = lp.OfpPort.PortNo
} else {
dg.nonRootPortsString[portId] = lp.OfpPort.PortNo
}
dg.boundaryPorts[portId] = lp.OfpPort.PortNo
}
// Build the graph
var device *voltha.Device
for _, logicalPort := range dg.logicalPorts {
device, _ = dg.getDevice(logicalPort.DeviceId)
dg.GGraph = dg.addDevice(device, dg.GGraph, &dg.devicesAdded, &dg.portsAdded, dg.boundaryPorts)
}
dg.Routes = dg.buildRoutes()
}
// AddPort adds a port to the graph. If the graph is empty it will just invoke ComputeRoutes function
func (dg *DeviceGraph) AddPort(lp *voltha.LogicalPort) {
// If the graph does not exist invoke ComputeRoutes.
if len(dg.boundaryPorts) == 0 {
dg.ComputeRoutes([]*voltha.LogicalPort{lp})
return
}
dg.graphBuildLock.Lock()
defer dg.graphBuildLock.Unlock()
portId := concatDeviceIdPortId(lp.DeviceId, lp.DevicePortNo)
// If the port is already part of the boundary ports, do nothing
if dg.portExist(portId) {
fmt.Println("port exists")
return
}
// Add the device where this port is located to the device graph. If the device is already added then
// only the missing port will be added
device, _ := dg.getDevice(lp.DeviceId)
dg.GGraph = dg.addDevice(device, dg.GGraph, &dg.devicesAdded, &dg.portsAdded, dg.boundaryPorts)
if lp.RootPort {
// Compute the route from this root port to all non-root ports
dg.rootPortsString[portId] = lp.OfpPort.PortNo
dg.RootPorts[lp.OfpPort.PortNo] = lp.OfpPort.PortNo
dg.Routes = dg.buildPathsToAllNonRootPorts(lp)
} else {
// Compute the route from this port to all root ports
dg.nonRootPortsString[portId] = lp.OfpPort.PortNo
dg.Routes = dg.buildPathsToAllRootPorts(lp)
}
dg.Print()
}
func (dg *DeviceGraph) Print() error {
if level, err := log.GetPackageLogLevel(); err == nil && level == log.DebugLevel {
output := ""
routeNumber := 1
for k, v := range dg.Routes {
key := fmt.Sprintf("LP:%d->LP:%d", k.Ingress, k.Egress)
val := ""
for _, i := range v {
val += fmt.Sprintf("{%d->%s->%d},", i.Ingress, i.DeviceID, i.Egress)
}
val = val[:len(val)-1]
output += fmt.Sprintf("%d:{%s=>%s} ", routeNumber, key, fmt.Sprintf("[%s]", val))
routeNumber += 1
}
log.Debugw("graph_routes", log.Fields{"lDeviceId": dg.logicalDeviceId, "Routes": output})
}
return nil
}
//getDevice returns the device either from the local cache (default) or from the model.
//TODO: Set a cache timeout such that we do not use invalid data. The full device lifecycle should also
//be taken in consideration
func (dg *DeviceGraph) getDevice(id string) (*voltha.Device, error) {
dg.cachedDevicesLock.RLock()
if d, exist := dg.cachedDevices[id]; exist {
dg.cachedDevicesLock.RUnlock()
//log.Debugw("getDevice - returned from cache", log.Fields{"deviceId": id})
return d, nil
}
dg.cachedDevicesLock.RUnlock()
// Not cached
if d, err := dg.getDeviceFromModel(id); err != nil {
log.Errorw("device-not-found", log.Fields{"deviceId": id, "error": err})
return nil, err
} else { // cache it
dg.cachedDevicesLock.Lock()
dg.cachedDevices[id] = d
dg.cachedDevicesLock.Unlock()
//log.Debugw("getDevice - returned from model", log.Fields{"deviceId": id})
return d, nil
}
}
// addDevice adds a device to a device graph and setup edges that represent the device connections to its peers
func (dg *DeviceGraph) addDevice(device *voltha.Device, g goraph.Graph, devicesAdded *map[string]string, portsAdded *map[string]string,
boundaryPorts map[string]uint32) goraph.Graph {
if device == nil {
return g
}
if _, exist := (*devicesAdded)[device.Id]; !exist {
g.AddNode(goraph.NewNode(device.Id))
(*devicesAdded)[device.Id] = device.Id
}
var portId string
var peerPortId string
for _, port := range device.Ports {
portId = concatDeviceIdPortId(device.Id, port.PortNo)
if _, exist := (*portsAdded)[portId]; !exist {
(*portsAdded)[portId] = portId
g.AddNode(goraph.NewNode(portId))
g.AddEdge(goraph.StringID(device.Id), goraph.StringID(portId), 1)
g.AddEdge(goraph.StringID(portId), goraph.StringID(device.Id), 1)
}
for _, peer := range port.Peers {
if _, exist := (*devicesAdded)[peer.DeviceId]; !exist {
d, _ := dg.getDevice(peer.DeviceId)
g = dg.addDevice(d, g, devicesAdded, portsAdded, boundaryPorts)
} else {
peerPortId = concatDeviceIdPortId(peer.DeviceId, peer.PortNo)
g.AddEdge(goraph.StringID(portId), goraph.StringID(peerPortId), 1)
g.AddEdge(goraph.StringID(peerPortId), goraph.StringID(portId), 1)
}
}
}
return g
}
//portExist returns true if the port ID is already part of the boundary ports map.
func (dg *DeviceGraph) portExist(id string) bool {
dg.boundaryPortsLock.RLock()
defer dg.boundaryPortsLock.RUnlock()
_, exist := dg.boundaryPorts[id]
return exist
}
// buildPathsToAllRootPorts builds all the paths from the non-root logical port to all root ports
// on the logical device
func (dg *DeviceGraph) buildPathsToAllRootPorts(lp *voltha.LogicalPort) map[OFPortLink][]RouteHop {
paths := dg.Routes
source := concatDeviceIdPortId(lp.DeviceId, lp.DevicePortNo)
sourcePort := lp.OfpPort.PortNo
ch := make(chan *ofPortLinkToPath)
numBuildRequest := 0
for target, targetPort := range dg.rootPortsString {
go dg.buildRoute(source, target, sourcePort, targetPort, ch)
numBuildRequest += 1
}
responseReceived := 0
forloop:
for {
if responseReceived == numBuildRequest {
break
}
select {
case res, ok := <-ch:
if !ok {
log.Debug("channel closed")
break forloop
}
if res != nil && len(res.path) > 0 {
paths[res.link] = res.path
paths[OFPortLink{Ingress: res.link.Egress, Egress: res.link.Ingress}] = getReverseRoute(res.path)
}
}
responseReceived += 1
}
return paths
}
// buildPathsToAllNonRootPorts builds all the paths from the root logical port to all non-root ports
// on the logical device
func (dg *DeviceGraph) buildPathsToAllNonRootPorts(lp *voltha.LogicalPort) map[OFPortLink][]RouteHop {
paths := dg.Routes
source := concatDeviceIdPortId(lp.DeviceId, lp.DevicePortNo)
sourcePort := lp.OfpPort.PortNo
ch := make(chan *ofPortLinkToPath)
numBuildRequest := 0
for target, targetPort := range dg.nonRootPortsString {
go dg.buildRoute(source, target, sourcePort, targetPort, ch)
numBuildRequest += 1
}
responseReceived := 0
forloop:
for {
if responseReceived == numBuildRequest {
break
}
select {
case res, ok := <-ch:
if !ok {
log.Debug("channel closed")
break forloop
}
if res != nil && len(res.path) > 0 {
paths[res.link] = res.path
paths[OFPortLink{Ingress: res.link.Egress, Egress: res.link.Ingress}] = getReverseRoute(res.path)
}
}
responseReceived += 1
}
return paths
}
//buildRoute builds a route between a source and a target logical port
func (dg *DeviceGraph) buildRoute(sourceId, targetId string, sourcePort, targetPort uint32, ch chan *ofPortLinkToPath) {
var pathIds []goraph.ID
path := make([]RouteHop, 0)
var err error
var hop RouteHop
var result *ofPortLinkToPath
if sourceId == targetId {
ch <- result
return
}
//Ignore Root - Root Routes
if dg.IsRootPort(sourcePort) && dg.IsRootPort(targetPort) {
ch <- result
return
}
//Ignore non-Root - non-Root Routes
if !dg.IsRootPort(sourcePort) && !dg.IsRootPort(targetPort) {
ch <- result
return
}
if pathIds, _, err = goraph.Dijkstra(dg.GGraph, goraph.StringID(sourceId), goraph.StringID(targetId)); err != nil {
log.Errorw("no-path", log.Fields{"sourceId": sourceId, "targetId": targetId, "error": err})
ch <- result
return
}
if len(pathIds)%3 != 0 {
ch <- result
return
}
var deviceId string
var ingressPort uint32
var egressPort uint32
for i := 0; i < len(pathIds); i = i + 3 {
if deviceId, ingressPort, err = splitIntoDeviceIdPortId(pathIds[i].String()); err != nil {
log.Errorw("id-error", log.Fields{"sourceId": sourceId, "targetId": targetId, "error": err})
break
}
if _, egressPort, err = splitIntoDeviceIdPortId(pathIds[i+2].String()); err != nil {
log.Errorw("id-error", log.Fields{"sourceId": sourceId, "targetId": targetId, "error": err})
break
}
hop = RouteHop{Ingress: ingressPort, DeviceID: deviceId, Egress: egressPort}
path = append(path, hop)
}
result = &ofPortLinkToPath{link: OFPortLink{Ingress: sourcePort, Egress: targetPort}, path: path}
ch <- result
}
//buildRoutes build all routes between all the ports on the logical device
func (dg *DeviceGraph) buildRoutes() map[OFPortLink][]RouteHop {
paths := make(map[OFPortLink][]RouteHop)
ch := make(chan *ofPortLinkToPath)
numBuildRequest := 0
for source, sourcePort := range dg.boundaryPorts {
for target, targetPort := range dg.boundaryPorts {
go dg.buildRoute(source, target, sourcePort, targetPort, ch)
numBuildRequest += 1
}
}
responseReceived := 0
forloop:
for {
if responseReceived == numBuildRequest {
break
}
select {
case res, ok := <-ch:
if !ok {
log.Debug("channel closed")
break forloop
}
if res != nil && len(res.path) > 0 {
paths[res.link] = res.path
}
}
responseReceived += 1
}
return paths
}
// reset cleans up the device graph
func (dg *DeviceGraph) reset() {
dg.devicesAdded = make(map[string]string)
dg.portsAdded = make(map[string]string)
dg.rootPortsString = make(map[string]uint32)
dg.nonRootPortsString = make(map[string]uint32)
dg.RootPorts = make(map[uint32]uint32)
dg.boundaryPorts = make(map[string]uint32)
dg.Routes = make(map[OFPortLink][]RouteHop)
dg.cachedDevices = make(map[string]*voltha.Device)
}
//concatDeviceIdPortId formats a portid using the device id and the port number
func concatDeviceIdPortId(deviceId string, portNo uint32) string {
return fmt.Sprintf("%s:%d", deviceId, portNo)
}
// splitIntoDeviceIdPortId extracts the device id and port number from the portId
func splitIntoDeviceIdPortId(id string) (string, uint32, error) {
result := strings.Split(id, ":")
if len(result) != 2 {
return "", 0, errors.New(fmt.Sprintf("invalid-id-%s", id))
}
if temp, err := strconv.ParseInt(result[1], 10, 32); err != nil {
return "", 0, errors.New(fmt.Sprintf("invalid-id-%s-%s", id, err.Error()))
} else {
return result[0], uint32(temp), nil
}
}
//getReverseRoute returns the reverse of the route in param
func getReverseRoute(route []RouteHop) []RouteHop {
reverse := make([]RouteHop, len(route))
for i, j := 0, len(route)-1; i < j; i, j = i+1, j-1 {
reverse[i], reverse[j] = route[j], route[i]
}
return reverse
}