initial add - go fmt on grpc
Change-Id: Ib0afadd2fe5571d1456a091f94f5644458f7d3f4
diff --git a/openflow/stats.go b/openflow/stats.go
new file mode 100644
index 0000000..46fe2f2
--- /dev/null
+++ b/openflow/stats.go
@@ -0,0 +1,518 @@
+/*
+ Copyright 2017 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package openflow
+
+import (
+ "context"
+ "encoding/json"
+ "github.com/opencord/voltha-protos/go/openflow_13"
+ "github.com/skydive-project/goloxi"
+ "log"
+ "net"
+ "unsafe"
+
+ "github.com/opencord/voltha-protos/go/common"
+ pb "github.com/opencord/voltha-protos/go/voltha"
+ ofp "github.com/skydive-project/goloxi/of13"
+)
+
+// handleStatsRequest dispatches an OpenFlow stats (multipart) request to the
+// handler that matches statType and sends that handler's reply through client.
+//
+// request is type-asserted to the concrete request type implied by statType.
+// The assertions are unchecked, so a request whose concrete type does not match
+// statType panics. deviceId is wrapped in a common.ID and passed to the
+// handlers that take one.
+//
+// It returns the first error reported by the selected handler or by
+// client.SendMessage. A statType with no case below is ignored and nil is
+// returned.
+func handleStatsRequest(request ofp.IHeader, statType uint16, deviceId string, client *Client) error {
+	// The marshalled form is only used for the log line below, so a marshal
+	// failure is deliberately ignored.
+	message, _ := json.Marshal(request)
+	log.Printf("handleStatsRequest called with %s\n ", message)
+	var id = common.ID{Id: deviceId}
+
+	switch statType {
+	case 0: // description
+		response, err := handleDescStatsRequest(request.(*ofp.DescStatsRequest), id)
+		if err != nil {
+			return err
+		}
+		return client.SendMessage(response)
+
+	case 1: // flow
+		// The handler error must be checked before sending: on failure the
+		// reply is nil and must not be handed to SendMessage.
+		response, err := handleFlowStatsRequest(request.(*ofp.FlowStatsRequest), id)
+		if err != nil {
+			return err
+		}
+		return client.SendMessage(response)
+
+	case 2: // aggregate
+		response, err := handleAggregateStatsRequest(request.(*ofp.AggregateStatsRequest), id)
+		if err != nil {
+			return err
+		}
+		return client.SendMessage(response)
+
+	case 3: // table
+		response, err := handleTableStatsRequest(request.(*ofp.TableStatsRequest), id)
+		if err != nil {
+			return err
+		}
+		return client.SendMessage(response)
+
+	case 4: // port
+		response, err := handlePortStatsRequest(request.(*ofp.PortStatsRequest), id)
+		if err != nil {
+			return err
+		}
+		return client.SendMessage(response)
+
+	case 5: // queue
+		response, err := handleQueueStatsRequest(request.(*ofp.QueueStatsRequest), id)
+		if err != nil {
+			return err
+		}
+		return client.SendMessage(response)
+
+	case 6: // group
+		response, err := handleGroupStatsRequest(request.(*ofp.GroupStatsRequest), id)
+		if err != nil {
+			return err
+		}
+		return client.SendMessage(response)
+
+	case 7: // group description
+		response, err := handleGroupStatsDescRequest(request.(*ofp.GroupDescStatsRequest), id)
+		if err != nil {
+			return err
+		}
+		return client.SendMessage(response)
+
+	case 8: // group features
+		response, err := handleGroupFeatureStatsRequest(request.(*ofp.GroupFeaturesStatsRequest), id)
+		if err != nil {
+			return err
+		}
+		return client.SendMessage(response)
+
+	case 9: // meter
+		response, err := handleMeterStatsRequest(request.(*ofp.MeterStatsRequest), id)
+		if err != nil {
+			return err
+		}
+		return client.SendMessage(response)
+
+	case 10: // meter config
+		response, err := handleMeterConfigStatsRequest(request.(*ofp.MeterConfigStatsRequest), id)
+		if err != nil {
+			return err
+		}
+		return client.SendMessage(response)
+
+	case 11: // meter features
+		response, err := handleMeterFeatureStatsRequest(request.(*ofp.MeterFeaturesStatsRequest))
+		if err != nil {
+			return err
+		}
+		return client.SendMessage(response)
+
+	case 12: // table features
+		response, err := handleTableFeaturesStatsRequest(request.(*ofp.TableFeaturesStatsRequest), id)
+		if err != nil {
+			return err
+		}
+		return client.SendMessage(response)
+
+	case 13: // port description
+		response, err := handlePortDescStatsRequest(request.(*ofp.PortDescStatsRequest), deviceId)
+		if err != nil {
+			return err
+		}
+		return client.SendMessage(response)
+
+	case 65535: // experimenter
+		response, err := handleExperimenterStatsRequest(request.(*ofp.ExperimenterStatsRequest), id)
+		if err != nil {
+			return err
+		}
+		return client.SendMessage(response)
+	}
+	return nil
+}
+
+// handleDescStatsRequest builds the reply to a description stats request.
+//
+// The reply carries the request's xid and version. The logical device
+// identified by id is fetched over gRPC and its descriptor strings are copied
+// into the reply, each passed through PadString with the width given below
+// (256 for the manufacturer, hardware, software and datapath descriptions, 32
+// for the serial number). A gRPC failure is returned unchanged with a nil reply.
+func handleDescStatsRequest(request *ofp.DescStatsRequest, id common.ID) (*ofp.DescStatsReply, error) {
+	reply := ofp.NewDescStatsReply()
+	reply.SetXid(request.GetXid())
+	reply.SetVersion(request.GetVersion())
+
+	grpcClient := *getGrpcClient()
+	device, err := grpcClient.GetLogicalDevice(context.Background(), &id)
+	if err != nil {
+		return nil, err
+	}
+
+	descriptor := device.GetDesc()
+	mfr := PadString(descriptor.GetMfrDesc(), 256)
+	hw := PadString(descriptor.GetHwDesc(), 256)
+	sw := PadString(descriptor.GetSwDesc(), 256)
+	serial := PadString(descriptor.GetSerialNum(), 32)
+	dp := PadString(descriptor.GetDpDesc(), 256)
+
+	reply.SetMfrDesc(mfr)
+	reply.SetHwDesc(hw)
+	reply.SetSwDesc(sw)
+	reply.SetSerialNum(serial)
+	reply.SetDpDesc(dp)
+	return reply, nil
+}
+// handleFlowStatsRequest builds the reply to a flow stats request.
+//
+// It lists the flows of the logical device identified by id over gRPC and
+// converts each returned flow into an ofp.FlowStatsEntry: table id, durations,
+// priority, timeouts, flags, cookie and counters are copied, the OXM match
+// fields handled in the switch below are translated, and one instruction is
+// appended per APPLY_ACTIONS instruction. Match fields that are not OFB fields,
+// or whose type has no case, are logged and skipped. The reply carries the
+// request's xid and version.
+//
+// A gRPC failure is logged and returned with a nil reply.
+func handleFlowStatsRequest(request *ofp.FlowStatsRequest, id common.ID) (*ofp.FlowStatsReply, error) {
+	log.Println("****************************************\n***********************************")
+	response := ofp.NewFlowStatsReply()
+	response.SetXid(request.GetXid())
+	response.SetVersion(request.GetVersion())
+	client := *getGrpcClient()
+	resp, err := client.ListLogicalDeviceFlows(context.Background(), &id)
+	if err != nil {
+		log.Println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
+		log.Printf("err in handleFlowStatsRequest calling ListLogicalDeviceFlows %v", err)
+		return nil, err
+	}
+	log.Println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
+	items := resp.GetItems()
+	// Marshalled only for the log line below; a marshal failure is ignored.
+	js, _ := json.Marshal(items)
+	log.Printf("||||||||||||||||||||||||||||||||||||HandFlowStat %s|||||||||||||||||||||||||||||||||||||||||", js)
+
+	var flow []*ofp.FlowStatsEntry
+	for _, item := range items {
+		// Declared inside the loop so every appended pointer refers to its
+		// own entry.
+		var entry ofp.FlowStatsEntry
+
+		entry.SetTableId(uint8(item.GetTableId()))
+		entry.SetDurationSec(item.GetDurationSec())
+		entry.SetDurationNsec(item.GetDurationNsec())
+		entry.SetPriority(uint16(item.GetPriority()))
+		entry.SetIdleTimeout(uint16(item.GetIdleTimeout()))
+		entry.SetHardTimeout(uint16(item.GetHardTimeout()))
+		entry.SetFlags(ofp.FlowModFlags(item.GetFlags()))
+		entry.SetCookie(item.GetCookie())
+		entry.SetPacketCount(item.GetPacketCount())
+		entry.SetByteCount(item.GetByteCount())
+
+		var match ofp.Match
+		pbMatch := item.GetMatch()
+		match.SetType(uint16(pbMatch.GetType()))
+
+		var fields []goloxi.IOxm
+		for _, oxmField := range pbMatch.GetOxmFields() {
+			fieldJSON, _ := json.Marshal(oxmField)
+			log.Printf("oxfields %s", fieldJSON)
+
+			// GetOfbField returns nil when the oneof holds something other
+			// than an OFB field; skip it instead of panicking on a failed
+			// type assertion.
+			ofbField := oxmField.GetOfbField()
+			if ofbField == nil {
+				log.Printf("handleFlowStatsRequest skipping non-OFB OxmField %v", oxmField)
+				continue
+			}
+
+			// The generated oneof getters return the zero value when the
+			// stored value is of a different kind, so a mismatch between
+			// Type and value cannot panic here.
+			switch ofbField.Type {
+			case pb.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT:
+				ofpInPort := ofp.NewOxmInPort()
+				ofpInPort.Value = ofp.Port(ofbField.GetPort())
+				fields = append(fields, ofpInPort)
+			case pb.OxmOfbFieldTypes_OFPXMT_OFB_IN_PHY_PORT:
+				ofpInPhyPort := ofp.NewOxmInPhyPort()
+				ofpInPhyPort.Value = ofp.Port(ofbField.GetPhysicalPort())
+				fields = append(fields, ofpInPhyPort)
+			case pb.OxmOfbFieldTypes_OFPXMT_OFB_IP_PROTO:
+				ofpIpProto := ofp.NewOxmIpProto()
+				ofpIpProto.Value = ofp.IpPrototype(ofbField.GetIpProto())
+				fields = append(fields, ofpIpProto)
+			case pb.OxmOfbFieldTypes_OFPXMT_OFB_UDP_SRC:
+				ofpUdpSrc := ofp.NewOxmUdpSrc()
+				ofpUdpSrc.Value = uint16(ofbField.GetUdpSrc())
+				fields = append(fields, ofpUdpSrc)
+			case pb.OxmOfbFieldTypes_OFPXMT_OFB_UDP_DST:
+				// Must be the UDP destination OXM, not the source one.
+				ofpUdpDst := ofp.NewOxmUdpDst()
+				ofpUdpDst.Value = uint16(ofbField.GetUdpDst())
+				fields = append(fields, ofpUdpDst)
+			case pb.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID:
+				ofpVlanVid := ofp.NewOxmVlanVid()
+				ofpVlanVid.Value = uint16(ofbField.GetVlanVid())
+				fields = append(fields, ofpVlanVid)
+			default:
+				log.Printf("handleFlowStatsRequest Unhandled OxmField %v", ofbField.Type)
+			}
+		}
+		match.OxmList = fields
+		// NOTE(review): unsafe.Sizeof yields the in-memory size of the Go
+		// struct, not the encoded size of the match - confirm what Length is
+		// expected to hold.
+		match.Length = uint16(unsafe.Sizeof(match))
+		entry.SetMatch(match)
+
+		var instructions []ofp.IInstruction
+		for _, ofpInstruction := range item.GetInstructions() {
+			instType := ofpInstruction.Type
+			if instType != uint32(openflow_13.OfpInstructionType_OFPIT_APPLY_ACTIONS) {
+				continue
+			}
+
+			// NOTE(review): the converted actions are collected but never
+			// attached to the instruction appended below, so the reply
+			// carries a bare instruction of this type - confirm the intended
+			// wiring.
+			var actions []*ofp.Action
+			for _, ofpAction := range ofpInstruction.GetActions().GetActions() {
+				if ofpAction.Type != openflow_13.OfpActionType_OFPAT_OUTPUT {
+					continue
+				}
+				ofpOutputAction := ofpAction.GetOutput()
+				outputAction := ofp.NewActionOutput()
+				outputAction.Port = ofp.Port(ofpOutputAction.GetPort())
+				outputAction.MaxLen = uint16(ofpOutputAction.GetMaxLen())
+				actions = append(actions, outputAction.Action)
+			}
+			instruction := ofp.NewInstruction(uint16(instType))
+			instructions = append(instructions, instruction)
+		}
+		entry.Instructions = instructions
+		// NOTE(review): same concern as match.Length above - this is the
+		// in-memory struct size, not an encoded length.
+		entry.Length = uint16(unsafe.Sizeof(entry))
+		flow = append(flow, &entry)
+	}
+	response.SetEntries(flow)
+	return response, nil
+}
+// handleAggregateStatsRequest builds the reply to an aggregate stats request.
+//
+// The reply carries the request's xid and version and a flow count of zero; id
+// is not used. The returned error is always nil.
+func handleAggregateStatsRequest(request *ofp.AggregateStatsRequest, id common.ID) (*ofp.AggregateStatsReply, error) {
+	// TODO: fetch real aggregate figures from the voltha core once it
+	// provides them.
+	reply := ofp.NewAggregateStatsReply()
+	reply.SetXid(request.GetXid())
+	reply.SetVersion(request.GetVersion())
+	reply.SetFlowCount(0)
+	return reply, nil
+}
+// handleGroupStatsRequest builds the reply to a group stats request.
+//
+// It lists the flow groups of the logical device identified by id over gRPC
+// and converts the stats of each group, including one bucket counter per
+// bucket stat, into an ofp.GroupStatsEntry. The reply carries the request's
+// xid and version. A gRPC failure is returned unchanged with a nil reply.
+func handleGroupStatsRequest(request *ofp.GroupStatsRequest, id common.ID) (*ofp.GroupStatsReply, error) {
+	response := ofp.NewGroupStatsReply()
+	response.SetVersion(request.GetVersion())
+	response.SetXid(request.GetXid())
+	client := *getGrpcClient()
+	reply, err := client.ListLogicalDeviceFlowGroups(context.Background(), &id)
+	if err != nil {
+		return nil, err
+	}
+
+	var groupStatsEntries []*ofp.GroupStatsEntry
+	for _, group := range reply.GetItems() {
+		item := group.GetStats()
+		// Declared inside the loop so every appended pointer refers to its
+		// own entry.
+		var entry ofp.GroupStatsEntry
+		entry.SetByteCount(item.GetByteCount())
+		entry.SetPacketCount(item.GetPacketCount())
+		entry.SetDurationNsec(item.GetDurationNsec())
+		entry.SetDurationSec(item.GetDurationSec())
+		entry.SetRefCount(item.GetRefCount())
+		entry.SetGroupId(item.GetGroupId())
+
+		// Ranging over the bucket stats themselves guarantees each counter
+		// is built from its own bucket rather than from the element at the
+		// group's index.
+		var bucketStatsList []*ofp.BucketCounter
+		for _, bucketStat := range item.GetBucketStats() {
+			var bucketCounter ofp.BucketCounter
+			bucketCounter.SetPacketCount(bucketStat.GetPacketCount())
+			bucketCounter.SetByteCount(bucketStat.GetByteCount())
+			bucketStatsList = append(bucketStatsList, &bucketCounter)
+		}
+		entry.SetBucketStats(bucketStatsList)
+		groupStatsEntries = append(groupStatsEntries, &entry)
+	}
+	response.SetEntries(groupStatsEntries)
+	return response, nil
+}
+// handleGroupStatsDescRequest builds the reply to a group description stats
+// request.
+//
+// It lists the flow groups of the logical device identified by id over gRPC
+// and emits one ofp.GroupDescStatsEntry per group, filling in only the group
+// id taken from the group's stats. The reply carries the request's xid and
+// version. A gRPC failure is returned unchanged with a nil reply.
+func handleGroupStatsDescRequest(request *ofp.GroupDescStatsRequest, id common.ID) (*ofp.GroupDescStatsReply, error) {
+	reply := ofp.NewGroupDescStatsReply()
+	reply.SetVersion(request.GetVersion())
+	reply.SetXid(request.GetXid())
+
+	grpcClient := *getGrpcClient()
+	groups, err := grpcClient.ListLogicalDeviceFlowGroups(context.Background(), &id)
+	if err != nil {
+		return nil, err
+	}
+
+	var descriptions []*ofp.GroupDescStatsEntry
+	for _, group := range groups.GetItems() {
+		var description ofp.GroupDescStatsEntry
+		description.SetGroupId(group.GetStats().GetGroupId())
+		// TODO: buckets are not populated yet; only the group id is set.
+		descriptions = append(descriptions, &description)
+	}
+	reply.SetEntries(descriptions)
+	return reply, nil
+}
+// handleGroupFeatureStatsRequest builds the reply to a group features stats
+// request.
+//
+// The reply carries only the request's xid and version; id is not used. The
+// returned error is always nil.
+func handleGroupFeatureStatsRequest(request *ofp.GroupFeaturesStatsRequest, id common.ID) (*ofp.GroupFeaturesStatsReply, error) {
+	// TODO: populate from the voltha core once it provides group features.
+	reply := ofp.NewGroupFeaturesStatsReply()
+	reply.SetXid(request.GetXid())
+	reply.SetVersion(request.GetVersion())
+	return reply, nil
+}
+// handleMeterStatsRequest builds the reply to a meter stats request.
+//
+// The reply carries only the request's xid and version; id is not used. The
+// returned error is always nil.
+func handleMeterStatsRequest(request *ofp.MeterStatsRequest, id common.ID) (*ofp.MeterStatsReply, error) {
+	// TODO: populate from the voltha core once it provides meter stats.
+	reply := ofp.NewMeterStatsReply()
+	reply.SetXid(request.GetXid())
+	reply.SetVersion(request.GetVersion())
+	return reply, nil
+}
+
+// handleMeterConfigStatsRequest builds the reply to a meter config stats
+// request.
+//
+// The reply carries only the request's xid and version; id is not used. The
+// returned error is always nil.
+func handleMeterConfigStatsRequest(request *ofp.MeterConfigStatsRequest, id common.ID) (*ofp.MeterConfigStatsReply, error) {
+	// TODO: populate from the voltha core once it provides meter config.
+	reply := ofp.NewMeterConfigStatsReply()
+	reply.SetXid(request.GetXid())
+	reply.SetVersion(request.GetVersion())
+	return reply, nil
+}
+
+// handleTableFeaturesStatsRequest builds the reply to a table features stats
+// request.
+//
+// The reply carries only the request's xid and version; id is not used. The
+// returned error is always nil.
+func handleTableFeaturesStatsRequest(request *ofp.TableFeaturesStatsRequest, id common.ID) (*ofp.TableFeaturesStatsReply, error) {
+	// TODO: populate from the voltha core once it provides table features.
+	reply := ofp.NewTableFeaturesStatsReply()
+	reply.SetXid(request.GetXid())
+	reply.SetVersion(request.GetVersion())
+	return reply, nil
+}
+func handleTableStatsRequest(request *ofp.TableStatsRequest, id common.ID) (*ofp.TableStatsReply, error) {
+ var tableStatsReply = ofp.NewTableStatsReply()
+ tableStatsReply.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+ tableStatsReply.SetVersion(request.GetVersion())
+ tableStatsReply.SetXid(request.GetXid())
+ return tableStatsReply, nil
+}
+// handleQueueStatsRequest builds the reply to a queue stats request.
+//
+// The reply carries only the request's xid and version; id is not used. The
+// returned error is always nil.
+func handleQueueStatsRequest(request *ofp.QueueStatsRequest, id common.ID) (*ofp.QueueStatsReply, error) {
+	// TODO: populate from the voltha core once it provides queue stats.
+	reply := ofp.NewQueueStatsReply()
+	reply.SetXid(request.GetXid())
+	reply.SetVersion(request.GetVersion())
+	return reply, nil
+}
+// handlePortStatsRequest builds the reply to a port stats request.
+//
+// It lists the ports of the logical device identified by id over gRPC. When
+// the requested port number is 0xffffffff every port is reported; otherwise
+// only ports whose stats carry the requested port number are. Each selected
+// port is converted with parsePortStats. The reply carries the request's xid
+// and version.
+//
+// A gRPC failure is logged and returned with a nil reply.
+func handlePortStatsRequest(request *ofp.PortStatsRequest, id common.ID) (*ofp.PortStatsReply, error) {
+	log.Println("HERE")
+	reply := ofp.NewPortStatsReply()
+	reply.SetXid(request.GetXid())
+	reply.SetVersion(request.GetVersion())
+
+	grpcClient := *getGrpcClient()
+	portList, err := grpcClient.ListLogicalDevicePorts(context.Background(), &id)
+	if err != nil {
+		log.Printf("error calling ListDevicePorts %v", err)
+		return nil, err
+	}
+	// Marshalled only for logging; marshal failures are ignored.
+	dump, _ := json.Marshal(portList)
+	log.Printf("PORT STATS REPLY %s", dump)
+
+	var selected []*ofp.PortStatsEntry
+	for _, port := range portList.GetItems() {
+		// 0xffffffff selects every port; any other value selects by number.
+		if request.GetPortNo() == 0xffffffff || port.GetOfpPortStats().GetPortNo() == uint32(request.GetPortNo()) {
+			stats := parsePortStats(port)
+			selected = append(selected, &stats)
+		}
+	}
+	reply.SetEntries(selected)
+
+	dump, _ = json.Marshal(reply)
+	log.Printf("handlePortStatsResponse %s", dump)
+	return reply, nil
+}
+// parsePortStats copies the OpenFlow port statistics attached to port into a
+// new ofp.PortStatsEntry and returns it by value.
+func parsePortStats(port *pb.LogicalPort) ofp.PortStatsEntry {
+	var (
+		src = port.OfpPortStats
+		out ofp.PortStatsEntry
+	)
+
+	// Port identity and lifetime.
+	out.SetPortNo(ofp.Port(src.GetPortNo()))
+	out.SetDurationSec(src.GetDurationSec())
+	out.SetDurationNsec(src.GetDurationNsec())
+
+	// Receive-side counters.
+	out.SetRxPackets(src.GetRxPackets())
+	out.SetRxBytes(src.GetRxBytes())
+	out.SetRxDropped(src.GetRxDropped())
+	out.SetRxErrors(src.GetRxErrors())
+	out.SetRxFrameErr(src.GetRxFrameErr())
+	out.SetRxOverErr(src.GetRxOverErr())
+	out.SetRxCrcErr(src.GetRxCrcErr())
+
+	// Transmit-side counters.
+	out.SetTxPackets(src.GetTxPackets())
+	out.SetTxBytes(src.GetTxBytes())
+	out.SetTxDropped(src.GetTxDropped())
+	out.SetTxErrors(src.GetTxErrors())
+
+	out.SetCollisions(src.GetCollisions())
+	return out
+}
+// handlePortDescStatsRequest builds the reply to a port description stats
+// request.
+//
+// It fetches the logical device named by deviceId over gRPC and emits one
+// ofp.PortDesc per port, copying the port number, hardware address, name
+// (passed through PadString with width 16), config, state, feature bitmaps and
+// speeds. The reply carries the request's xid and version. A gRPC failure is
+// returned unchanged with a nil reply.
+func handlePortDescStatsRequest(request *ofp.PortDescStatsRequest, deviceId string) (*ofp.PortDescStatsReply, error) {
+	reply := ofp.NewPortDescStatsReply()
+	reply.SetVersion(request.GetVersion())
+	reply.SetXid(request.GetXid())
+
+	grpcClient := *getGrpcClient()
+	id := common.ID{Id: deviceId}
+	device, err := grpcClient.GetLogicalDevice(context.Background(), &id)
+	if err != nil {
+		return nil, err
+	}
+
+	var descriptions []*ofp.PortDesc
+	for _, logicalPort := range device.GetPorts() {
+		port := logicalPort.GetOfpPort()
+
+		// The hardware address arrives as a slice of integers; each one is
+		// narrowed to a single byte.
+		var mac []byte
+		for _, octet := range port.GetHwAddr() {
+			mac = append(mac, byte(octet))
+		}
+
+		var description ofp.PortDesc
+		description.SetPortNo(ofp.Port(port.GetPortNo()))
+		description.SetHwAddr(net.HardwareAddr(mac))
+		description.SetName(PadString(port.GetName(), 16))
+		description.SetConfig(ofp.PortConfig(port.GetConfig()))
+		description.SetState(ofp.PortState(port.GetState()))
+		description.SetCurr(ofp.PortFeatures(port.GetCurr()))
+		description.SetAdvertised(ofp.PortFeatures(port.GetAdvertised()))
+		description.SetSupported(ofp.PortFeatures(port.GetSupported()))
+		description.SetPeer(ofp.PortFeatures(port.GetPeer()))
+		description.SetCurrSpeed(port.GetCurrSpeed())
+		description.SetMaxSpeed(port.GetMaxSpeed())
+
+		descriptions = append(descriptions, &description)
+	}
+
+	reply.SetEntries(descriptions)
+	return reply, nil
+}
+// handleMeterFeatureStatsRequest builds the reply to a meter features stats
+// request.
+//
+// The reply carries only the request's xid and version. The returned error is
+// always nil.
+func handleMeterFeatureStatsRequest(request *ofp.MeterFeaturesStatsRequest) (*ofp.MeterFeaturesStatsReply, error) {
+	reply := ofp.NewMeterFeaturesStatsReply()
+	reply.SetVersion(request.GetVersion())
+	reply.SetXid(request.GetXid())
+	return reply, nil
+}
+// handleExperimenterStatsRequest builds the reply to an experimenter stats
+// request.
+//
+// The reply is created for the request's experimenter value and carries the
+// request's xid and version; id is not used. The returned error is always nil.
+func handleExperimenterStatsRequest(request *ofp.ExperimenterStatsRequest, id common.ID) (*ofp.ExperimenterStatsReply, error) {
+	// TODO: populate from the voltha core once it provides experimenter stats.
+	experimenter := request.GetExperimenter()
+	reply := ofp.NewExperimenterStatsReply(experimenter)
+	reply.SetXid(request.GetXid())
+	reply.SetVersion(request.GetVersion())
+	return reply, nil
+}