blob: 46fe2f244ada17c6a0d13b955fdaad3cbc4ec679 [file] [log] [blame]
/*
Copyright 2017 the original author or authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openflow
import (
"context"
"encoding/json"
"github.com/opencord/voltha-protos/go/openflow_13"
"github.com/skydive-project/goloxi"
"log"
"net"
"unsafe"
"github.com/opencord/voltha-protos/go/common"
pb "github.com/opencord/voltha-protos/go/voltha"
ofp "github.com/skydive-project/goloxi/of13"
)
func handleStatsRequest(request ofp.IHeader, statType uint16, deviceId string, client *Client) error {
message, _ := json.Marshal(request)
log.Printf("handleStatsRequest called with %s\n ", message)
var id = common.ID{Id: deviceId}
switch statType {
case 0:
statsReq := request.(*ofp.DescStatsRequest)
response, err := handleDescStatsRequest(statsReq, id)
if err != nil {
return err
}
client.SendMessage(response)
case 1:
statsReq := request.(*ofp.FlowStatsRequest)
response, _ := handleFlowStatsRequest(statsReq, id)
err := client.SendMessage(response)
if err != nil {
return err
}
case 2:
statsReq := request.(*ofp.AggregateStatsRequest)
aggregateStatsReply, err := handleAggregateStatsRequest(statsReq, id)
if err != nil {
return err
}
client.SendMessage(aggregateStatsReply)
case 3:
statsReq := request.(*ofp.TableStatsRequest)
tableStatsReply, e := handleTableStatsRequest(statsReq, id)
if e != nil {
return e
}
client.SendMessage(tableStatsReply)
case 4:
statsReq := request.(*ofp.PortStatsRequest)
response, err := handlePortStatsRequest(statsReq, id)
if err != nil {
return err
}
client.SendMessage(response)
case 5:
statsReq := request.(*ofp.QueueStatsRequest)
response, err := handleQueueStatsRequest(statsReq, id)
if err != nil {
return err
}
client.SendMessage(response)
case 6:
statsReq := request.(*ofp.GroupStatsRequest)
response, err := handleGroupStatsRequest(statsReq, id)
if err != nil {
return err
}
client.SendMessage(response)
case 7:
statsReq := request.(*ofp.GroupDescStatsRequest)
response, err := handleGroupStatsDescRequest(statsReq, id)
if err != nil {
return err
}
client.SendMessage(response)
case 8:
statsReq := request.(*ofp.GroupFeaturesStatsRequest)
response, err := handleGroupFeatureStatsRequest(statsReq, id)
if err != nil {
return err
}
client.SendMessage(response)
case 9:
statsReq := request.(*ofp.MeterStatsRequest)
response, err := handleMeterStatsRequest(statsReq, id)
if err != nil {
return err
}
client.SendMessage(response)
case 10:
statsReq := request.(*ofp.MeterConfigStatsRequest)
response, err := handleMeterConfigStatsRequest(statsReq, id)
if err != nil {
return err
}
client.SendMessage(response)
case 11:
statsReq := request.(*ofp.MeterFeaturesStatsRequest)
response, err := handleMeterFeatureStatsRequest(statsReq)
if err != nil {
return err
}
client.SendMessage(response)
case 12:
statsReq := request.(*ofp.TableFeaturesStatsRequest)
response, err := handleTableFeaturesStatsRequest(statsReq, id)
if err != nil {
return err
}
client.SendMessage(response)
case 13:
statsReq := request.(*ofp.PortDescStatsRequest)
response, err := handlePortDescStatsRequest(statsReq, deviceId)
if err != nil {
return err
}
client.SendMessage(response)
case 65535:
statsReq := request.(*ofp.ExperimenterStatsRequest)
response, err := handleExperimenterStatsRequest(statsReq, id)
if err != nil {
return err
}
client.SendMessage(response)
}
return nil
}
func handleDescStatsRequest(request *ofp.DescStatsRequest, id common.ID) (*ofp.DescStatsReply, error) {
response := ofp.NewDescStatsReply()
response.SetXid(request.GetXid())
response.SetVersion(request.GetVersion())
client := *getGrpcClient()
resp, err := client.GetLogicalDevice(context.Background(), &id)
if err != nil {
return nil, err
}
desc := resp.GetDesc()
response.SetMfrDesc(PadString(desc.GetMfrDesc(), 256))
response.SetHwDesc(PadString(desc.GetHwDesc(), 256))
response.SetSwDesc(PadString(desc.GetSwDesc(), 256))
response.SetSerialNum(PadString(desc.GetSerialNum(), 32))
response.SetDpDesc(PadString(desc.GetDpDesc(), 256))
//jsonRes,_ := json.Marshal(response)
//log.Printf("handleDescStatsRequest response : %s",jsonRes)
return response, nil
}
func handleFlowStatsRequest(request *ofp.FlowStatsRequest, id common.ID) (*ofp.FlowStatsReply, error) {
log.Println("****************************************\n***********************************")
response := ofp.NewFlowStatsReply()
response.SetXid(request.GetXid())
response.SetVersion(request.GetVersion())
client := *getGrpcClient()
resp, err := client.ListLogicalDeviceFlows(context.Background(), &id)
if err != nil {
log.Println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
log.Printf("err in handleFlowStatsRequest calling ListLogicalDeviceFlows %v", err)
return nil, err
}
log.Println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
js, _ := json.Marshal(resp.GetItems())
log.Printf("||||||||||||||||||||||||||||||||||||HandFlowStat %s|||||||||||||||||||||||||||||||||||||||||", js)
var flow []*ofp.FlowStatsEntry
items := resp.GetItems()
for i := 0; i < len(items); i++ {
item := items[1]
var entry ofp.FlowStatsEntry
entry.SetTableId(uint8(item.GetId()))
entry.SetDurationSec(item.GetDurationSec())
entry.SetDurationNsec(item.GetDurationNsec())
entry.SetPriority(uint16(item.GetPriority()))
entry.SetIdleTimeout(uint16(item.GetIdleTimeout()))
entry.SetHardTimeout(uint16(item.GetHardTimeout()))
entry.SetFlags(ofp.FlowModFlags(item.GetFlags()))
entry.SetCookie(item.GetCookie())
entry.SetPacketCount(item.GetPacketCount())
entry.SetByteCount(item.GetByteCount())
var match ofp.Match
pbMatch := item.GetMatch()
var fields []goloxi.IOxm
match.SetType(uint16(pbMatch.GetType()))
oxFields := pbMatch.GetOxmFields()
for i := 0; i < len(oxFields); i++ {
js, _ := json.Marshal(oxFields[i])
log.Printf("oxfields %s", js)
oxmField := oxFields[i]
field := oxmField.GetField()
ofbField := field.(*openflow_13.OfpOxmField_OfbField).OfbField
switch ofbField.Type {
case pb.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT:
ofpInPort := ofp.NewOxmInPort()
val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_Port)
ofpInPort.Value = ofp.Port(val.Port)
fields = append(fields, ofpInPort)
case pb.OxmOfbFieldTypes_OFPXMT_OFB_IN_PHY_PORT:
ofpInPhyPort := ofp.NewOxmInPhyPort()
val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_PhysicalPort)
ofpInPhyPort.Value = ofp.Port(val.PhysicalPort)
fields = append(fields, ofpInPhyPort)
case pb.OxmOfbFieldTypes_OFPXMT_OFB_IP_PROTO:
ofpIpProto := ofp.NewOxmIpProto()
val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_IpProto)
ofpIpProto.Value = ofp.IpPrototype(val.IpProto)
fields = append(fields, ofpIpProto)
case pb.OxmOfbFieldTypes_OFPXMT_OFB_UDP_SRC:
ofpUdpSrc := ofp.NewOxmUdpSrc()
val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_UdpSrc)
ofpUdpSrc.Value = uint16(val.UdpSrc)
fields = append(fields, ofpUdpSrc)
case pb.OxmOfbFieldTypes_OFPXMT_OFB_UDP_DST:
ofpUdpDst := ofp.NewOxmUdpSrc()
val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_UdpDst)
ofpUdpDst.Value = uint16(val.UdpDst)
fields = append(fields, ofpUdpDst)
case pb.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID:
ofpVlanVid := ofp.NewOxmVlanVid()
val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_VlanVid)
ofpVlanVid.Value = uint16(val.VlanVid)
fields = append(fields, ofpVlanVid)
default:
log.Printf("handleFlowStatsRequest Unhandled OxmField %v", ofbField.Type)
}
}
match.OxmList = fields
match.Length = uint16(unsafe.Sizeof(match))
entry.SetMatch(match)
var instructions []ofp.IInstruction
ofpInstructions := item.Instructions
for i := 0; i < len(ofpInstructions); i++ {
ofpInstruction := ofpInstructions[i]
instType := ofpInstruction.Type
switch instType {
case uint32(openflow_13.OfpInstructionType_OFPIT_APPLY_ACTIONS):
ofpActions := ofpInstruction.GetActions().Actions
for i := 0; i < len(ofpActions); i++ {
ofpAction := ofpActions[i]
var actions []*ofp.Action
switch ofpAction.Type {
case openflow_13.OfpActionType_OFPAT_OUTPUT:
ofpOutputAction := ofpAction.GetOutput()
outputAction := ofp.NewActionOutput()
outputAction.Port = ofp.Port(ofpOutputAction.Port)
outputAction.MaxLen = uint16(ofpOutputAction.MaxLen)
actions = append(actions, outputAction.Action)
}
}
instruction := ofp.NewInstruction(uint16(instType))
instructions = append(instructions, instruction)
}
}
entry.Instructions = instructions
entry.Length = uint16(unsafe.Sizeof(entry))
flow = append(flow, &entry)
}
response.SetEntries(flow)
return response, nil
}
func handleAggregateStatsRequest(request *ofp.AggregateStatsRequest, id common.ID) (*ofp.AggregateStatsReply, error) {
response := ofp.NewAggregateStatsReply()
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
response.SetFlowCount(0)
//TODO wire this to voltha core when it implements
return response, nil
}
func handleGroupStatsRequest(request *ofp.GroupStatsRequest, id common.ID) (*ofp.GroupStatsReply, error) {
response := ofp.NewGroupStatsReply()
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
client := *getGrpcClient()
reply, err := client.ListLogicalDeviceFlowGroups(context.Background(), &id)
if err != nil {
return nil, err
}
var groupStatsEntries []*ofp.GroupStatsEntry
items := reply.GetItems()
for i := 0; i < len(items); i++ {
item := items[i].GetStats()
var entry ofp.GroupStatsEntry
entry.SetByteCount(item.GetByteCount())
entry.SetPacketCount(item.GetPacketCount())
entry.SetDurationNsec(item.GetDurationNsec())
entry.SetDurationSec(item.GetDurationSec())
entry.SetRefCount(item.GetRefCount())
entry.SetGroupId(item.GetGroupId())
bucketStats := item.GetBucketStats()
var bucketStatsList []*ofp.BucketCounter
for j := 0; j < len(bucketStats); j++ {
bucketStat := bucketStats[i]
var bucketCounter ofp.BucketCounter
bucketCounter.SetPacketCount(bucketStat.GetPacketCount())
bucketCounter.SetByteCount(bucketStat.GetByteCount())
bucketStatsList = append(bucketStatsList, &bucketCounter)
}
entry.SetBucketStats(bucketStatsList)
groupStatsEntries = append(groupStatsEntries, &entry)
}
response.SetEntries(groupStatsEntries)
return response, nil
}
func handleGroupStatsDescRequest(request *ofp.GroupDescStatsRequest, id common.ID) (*ofp.GroupDescStatsReply, error) {
response := ofp.NewGroupDescStatsReply()
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
client := *getGrpcClient()
reply, err := client.ListLogicalDeviceFlowGroups(context.Background(), &id)
if err != nil {
return nil, err
}
entries := reply.GetItems()
var groupDescStatsEntries []*ofp.GroupDescStatsEntry
for i := 0; i < len(entries); i++ {
item := entries[i].GetStats()
var groupDesc ofp.GroupDescStatsEntry
groupDesc.SetGroupId(item.GetGroupId())
/*
buckets := item.g
var bucketList []*ofp.Bucket
for j:=0;j<len(buckets);j++{
}
groupDesc.SetBuckets(bucketList)
*/
groupDescStatsEntries = append(groupDescStatsEntries, &groupDesc)
}
response.SetEntries(groupDescStatsEntries)
return response, nil
}
func handleGroupFeatureStatsRequest(request *ofp.GroupFeaturesStatsRequest, id common.ID) (*ofp.GroupFeaturesStatsReply, error) {
response := ofp.NewGroupFeaturesStatsReply()
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
//TODO wire this to voltha core when it implements
return response, nil
}
func handleMeterStatsRequest(request *ofp.MeterStatsRequest, id common.ID) (*ofp.MeterStatsReply, error) {
response := ofp.NewMeterStatsReply()
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
//TODO wire this to voltha core when it implements
return response, nil
}
//statsReq := request.(*ofp.MeterConfigStatsRequest)
func handleMeterConfigStatsRequest(request *ofp.MeterConfigStatsRequest, id common.ID) (*ofp.MeterConfigStatsReply, error) {
response := ofp.NewMeterConfigStatsReply()
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
//TODO wire this to voltha core when it implements
return response, nil
}
//statsReq := request.(*ofp.TableFeaturesStatsRequest)
func handleTableFeaturesStatsRequest(request *ofp.TableFeaturesStatsRequest, id common.ID) (*ofp.TableFeaturesStatsReply, error) {
response := ofp.NewTableFeaturesStatsReply()
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
//TODO wire this to voltha core when it implements
return response, nil
}
func handleTableStatsRequest(request *ofp.TableStatsRequest, id common.ID) (*ofp.TableStatsReply, error) {
var tableStatsReply = ofp.NewTableStatsReply()
tableStatsReply.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
tableStatsReply.SetVersion(request.GetVersion())
tableStatsReply.SetXid(request.GetXid())
return tableStatsReply, nil
}
func handleQueueStatsRequest(request *ofp.QueueStatsRequest, id common.ID) (*ofp.QueueStatsReply, error) {
response := ofp.NewQueueStatsReply()
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
//TODO wire this to voltha core when it implements
return response, nil
}
func handlePortStatsRequest(request *ofp.PortStatsRequest, id common.ID) (*ofp.PortStatsReply, error) {
log.Println("HERE")
response := ofp.NewPortStatsReply()
response.SetXid(request.GetXid())
response.SetVersion(request.GetVersion())
//response.SetFlags(ofp.flagsrequest.GetFlags())
client := *getGrpcClient()
reply, err := client.ListLogicalDevicePorts(context.Background(), &id)
//reply,err := client.GetLogicalDevicePort(context.Background(),&id)
if err != nil {
log.Printf("error calling ListDevicePorts %v", err)
return nil, err
}
js, _ := json.Marshal(reply)
log.Printf("PORT STATS REPLY %s", js)
ports := reply.GetItems()
var entries []*ofp.PortStatsEntry
if request.GetPortNo() == 0xffffffff { //all ports
for i := 0; i < len(ports); i++ {
port := ports[i]
entry := parsePortStats(port)
entries = append(entries, &entry)
}
} else { //find right port that is requested
for i := 0; i < len(ports); i++ {
if ports[i].GetOfpPortStats().GetPortNo() == uint32(request.GetPortNo()) {
entry := parsePortStats(ports[i])
entries = append(entries, &entry)
}
}
}
response.SetEntries(entries)
js, _ = json.Marshal(response)
log.Printf("handlePortStatsResponse %s", js)
return response, nil
}
func parsePortStats(port *pb.LogicalPort) ofp.PortStatsEntry {
stats := port.OfpPortStats
var entry ofp.PortStatsEntry
entry.SetPortNo(ofp.Port(stats.GetPortNo()))
entry.SetRxPackets(stats.GetRxPackets())
entry.SetTxPackets(stats.GetTxPackets())
entry.SetRxBytes(stats.GetRxBytes())
entry.SetTxBytes(stats.GetTxBytes())
entry.SetRxDropped(stats.GetRxDropped())
entry.SetTxDropped(stats.GetTxDropped())
entry.SetRxErrors(stats.GetRxErrors())
entry.SetTxErrors(stats.GetTxErrors())
entry.SetRxFrameErr(stats.GetRxFrameErr())
entry.SetRxOverErr(stats.GetRxOverErr())
entry.SetRxCrcErr(stats.GetRxCrcErr())
entry.SetCollisions(stats.GetCollisions())
entry.SetDurationSec(stats.GetDurationSec())
entry.SetDurationNsec(stats.GetDurationNsec())
return entry
}
func handlePortDescStatsRequest(request *ofp.PortDescStatsRequest, deviceId string) (*ofp.PortDescStatsReply, error) {
response := ofp.NewPortDescStatsReply()
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
var grpcClient = *getGrpcClient()
var id = common.ID{Id: deviceId}
logicalDevice, err := grpcClient.GetLogicalDevice(context.Background(), &id)
if err != nil {
return nil, err
}
ports := logicalDevice.GetPorts()
var entries []*ofp.PortDesc
for i := 0; i < len(ports); i++ {
var entry ofp.PortDesc
port := ports[i].GetOfpPort()
entry.SetPortNo(ofp.Port(port.GetPortNo()))
intArray := port.GetHwAddr()
var octets []byte
for i := 0; i < len(intArray); i++ {
octets = append(octets, byte(intArray[i]))
}
hwAddr := net.HardwareAddr(octets)
entry.SetHwAddr(hwAddr)
entry.SetName(PadString(port.GetName(), 16))
entry.SetConfig(ofp.PortConfig(port.GetConfig()))
entry.SetState(ofp.PortState(port.GetState()))
entry.SetCurr(ofp.PortFeatures(port.GetCurr()))
entry.SetAdvertised(ofp.PortFeatures(port.GetAdvertised()))
entry.SetSupported(ofp.PortFeatures(port.GetSupported()))
entry.SetPeer(ofp.PortFeatures(port.GetPeer()))
entry.SetCurrSpeed(port.GetCurrSpeed())
entry.SetMaxSpeed(port.GetMaxSpeed())
entries = append(entries, &entry)
}
response.SetEntries(entries)
//TODO call voltha and get port descriptions etc
return response, nil
}
func handleMeterFeatureStatsRequest(request *ofp.MeterFeaturesStatsRequest) (*ofp.MeterFeaturesStatsReply, error) {
response := ofp.NewMeterFeaturesStatsReply()
response.SetXid(request.GetXid())
response.SetVersion(request.GetVersion())
return response, nil
}
func handleExperimenterStatsRequest(request *ofp.ExperimenterStatsRequest, id common.ID) (*ofp.ExperimenterStatsReply, error) {
response := ofp.NewExperimenterStatsReply(request.GetExperimenter())
response.SetVersion(request.GetVersion())
response.SetXid(request.GetXid())
//TODO wire this to voltha core when it implements
return response, nil
}