WIP - Suggesting changes (take2)
This is not yet complete; work is still in progress. Eventually the plan
is to provide the following changes:
- restructure repo to be more aligned with https://github.com/golang-standards/project-layout
- add k8s probes
- modifications (Go range loops, etc.) to follow common Go
practices
Change-Id: I6922cbc00b5ef17ceab183aba00a7fc59ab46480
diff --git a/internal/pkg/ofagent/changeEvent.go b/internal/pkg/ofagent/changeEvent.go
new file mode 100644
index 0000000..90b0fd6
--- /dev/null
+++ b/internal/pkg/ofagent/changeEvent.go
@@ -0,0 +1,110 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package ofagent
+
+import (
+ "context"
+ "encoding/json"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/ofagent-go/internal/pkg/openflow"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "google.golang.org/grpc"
+ "net"
+)
+
+// receiveChangeEvents pumps change events from the VOLTHA stream into
+// changeEventChannel until ctx is cancelled or the stream fails.
+func (ofa *OFAgent) receiveChangeEvents(ctx context.Context) {
+	logger.Debug("receive-change-events-started")
+	streamCtx, streamDone := context.WithCancel(ctx) // cancelling ctx also releases a blocked Recv
+	defer streamDone()
+	stream, err := ofa.volthaClient.ReceiveChangeEvents(streamCtx, &empty.Empty{}, grpc.EmptyCallOption{})
+	if err != nil {
+		logger.Errorw("Unable to establish Receive Change Event Stream", log.Fields{"error": err})
+		ofa.events <- ofaEventVolthaDisconnected
+		return // without a stream there is nothing to receive from
+	}
+	for ctx.Err() == nil {
+		ce, err := stream.Recv()
+		if err != nil {
+			if ctx.Err() == nil { // report real failures only, not our own shutdown
+				logger.Errorw("error receiving change event", log.Fields{"error": err})
+				ofa.events <- ofaEventVolthaDisconnected
+			}
+			return
+		}
+		select {
+		case <-ctx.Done():
+		case ofa.changeEventChannel <- ce:
+		}
+	}
+}
+
+// handleChangeEvents turns the port status change events read from
+// changeEventChannel into OpenFlow port status messages for the device's client.
+func (ofa *OFAgent) handleChangeEvents(ctx context.Context) {
+	logger.Debugln("handle-change-event-started")
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case changeEvent := <-ofa.changeEventChannel:
+			deviceID := changeEvent.GetId()
+			portStatus := changeEvent.GetPortStatus()
+			logger.Debugw("received-change-event",
+				log.Fields{"device-id": deviceID, "port-status": portStatus})
+			if portStatus == nil {
+				if logger.V(log.WarnLevel) {
+					js, _ := json.Marshal(changeEvent.GetEvent())
+					logger.Warnw("Received change event that was not port status",
+						log.Fields{"ChangeEvent": js})
+				}
+				continue // only port status events are forwarded
+			}
+
+			ofPortStatus := ofp.NewPortStatus()
+			ofPortStatus.SetXid(openflow.GetXid())
+			ofPortStatus.SetVersion(4)
+			ofPortStatus.SetReason(ofp.PortReason(portStatus.GetReason()))
+
+			desc := portStatus.GetDesc()
+			ofDesc := ofp.NewPortDesc()
+			ofDesc.SetAdvertised(ofp.PortFeatures(desc.GetAdvertised()))
+			ofDesc.SetConfig(ofp.PortConfig(0))
+			ofDesc.SetCurr(ofp.PortFeatures(desc.GetCurr()))
+			ofDesc.SetCurrSpeed(desc.GetCurrSpeed())
+			// Each element of the hardware address is narrowed to a single byte.
+			hwAddr := desc.GetHwAddr()
+			octets := make([]byte, 0, len(hwAddr))
+			for _, val := range hwAddr {
+				octets = append(octets, byte(val))
+			}
+			ofDesc.SetHwAddr(net.HardwareAddr(octets))
+			ofDesc.SetMaxSpeed(desc.GetMaxSpeed())
+			ofDesc.SetName(openflow.PadString(desc.GetName(), 16))
+			ofDesc.SetPeer(ofp.PortFeatures(desc.GetPeer()))
+			ofDesc.SetPortNo(ofp.Port(desc.GetPortNo()))
+			ofDesc.SetState(ofp.PortState(desc.GetState()))
+			ofDesc.SetSupported(ofp.PortFeatures(desc.GetSupported()))
+			ofPortStatus.SetDesc(*ofDesc)
+			if err := ofa.getOFClient(deviceID).SendMessage(ofPortStatus); err != nil {
+				logger.Errorw("failed to send port status", log.Fields{"device-id": deviceID, "error": err})
+			}
+		}
+	}
+}
diff --git a/internal/pkg/ofagent/connection.go b/internal/pkg/ofagent/connection.go
new file mode 100644
index 0000000..5fd8a3f
--- /dev/null
+++ b/internal/pkg/ofagent/connection.go
@@ -0,0 +1,77 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package ofagent
+
+import (
+ "context"
+ "errors"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-lib-go/v2/pkg/probe"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+ "google.golang.org/grpc"
+ "time"
+)
+
+// establishConnectionToVoltha dials the VOLTHA API endpoint and retries until
+// a GetVoltha call succeeds or ConnectionMaxRetries attempts (0 = no limit)
+// have failed. The probe p, when non-nil, is updated with the outcome.
+func (ofa *OFAgent) establishConnectionToVoltha(p *probe.Probe) error {
+	if p != nil {
+		p.UpdateStatus("voltha", probe.ServiceStatusPreparing)
+	}
+	if ofa.volthaConnection != nil {
+		ofa.volthaConnection.Close()
+	}
+	ofa.volthaConnection = nil
+	ofa.volthaClient = nil
+
+	try := 1
+	for ofa.ConnectionMaxRetries == 0 || try <= ofa.ConnectionMaxRetries {
+		conn, err := grpc.Dial(ofa.VolthaApiEndPoint, grpc.WithInsecure())
+		if err == nil {
+			svc := voltha.NewVolthaServiceClient(conn)
+			if _, err = svc.GetVoltha(context.Background(), &empty.Empty{}); err == nil {
+				logger.Debugw("Established connection to Voltha",
+					log.Fields{
+						"VolthaApiEndPoint": ofa.VolthaApiEndPoint,
+					})
+				ofa.volthaConnection = conn
+				ofa.volthaClient = svc
+				if p != nil {
+					p.UpdateStatus("voltha", probe.ServiceStatusRunning)
+				}
+				ofa.events <- ofaEventVolthaConnected
+				return nil
+			}
+			conn.Close() // do not leak the connection of a failed attempt
+		}
+		logger.Warnw("Failed to connect to voltha",
+			log.Fields{
+				"VolthaApiEndPoint": ofa.VolthaApiEndPoint,
+				"error":             err.Error()})
+		if ofa.ConnectionMaxRetries != 0 {
+			try++
+		}
+		if ofa.ConnectionMaxRetries == 0 || try <= ofa.ConnectionMaxRetries {
+			time.Sleep(ofa.ConnectionRetryDelay) // skipped once the attempts are used up
+		}
+	}
+	if p != nil {
+		p.UpdateStatus("voltha", probe.ServiceStatusFailed)
+	}
+	return errors.New("failed-to-connect-to-voltha")
+}
diff --git a/internal/pkg/ofagent/ofagent.go b/internal/pkg/ofagent/ofagent.go
new file mode 100644
index 0000000..0ad4cd1
--- /dev/null
+++ b/internal/pkg/ofagent/ofagent.go
@@ -0,0 +1,181 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package ofagent
+
+import (
+ "context"
+ "github.com/opencord/ofagent-go/internal/pkg/openflow"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-lib-go/v2/pkg/probe"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+ "google.golang.org/grpc"
+ "sync"
+ "time"
+)
+
+var logger, _ = log.AddPackage(log.JSON, log.DebugLevel, nil) // package-level logger; the error result of AddPackage is discarded
+
+type ofaEvent byte // event delivered to OFAgent.Run through the events channel
+type ofaState byte // VOLTHA connection state tracked by OFAgent.Run
+
+const (
+ ofaEventStart = ofaEvent(iota)
+ ofaEventVolthaConnected
+ ofaEventVolthaDisconnected
+ ofaEventError
+ // iota keeps counting inside this block, so the two states below are 4 and 5.
+ ofaStateConnected = ofaState(iota)
+ ofaStateDisconnected
+)
+
+type OFAgent struct { // agent configuration (exported) and runtime state (unexported)
+ VolthaApiEndPoint string
+ OFControllerEndPoint string
+ DeviceListRefreshInterval time.Duration // NewOFAgent replaces a value <= 0 with one minute
+ ConnectionMaxRetries int // 0 means retry without limit
+ ConnectionRetryDelay time.Duration // NewOFAgent replaces a value <= 0 with three seconds
+ // gRPC connection and client, set by establishConnectionToVoltha.
+ volthaConnection *grpc.ClientConn
+ volthaClient voltha.VolthaServiceClient
+ mapLock sync.Mutex // held by addOFClient and refreshDeviceList while they change clientMap
+ clientMap map[string]*openflow.OFClient // OF clients keyed by device id
+ events chan ofaEvent // consumed by Run
+ // Channels connecting the stream goroutines with the handler goroutines.
+ packetInChannel chan *voltha.PacketIn
+ packetOutChannel chan *voltha.PacketOut
+ changeEventChannel chan *voltha.ChangeEvent
+}
+
+// NewOFAgent builds an agent from the exported settings of config, replacing non-positive intervals with defaults.
+func NewOFAgent(config *OFAgent) (*OFAgent, error) {
+	ofa := OFAgent{
+		VolthaApiEndPoint:         config.VolthaApiEndPoint,
+		OFControllerEndPoint:      config.OFControllerEndPoint,
+		DeviceListRefreshInterval: config.DeviceListRefreshInterval,
+		ConnectionMaxRetries:      config.ConnectionMaxRetries,
+		ConnectionRetryDelay:      config.ConnectionRetryDelay,
+		packetInChannel:           make(chan *voltha.PacketIn),
+		packetOutChannel:          make(chan *voltha.PacketOut),
+		changeEventChannel:        make(chan *voltha.ChangeEvent),
+		clientMap:                 make(map[string]*openflow.OFClient),
+		events:                    make(chan ofaEvent, 100),
+	}
+
+	if ofa.DeviceListRefreshInterval <= 0 {
+		logger.Warnw("device list refresh interval not valid, setting to default",
+			log.Fields{
+				"value":   ofa.DeviceListRefreshInterval.String(),
+				"default": (1 * time.Minute).String()})
+		ofa.DeviceListRefreshInterval = 1 * time.Minute
+	}
+
+	if ofa.ConnectionRetryDelay <= 0 {
+		logger.Warnw("connection retry delay not valid, setting to default",
+			log.Fields{
+				"value":   ofa.ConnectionRetryDelay.String(),
+				"default": (3 * time.Second).String()})
+		ofa.ConnectionRetryDelay = 3 * time.Second
+	}
+	return &ofa, nil
+}
+
+// Run makes the initial connection to VOLTHA, starts the message processing
+// loops and then services agent events until ctx is cancelled.
+func (ofa *OFAgent) Run(ctx context.Context) {
+	logger.Debugw("Starting GRPC - VOLTHA client",
+		log.Fields{
+			"voltha-endpoint":     ofa.VolthaApiEndPoint,
+			"controller-endpoint": ofa.OFControllerEndPoint})
+
+	// If the context contains a k8s probe then register services
+	p := probe.GetProbeFromContext(ctx)
+	if p != nil {
+		p.RegisterService("voltha")
+	}
+	ofa.events <- ofaEventStart
+
+	/*
+	 * Two sub-contexts are created here for different purposes so we can
+	 * control the lifecycle of processing loops differently.
+	 *
+	 * volthaCtx - controls those processes that rely on the GRPC
+	 *             connection to voltha and will be restarted when the
+	 *             GRPC connection is interrupted.
+	 * hdlCtx    - controls those processes that listen to channels and
+	 *             process each message. these will likely never be
+	 *             stopped until the ofagent is stopped.
+	 */
+	var volthaCtx, hdlCtx context.Context
+	var volthaDone, hdlDone func()
+	state := ofaStateDisconnected
+
+	for {
+		select {
+		case <-ctx.Done():
+			if volthaDone != nil {
+				volthaDone()
+				volthaDone = nil
+			}
+			if hdlDone != nil {
+				hdlDone()
+				hdlDone = nil
+			}
+			return
+		case event := <-ofa.events:
+			switch event {
+			case ofaEventStart:
+				logger.Debug("ofagent-voltha-start-event")
+				// Start the loops that process messages
+				hdlCtx, hdlDone = context.WithCancel(context.Background())
+				go ofa.handlePacketsIn(hdlCtx)
+				go ofa.handleChangeEvents(hdlCtx)
+
+				// Kick off process to attempt to establish
+				// connection to voltha
+				go ofa.establishConnectionToVoltha(p)
+
+			case ofaEventVolthaConnected:
+				logger.Debug("ofagent-voltha-connect-event")
+
+				// Start the loops that poll from voltha
+				if state != ofaStateConnected {
+					state = ofaStateConnected
+					volthaCtx, volthaDone = context.WithCancel(context.Background())
+					go ofa.receiveChangeEvents(volthaCtx)
+					go ofa.receivePacketsIn(volthaCtx)
+					go ofa.streamPacketOut(volthaCtx)
+					go ofa.synchronizeDeviceList(volthaCtx)
+				}
+
+			case ofaEventVolthaDisconnected:
+				logger.Debug("ofagent-voltha-disconnect-event")
+				if state == ofaStateConnected {
+					state = ofaStateDisconnected
+					volthaDone()
+					volthaDone = nil
+					// Reconnect; the resulting connect event restarts the polling loops.
+					go ofa.establishConnectionToVoltha(p)
+				}
+			case ofaEventError:
+				logger.Debug("ofagent-error-event")
+			default:
+				logger.Fatalw("ofagent-unknown-event",
+					log.Fields{"event": event})
+			}
+		}
+	}
+}
diff --git a/internal/pkg/ofagent/packetIn.go b/internal/pkg/ofagent/packetIn.go
new file mode 100644
index 0000000..721d1d7
--- /dev/null
+++ b/internal/pkg/ofagent/packetIn.go
@@ -0,0 +1,160 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package ofagent
+
+import (
+ "context"
+ "encoding/json"
+ "github.com/donNewtonAlpha/goloxi"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/ofagent-go/internal/pkg/openflow"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-protos/v2/go/openflow_13"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+ "google.golang.org/grpc"
+)
+
+// receivePacketsIn pumps packet-ins from the VOLTHA stream into packetInChannel until ctx is cancelled or the stream fails.
+func (ofa *OFAgent) receivePacketsIn(ctx context.Context) {
+	logger.Debug("receive-packets-in-started")
+	streamCtx, streamDone := context.WithCancel(ctx) // cancelling ctx also releases a blocked Recv
+	defer streamDone()
+	stream, err := ofa.volthaClient.ReceivePacketsIn(streamCtx, &empty.Empty{}, grpc.EmptyCallOption{})
+	if err != nil {
+		logger.Errorw("Unable to establish Receive PacketIn Stream", log.Fields{"error": err})
+		ofa.events <- ofaEventVolthaDisconnected
+		return // without a stream there is nothing to receive from
+	}
+	for ctx.Err() == nil {
+		pkt, err := stream.Recv()
+		if err != nil {
+			if ctx.Err() == nil { // report real failures only, not our own shutdown
+				logger.Errorw("error receiving packet", log.Fields{"error": err})
+				ofa.events <- ofaEventVolthaDisconnected
+			}
+			return
+		}
+		select {
+		case <-ctx.Done():
+		case ofa.packetInChannel <- pkt:
+		}
+	}
+}
+
+// handlePacketsIn converts every VOLTHA PacketIn taken from packetInChannel
+// into an OpenFlow 1.3 PACKET_IN and passes it to the OF client of the
+// originating device. It returns when ctx is cancelled.
+func (ofa *OFAgent) handlePacketsIn(ctx context.Context) {
+	logger.Debug("handle-packets-in-started")
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case packet := <-ofa.packetInChannel:
+			packetIn := packet.GetPacketIn()
+
+			if logger.V(log.DebugLevel) {
+				js, _ := json.Marshal(packetIn)
+				logger.Debugw("packet-in recieved", log.Fields{"packet-in": js})
+			}
+			deviceID := packet.GetId()
+			ofPacketIn := ofp.NewPacketIn()
+			ofPacketIn.SetVersion(uint8(4))
+			ofPacketIn.SetXid(openflow.GetXid())
+			ofPacketIn.SetBufferId(packetIn.GetBufferId())
+			ofPacketIn.SetCookie(packetIn.GetCookie())
+			ofPacketIn.SetData(packetIn.GetData())
+			match := ofp.NewMatchV3()
+			inMatch := packetIn.GetMatch()
+			match.SetType(uint16(inMatch.GetType()))
+
+			// size is the match length: a 4 byte match header plus, for each
+			// OXM that is really appended, its 4 byte header and its value.
+			var fields []goloxi.IOxm
+			size := uint16(4)
+			for _, oxmField := range inMatch.GetOxmFields() {
+				// Only fields carried as OfbField can be translated; skip the rest.
+				wrapper, ok := oxmField.GetField().(*openflow_13.OfpOxmField_OfbField)
+				if !ok || wrapper.OfbField == nil {
+					logger.Warnw("receive-packet-in:unhandled-oxm-field",
+						log.Fields{"field": oxmField.GetField()})
+					continue
+				}
+				ofbField := wrapper.OfbField
+				switch ofbField.Type {
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT:
+					ofpInPort := ofp.NewOxmInPort()
+					ofpInPort.Value = ofp.Port(ofbField.GetPort())
+					size += 8 // OXM header + 4 byte value
+					fields = append(fields, ofpInPort)
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_ETH_TYPE:
+					ofpEthType := ofp.NewOxmEthType()
+					ofpEthType.Value = ofp.EthernetType(ofbField.GetEthType())
+					size += 6 // OXM header + 2 byte value
+					fields = append(fields, ofpEthType)
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IN_PHY_PORT:
+					ofpInPhyPort := ofp.NewOxmInPhyPort()
+					ofpInPhyPort.Value = ofp.Port(ofbField.GetPhysicalPort())
+					size += 8 // OXM header + 4 byte value
+					fields = append(fields, ofpInPhyPort)
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IP_PROTO:
+					ofpIpProto := ofp.NewOxmIpProto()
+					ofpIpProto.Value = ofp.IpPrototype(ofbField.GetIpProto())
+					size += 5 // OXM header + 1 byte value
+					fields = append(fields, ofpIpProto)
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_UDP_SRC:
+					ofpUdpSrc := ofp.NewOxmUdpSrc()
+					ofpUdpSrc.Value = uint16(ofbField.GetUdpSrc())
+					size += 6 // OXM header + 2 byte value
+					fields = append(fields, ofpUdpSrc)
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_UDP_DST:
+					ofpUdpDst := ofp.NewOxmUdpDst()
+					ofpUdpDst.Value = uint16(ofbField.GetUdpDst())
+					size += 6 // OXM header + 2 byte value
+					fields = append(fields, ofpUdpDst)
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID:
+					ofpVlanVid := ofp.NewOxmVlanVid()
+					// 0x1000 is added to a VLAN id that is present; no value maps to 0.
+					if vlan, ok := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_VlanVid); ok {
+						ofpVlanVid.Value = uint16(vlan.VlanVid) + 0x1000
+					} else {
+						ofpVlanVid.Value = uint16(0)
+					}
+					size += 6 // the field is appended, hence counted, in both cases
+					fields = append(fields, ofpVlanVid)
+				default:
+					// Nothing is appended for this field, so size is left unchanged.
+					logger.Warnw("receive-packet-in:unhandled-oxm-field",
+						log.Fields{"field": ofbField.Type})
+				}
+			}
+			match.SetLength(size)
+			match.SetOxmList(fields)
+
+			ofPacketIn.SetMatch(*match)
+			ofPacketIn.SetReason(uint8(packetIn.GetReason()))
+			ofPacketIn.SetTableId(uint8(packetIn.GetTableId()))
+			ofPacketIn.SetTotalLen(uint16(len(ofPacketIn.GetData())))
+			ofc := ofa.getOFClient(deviceID)
+			if err := ofc.SendMessage(ofPacketIn); err != nil {
+				logger.Errorw("failed to send packet-in to controller",
+					log.Fields{"device-id": deviceID, "error": err})
+			}
+		}
+	}
+}
diff --git a/internal/pkg/ofagent/packetOut.go b/internal/pkg/ofagent/packetOut.go
new file mode 100644
index 0000000..3a43335
--- /dev/null
+++ b/internal/pkg/ofagent/packetOut.go
@@ -0,0 +1,50 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package ofagent
+
+import (
+ "context"
+ "encoding/json"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "google.golang.org/grpc"
+)
+
+// streamPacketOut relays messages from packetOutChannel to the VOLTHA packet-out stream until ctx is cancelled or a send fails.
+func (ofa *OFAgent) streamPacketOut(ctx context.Context) {
+	logger.Debug("GrpcClient streamPacketOut called")
+	outClient, err := ofa.volthaClient.StreamPacketsOut(ctx, grpc.EmptyCallOption{}) // stream ends with ctx
+	if err != nil {
+		logger.Errorw("streamPacketOut Error creating packetout stream ", log.Fields{"error": err})
+		ofa.events <- ofaEventVolthaDisconnected
+		return // without a stream there is nothing to send on
+	}
+	for ctx.Err() == nil {
+		select {
+		case <-ctx.Done():
+		case ofPacketOut := <-ofa.packetOutChannel:
+			if logger.V(log.DebugLevel) {
+				js, _ := json.Marshal(ofPacketOut)
+				logger.Debugw("streamPacketOut Receive PacketOut from Channel", log.Fields{"PacketOut": js})
+			}
+			if err := outClient.Send(ofPacketOut); err != nil {
+				logger.Errorw("streamPacketOut Error sending packetout", log.Fields{"error": err})
+				ofa.events <- ofaEventVolthaDisconnected
+				return
+			}
+		}
+	}
+}
diff --git a/internal/pkg/ofagent/refresh.go b/internal/pkg/ofagent/refresh.go
new file mode 100644
index 0000000..8f68ef0
--- /dev/null
+++ b/internal/pkg/ofagent/refresh.go
@@ -0,0 +1,109 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package ofagent
+
+import (
+ "context"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/ofagent-go/internal/pkg/openflow"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "time"
+)
+
+// synchronizeDeviceList refreshes the device list once right away and then
+// on every DeviceListRefreshInterval tick until ctx is cancelled.
+func (ofa *OFAgent) synchronizeDeviceList(ctx context.Context) {
+	ofa.refreshDeviceList()
+
+	ticker := time.NewTicker(ofa.DeviceListRefreshInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			ofa.refreshDeviceList()
+		}
+	}
+}
+
+// refreshDeviceList reconciles clientMap with the logical devices VOLTHA
+// reports: clients of devices that are gone are stopped and removed, and a
+// client is added for every device that has none yet.
+func (ofa *OFAgent) refreshDeviceList() {
+	deviceList, err := ofa.volthaClient.ListLogicalDevices(context.Background(), &empty.Empty{})
+	if err != nil {
+		logger.Errorw("ofagent failed to query device list from voltha",
+			log.Fields{"error": err})
+		return
+	}
+	devices := deviceList.GetItems()
+
+	var toAdd, toDel []string
+	known := make(map[string]struct{}, len(devices))
+	// clientMap is also used by the message handlers, so only touch it under mapLock.
+	ofa.mapLock.Lock()
+	for _, device := range devices {
+		deviceID := device.GetId()
+		known[deviceID] = struct{}{}
+		if ofa.clientMap[deviceID] == nil {
+			toAdd = append(toAdd, deviceID)
+		}
+	}
+	for key, ofc := range ofa.clientMap {
+		if _, ok := known[key]; !ok {
+			toDel = append(toDel, key)
+			ofc.Stop()
+			delete(ofa.clientMap, key)
+		}
+	}
+	ofa.mapLock.Unlock()
+	logger.Debugw("GrpcClient refreshDeviceList", log.Fields{"ToAdd": toAdd, "ToDel": toDel})
+	for _, deviceID := range toAdd {
+		ofa.addOFClient(deviceID) // addOFClient already starts the client's Run loop
+	}
+}
+
+// addOFClient returns the client registered for deviceID; when there is none it creates, registers and starts one under mapLock.
+func (ofa *OFAgent) addOFClient(deviceID string) *openflow.OFClient {
+	logger.Debugw("GrpcClient addClient called ", log.Fields{"device-id": deviceID})
+	ofa.mapLock.Lock()
+	client := ofa.clientMap[deviceID]
+	if client == nil {
+		client = openflow.NewOFClient(&openflow.OFClient{
+			DeviceID:             deviceID,
+			OFControllerEndPoint: ofa.OFControllerEndPoint,
+			VolthaClient:         ofa.volthaClient,
+			PacketOutChannel:     ofa.packetOutChannel,
+			ConnectionMaxRetries: ofa.ConnectionMaxRetries,
+			ConnectionRetryDelay: ofa.ConnectionRetryDelay,
+			KeepRunning:          true})
+		ofa.clientMap[deviceID] = client
+		go client.Run(context.Background())
+	}
+	ofa.mapLock.Unlock()
+	logger.Debugw("Finished with addClient", log.Fields{"deviceID": deviceID})
+	return client
+}
+
+// getOFClient returns the OF client for deviceID, creating it on first use.
+// It delegates to addOFClient so that clientMap is only read while mapLock
+// is held; an unlocked read here would race with the goroutines that add
+// and delete clients.
+func (ofa *OFAgent) getOFClient(deviceID string) *openflow.OFClient {
+	return ofa.addOFClient(deviceID)
+}
diff --git a/internal/pkg/openflow/barrier.go b/internal/pkg/openflow/barrier.go
new file mode 100644
index 0000000..d6af20a
--- /dev/null
+++ b/internal/pkg/openflow/barrier.go
@@ -0,0 +1,38 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package openflow
+
+import (
+ "encoding/json"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+)
+
+// handleBarrierRequest answers a barrier request with a barrier reply that carries the same xid.
+func (ofc *OFClient) handleBarrierRequest(request *ofp.BarrierRequest) {
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(request)
+		logger.Debugw("handleBarrierRequest called",
+			log.Fields{"device-id": ofc.DeviceID, "request": js})
+	}
+	reply := ofp.NewBarrierReply()
+	reply.SetVersion(4)
+	reply.SetXid(request.GetXid())
+	if err := ofc.SendMessage(reply); err != nil {
+		logger.Errorw("failed to send barrier reply", log.Fields{"device-id": ofc.DeviceID, "error": err})
+	}
+}
diff --git a/internal/pkg/openflow/client.go b/internal/pkg/openflow/client.go
new file mode 100644
index 0000000..33d531d
--- /dev/null
+++ b/internal/pkg/openflow/client.go
@@ -0,0 +1,538 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package openflow
+
+import (
+ "context"
+ "encoding/binary"
+ "encoding/json"
+ "errors"
+ "github.com/donNewtonAlpha/goloxi"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+ "io"
+ "net"
+ "time"
+)
+
+var logger, _ = log.AddPackage(log.JSON, log.DebugLevel, nil) // package-level logger; the error result of AddPackage is discarded
+
+type ofcEvent byte // event delivered to OFClient.Run through the events channel
+type ofcState byte // controller connection state tracked by OFClient.Run
+
+const (
+ ofcEventStart = ofcEvent(iota)
+ ofcEventConnected
+ ofcEventDisconnected
+ // iota keeps counting inside this block, so the two states below are 3 and 4.
+ ofcStateConnected = ofcState(iota)
+ ofcStateDisconnected
+)
+
+// OFClient holds the settings and connection state of one OpenFlow client.
+type OFClient struct {
+ OFControllerEndPoint string
+ Port uint16
+ DeviceID string
+ KeepRunning bool // cleared by Stop; tested by the processOFStream read loop
+ VolthaClient voltha.VolthaServiceClient
+ PacketOutChannel chan *voltha.PacketOut
+ ConnectionMaxRetries int // 0 means retry without limit
+ ConnectionRetryDelay time.Duration // NewOFClient replaces a value <= 0 with three seconds
+ conn net.Conn // connection to the controller, set by establishConnectionToController
+
+ // experimental
+ events chan ofcEvent // consumed by Run
+ sendChannel chan Message
+ lastUnsentMessage Message
+}
+
+// NewOFClient builds an OpenFlow client from the settings in config; it does not connect, that happens in Run.
+func NewOFClient(config *OFClient) *OFClient {
+	client := &OFClient{
+		DeviceID:             config.DeviceID,
+		OFControllerEndPoint: config.OFControllerEndPoint,
+		VolthaClient:         config.VolthaClient,
+		PacketOutChannel:     config.PacketOutChannel,
+		KeepRunning:          config.KeepRunning,
+		ConnectionMaxRetries: config.ConnectionMaxRetries,
+		ConnectionRetryDelay: config.ConnectionRetryDelay,
+		events:               make(chan ofcEvent, 10),
+		sendChannel:          make(chan Message, 100),
+	}
+	if client.ConnectionRetryDelay > 0 {
+		return client
+	}
+	fallback := 3 * time.Second
+	logger.Warnw("connection retry delay not valid, setting to default",
+		log.Fields{
+			"device-id": client.DeviceID,
+			"value":     client.ConnectionRetryDelay.String(),
+			"default":   fallback.String()})
+	client.ConnectionRetryDelay = fallback
+	return client
+}
+
+// Stop clears KeepRunning, the flag the processOFStream read loop tests before each read.
+func (ofc *OFClient) Stop() {
+ ofc.KeepRunning = false
+}
+
+// peekAtOFHeader decodes the 8 byte OpenFlow header at the start of buf and rejects a short buffer
+// or a length field below the header size. TODO: add minimal validation of version and type.
+func (ofc *OFClient) peekAtOFHeader(buf []byte) (ofp.IHeader, error) {
+	if len(buf) < 8 || binary.BigEndian.Uint16(buf[2:4]) < 8 {
+		return nil, errors.New("invalid-of-header")
+	}
+	header := ofp.Header{Version: buf[0], Type: buf[1]}
+	header.Length = binary.BigEndian.Uint16(buf[2:4])
+	header.Xid = binary.BigEndian.Uint32(buf[4:8])
+	return &header, nil
+}
+
+// establishConnectionToController (re)connects to the OpenFlow controller,
+// making at most ConnectionMaxRetries attempts (0 = no limit). On success it
+// says hello and posts ofcEventConnected.
+func (ofc *OFClient) establishConnectionToController() error {
+	if ofc.conn != nil {
+		ofc.conn.Close()
+		ofc.conn = nil
+	}
+
+	try := 1
+	for ofc.ConnectionMaxRetries == 0 || try <= ofc.ConnectionMaxRetries {
+		if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
+			logger.Debugw("openflow-client unable to resolve endpoint", log.Fields{
+				"device-id": ofc.DeviceID, "endpoint": ofc.OFControllerEndPoint, "error": err})
+		} else if connection, err := net.DialTCP("tcp", nil, raddr); err != nil {
+			logger.Warnw("openflow-client-connect-error", log.Fields{
+				"device-id": ofc.DeviceID, "endpoint": ofc.OFControllerEndPoint, "error": err})
+		} else {
+			ofc.conn = connection
+			ofc.sayHello()
+			ofc.events <- ofcEventConnected
+			return nil
+		}
+		// Attempts are only counted when a limit is configured, so an
+		// unlimited client keeps retrying with try left at 1.
+		if ofc.ConnectionMaxRetries != 0 {
+			try++
+		}
+		if ofc.ConnectionMaxRetries == 0 || try <= ofc.ConnectionMaxRetries {
+			time.Sleep(ofc.ConnectionRetryDelay) // skipped once the attempts are used up
+		}
+	}
+	return errors.New("failed-to-connect-to-of-controller")
+}
+
+// Run drives the client's connection state machine until ctx is cancelled
+// or the client has been stopped: it connects to the controller, starts the
+// sender and the stream reader once connected and reconnects after a disconnect.
+func (ofc *OFClient) Run(ctx context.Context) {
+	var ofCtx context.Context
+	var ofDone func()
+	ofc.events <- ofcEventStart
+	state := ofcStateDisconnected
+top:
+	for {
+		select {
+		case <-ctx.Done():
+			break top
+		case event := <-ofc.events:
+			switch event {
+			case ofcEventStart:
+				logger.Debugw("ofc-event-start", log.Fields{"device-id": ofc.DeviceID})
+				go ofc.establishConnectionToController()
+			case ofcEventConnected:
+				if state == ofcStateDisconnected {
+					state = ofcStateConnected
+					logger.Debugw("ofc-event-connected", log.Fields{"device-id": ofc.DeviceID})
+					ofCtx, ofDone = context.WithCancel(context.Background())
+					go ofc.messageSender(ofCtx)
+					go ofc.processOFStream(ofCtx)
+				}
+			case ofcEventDisconnected:
+				if state == ofcStateConnected {
+					state = ofcStateDisconnected
+					logger.Debugw("ofc-event-disconnected",
+						log.Fields{"device-id": ofc.DeviceID})
+					if ofDone != nil {
+						ofDone()
+						ofDone = nil
+					}
+					if !ofc.KeepRunning {
+						break top // Stop was requested, so do not reconnect
+					}
+					go ofc.establishConnectionToController()
+				}
+			}
+		}
+	}
+	if ofDone != nil {
+		ofDone()
+	}
+}
+
+// Run run loop for the openflow client
+func (ofc *OFClient) processOFStream(ctx context.Context) {
+ buf := make([]byte, 1500)
+ var need, have int
+ /*
+ * EXPLANATION
+ *
+ * The below loops reuses a byte array to read messages from the TCP
+ * connection to the OF controller. It reads messages into a large
+ * buffer in an attempt to optimize the read performance from the
+ * TCP connection. This means that on any given read there may be more
+ * than a single message in the byte array read.
+ *
+ * As the minimal size for an OF message is 8 bytes (because that is
+ * the size of the basic header) we know that if we have not read
+ * 8 bytes we need to read more before we can process a message.
+ *
+ * Once the mninium header is read, the complete length of the
+ * message is retrieved from the header and bytes are repeatedly read
+ * until we know the byte array contains at least one message.
+ *
+ * Once it is known that the buffer has at least a single message
+ * a slice (msg) is moved through the read bytes looking to process
+ * each message util the length of read data is < the length required
+ * i.e., the minimum size or the size of the next message.
+ *
+ * When no more message can be proessed from the byte array any unused
+ * bytes are moved to the front of the source array and more data is
+ * read from the TCP connection.
+ */
+
+ /*
+ * First thing we are looking for is an openflow header, so we need at
+ * least 8 bytes
+ */
+ need = 8
+
+top:
+ // Continue until we are told to stop
+ for ofc.KeepRunning {
+ logger.Debugw("before-read-from-controller",
+ log.Fields{
+ "device-id": ofc.DeviceID,
+ "have": have,
+ "need": need,
+ "buf-length": len(buf[have:])})
+ read, err := ofc.conn.Read(buf[have:])
+ have += read
+ logger.Debugw("read-from-controller",
+ log.Fields{
+ "device-id": ofc.DeviceID,
+ "byte-count": read,
+ "error": err})
+
+ /*
+ * If we have less than we need and there is no
+ * error, then continue to attempt to read more data
+ */
+ if have < need && err == nil {
+ // No bytes available, just continue
+ logger.Debugw("continue-to-read",
+ log.Fields{
+ "device-id": ofc.DeviceID,
+ "have": have,
+ "need": need,
+ "error": err})
+ continue
+ }
+
+ /*
+ * Single out EOF here, because if we have bytes
+ * but have an EOF we still want to process the
+ * the last meesage. A read of 0 bytes and EOF is
+ * a terminated connection.
+ */
+ if err != nil && (err != io.EOF || read == 0) {
+ logger.Errorw("voltha-connection-dead",
+ log.Fields{
+ "device-id": ofc.DeviceID,
+ "error": err})
+ break
+ }
+
+ /*
+ * We should have at least 1 message at this point so
+ * create a slice (msg) that points to the start of the
+ * buffer
+ */
+ msg := buf[0:]
+ for need <= have {
+ logger.Debugw("process-of-message-stream",
+ log.Fields{
+ "device-id": ofc.DeviceID,
+ "have": have,
+ "need": need})
+ /*
+ * If we get here, we have at least the 8 bytes of the
+ * header, if not enough for the complete message. So
+ * take a peek at the OF header to do simple validation
+ * and be able to get the full expected length of the
+ * packet
+ */
+ peek, err := ofc.peekAtOFHeader(msg)
+ if err != nil {
+ /*
+ * Header is bad, assume stream is corrupted
+ * and needs to be restarted
+ */
+ logger.Errorw("bad-of-packet",
+ log.Fields{
+ "device-id": ofc.DeviceID,
+ "error": err})
+ break top
+ }
+
+ /*
+ * If we don't have the full packet, then back around
+ * the outer loop to get more bytes
+ */
+ need = int(peek.GetLength())
+
+ logger.Debugw("processed-header-need-message",
+ log.Fields{
+ "device-id": ofc.DeviceID,
+ "have": have,
+ "need": need})
+
+ if have < need {
+ logger.Debugw("end-processing:continue-to-read",
+ log.Fields{
+ "device-id": ofc.DeviceID,
+ "have": have,
+ "need": need})
+ break
+ }
+
+ // Decode and process the packet
+ decoder := goloxi.NewDecoder(msg)
+ header, err := ofp.DecodeHeader(decoder)
+ if err != nil {
+ js, _ := json.Marshal(decoder)
+ logger.Errorw("failed-to-decode",
+ log.Fields{
+ "device-id": ofc.DeviceID,
+ "decoder": js,
+ "error": err})
+ break top
+ }
+ if logger.V(log.DebugLevel) {
+ js, _ := json.Marshal(header)
+ logger.Debugw("packet-header",
+ log.Fields{
+ "device-id": ofc.DeviceID,
+ "header": js})
+ }
+ ofc.parseHeader(header)
+
+ /*
+ * Move the msg slice to the start of the next
+ * message, which is the current message plus the
+ * used bytes (need)
+ */
+ msg = msg[need:]
+ have -= need
+
+ // Finished process method, need header again
+ need = 8
+
+ logger.Debugw("message-process-complete",
+ log.Fields{
+ "device-id": ofc.DeviceID,
+ "have": have,
+ "need": need,
+ "read-length": len(buf[have:])})
+ }
+ /*
+ * If we have any left over bytes move them to the front
+ * of the byte array to be appended to by the next read
+ */
+ if have > 0 {
+ copy(buf, msg)
+ }
+ }
+ ofc.events <- ofcEventDisconnected
+}
+
+// sayHello builds an OpenFlow Hello message that carries a single
+// version-bitmap element and hands it to SendMessage. A failure reported by
+// SendMessage is logged through logger.Fatalw.
+func (ofc *OFClient) sayHello() {
+	// Describe the supported versions first, then attach them to the Hello.
+	bitmap := ofp.NewHelloElemVersionbitmap()
+	bitmap.SetType(ofp.OFPHETVersionbitmap)
+	bitmap.SetLength(8)
+	// NOTE(review): 16 is 1<<4, presumably advertising wire version 4 - confirm.
+	bitmap.SetBitmaps([]*ofp.Uint32{{Value: 16}})
+
+	msg := ofp.NewHello()
+	msg.Xid = uint32(GetXid())
+	msg.SetElements([]ofp.IHelloElem{bitmap})
+
+	if logger.V(log.DebugLevel) {
+		encoded, _ := json.Marshal(msg)
+		logger.Debugw("sayHello Called",
+			log.Fields{
+				"device-id":     ofc.DeviceID,
+				"hello-message": encoded})
+	}
+
+	err := ofc.SendMessage(msg)
+	if err == nil {
+		return
+	}
+	logger.Fatalw("Failed saying hello to Openflow Server, unable to proceed",
+		log.Fields{
+			"device-id": ofc.DeviceID,
+			"error":     err})
+}
+
+// parseHeader dispatches a decoded OpenFlow message to the handler for its
+// message type.
+//
+// Most handlers run in their own goroutine. Flow-mod and barrier-request
+// handlers run synchronously on the calling goroutine so that every flow
+// message received before a barrier request has been handled by the time
+// the barrier is processed. Message types without a handler are ignored.
+func (ofc *OFClient) parseHeader(header ofp.IHeader) {
+	switch header.GetType() {
+	case ofp.OFPTError:
+		// Assert to the interface the handler accepts rather than to the
+		// concrete *ofp.ErrorMsg: the concrete assertion panics for any
+		// other implementation of ofp.IErrorMsg.
+		go ofc.handleErrMsg(header.(ofp.IErrorMsg))
+	case ofp.OFPTEchoRequest:
+		go ofc.handleEchoRequest(header.(*ofp.EchoRequest))
+	case ofp.OFPTFeaturesRequest:
+		go ofc.handleFeatureRequest(header.(*ofp.FeaturesRequest))
+	case ofp.OFPTGetConfigRequest:
+		go ofc.handleGetConfigRequest(header.(*ofp.GetConfigRequest))
+	case ofp.OFPTSetConfig:
+		go ofc.handleSetConfig(header.(*ofp.SetConfig))
+	case ofp.OFPTPacketOut:
+		go ofc.handlePacketOut(header.(*ofp.PacketOut))
+	case ofp.OFPTFlowMod:
+		/*
+		 * Not using go routine to handle flow* messages or barrier requests
+		 * onos typically issues barrier requests just before a flow* message.
+		 * by handling in this thread I ensure all flow* are handled when barrier
+		 * request is issued.
+		 */
+		switch header.(ofp.IFlowMod).GetCommand() {
+		case ofp.OFPFCAdd:
+			ofc.handleFlowAdd(header.(*ofp.FlowAdd))
+		case ofp.OFPFCModify:
+			ofc.handleFlowMod(header.(*ofp.FlowMod))
+		case ofp.OFPFCModifyStrict:
+			ofc.handleFlowModStrict(header.(*ofp.FlowModifyStrict))
+		case ofp.OFPFCDelete:
+			ofc.handleFlowDelete(header.(*ofp.FlowDelete))
+		case ofp.OFPFCDeleteStrict:
+			ofc.handleFlowDeleteStrict(header.(*ofp.FlowDeleteStrict))
+		}
+	case ofp.OFPTStatsRequest:
+		go ofc.handleStatsRequest(header, header.(ofp.IStatsRequest).GetStatsType())
+	case ofp.OFPTBarrierRequest:
+		/* See note above at case ofp.OFPTFlowMod:*/
+		ofc.handleBarrierRequest(header.(*ofp.BarrierRequest))
+	case ofp.OFPTRoleRequest:
+		go ofc.handleRoleRequest(header.(*ofp.RoleRequest))
+	case ofp.OFPTMeterMod:
+		go ofc.handleMeterModRequest(header.(*ofp.MeterMod))
+	case ofp.OFPTHello,
+		ofp.OFPTEchoReply,
+		ofp.OFPTExperimenter,
+		ofp.OFPTFeaturesReply,
+		ofp.OFPTGetConfigReply,
+		ofp.OFPTPacketIn,
+		ofp.OFPTFlowRemoved,
+		ofp.OFPTPortStatus:
+		// Recognized message types that currently need no action.
+	}
+}
+
+// Message is the common shape of everything that can be sent to the
+// controller: any value able to serialize itself into a goloxi encoder.
+// It exists so that a single SendMessage can accept every message type.
+type Message interface {
+	// Serialize writes the wire form of the message into enc.
+	Serialize(enc *goloxi.Encoder) error
+}
+
+// doSend serializes msg and writes the encoded bytes to the controller
+// connection.
+//
+// It returns an error when no connection is present or when the write
+// fails. A message that cannot be serialized is logged and dropped without
+// touching the connection, and nil is returned: writing a partially encoded
+// buffer would corrupt the stream, and messageSender treats any error from
+// doSend as a lost connection and keeps the message for a retry that could
+// never succeed.
+func (ofc *OFClient) doSend(msg Message) error {
+	if ofc.conn == nil {
+		return errors.New("no-connection")
+	}
+	enc := goloxi.NewEncoder()
+	if err := msg.Serialize(enc); err != nil {
+		logger.Errorw("unable-to-serialize-message-for-controller",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"message":   msg,
+				"error":     err})
+		return nil
+	}
+	if _, err := ofc.conn.Write(enc.Bytes()); err != nil {
+		logger.Warnw("unable-to-send-message-to-controller",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"message":   msg,
+				"error":     err})
+		return err
+	}
+	return nil
+}
+
+// messageSender is the send loop for the controller connection. It first
+// retries the message left in ofc.lastUnsentMessage (if any), then forwards
+// every message received on ofc.sendChannel through doSend until ctx is
+// cancelled. On a send failure it remembers the message, publishes
+// ofcEventDisconnected on ofc.events and returns.
+func (ofc *OFClient) messageSender(ctx context.Context) {
+	// A previous run may have left a message behind; it goes out first.
+	if pending := ofc.lastUnsentMessage; pending != nil {
+		if err := ofc.doSend(pending); err != nil {
+			ofc.events <- ofcEventDisconnected
+			return
+		}
+		ofc.lastUnsentMessage = nil
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case next := <-ofc.sendChannel:
+			if err := ofc.doSend(next); err != nil {
+				// Keep the message so the next run can retry it.
+				ofc.lastUnsentMessage = next
+				ofc.events <- ofcEventDisconnected
+				return
+			}
+			ofc.lastUnsentMessage = nil
+		}
+	}
+}
+
+// SendMessage hands message to the send channel, from which messageSender
+// picks it up and writes it to the controller. The call returns once the
+// channel has accepted the message; the returned error is always nil.
+func (ofc *OFClient) SendMessage(message Message) error {
+	ofc.sendChannel <- message
+	return nil
+}
+
+// SendMessageOrig serializes message and writes it directly to the
+// controller connection, bypassing the send channel used by SendMessage.
+//
+// While no connection is present it sleeps in 10 millisecond steps until
+// one appears. It returns the serialization error if the message cannot be
+// encoded (nothing is written in that case), or the error from the
+// connection write.
+func (ofc *OFClient) SendMessageOrig(message Message) error {
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(message)
+		logger.Debugw("SendMessage called",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"message":   js})
+	}
+	enc := goloxi.NewEncoder()
+	if err := message.Serialize(enc); err != nil {
+		// Never put a partially encoded message on the wire.
+		logger.Errorw("SendMessage failed serializing message",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"error":     err})
+		return err
+	}
+	for ofc.conn == nil {
+		logger.Warnln("SendMessage Connection is Nil sleeping for 10 milliseconds")
+		time.Sleep(10 * time.Millisecond)
+	}
+	if _, err := ofc.conn.Write(enc.Bytes()); err != nil {
+		jMessage, _ := json.Marshal(message)
+		logger.Errorw("SendMessage failed sending message",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"error":     err,
+				"message":   jMessage})
+		return err
+	}
+	return nil
+}
diff --git a/internal/pkg/openflow/echo.go b/internal/pkg/openflow/echo.go
new file mode 100644
index 0000000..d773c70
--- /dev/null
+++ b/internal/pkg/openflow/echo.go
@@ -0,0 +1,37 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package openflow
+
+import (
+ "encoding/json"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+)
+
+// handleEchoRequest answers an OpenFlow echo request with an echo reply
+// that carries the same transaction id and version as the request. A
+// failure to send the reply is logged.
+func (ofc *OFClient) handleEchoRequest(request *ofp.EchoRequest) {
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(request)
+		logger.Debugw("handleEchoRequest called",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"request":   js})
+	}
+	reply := ofp.NewEchoReply()
+	reply.SetXid(request.GetXid())
+	reply.SetVersion(request.GetVersion())
+	// The handler has no caller to report to, so surface the error here
+	// instead of dropping it.
+	if err := ofc.SendMessage(reply); err != nil {
+		logger.Errorw("handleEchoRequest failed sending reply",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"error":     err})
+	}
+}
diff --git a/internal/pkg/openflow/error.go b/internal/pkg/openflow/error.go
new file mode 100644
index 0000000..c1679b6
--- /dev/null
+++ b/internal/pkg/openflow/error.go
@@ -0,0 +1,34 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package openflow
+
+import (
+ "encoding/json"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+)
+
+// handleErrMsg receives an OpenFlow error message from the controller.
+// For now the message is only recorded at debug level; what else, if
+// anything, should happen here is still undecided.
+func (ofc *OFClient) handleErrMsg(message ofp.IErrorMsg) {
+	// Logging is the only work done, so bail out early when debug is off.
+	if !logger.V(log.DebugLevel) {
+		return
+	}
+	encoded, _ := json.Marshal(message)
+	logger.Debugw("handleErrMsg called",
+		log.Fields{
+			"device-id": ofc.DeviceID,
+			"request":   encoded})
+}
diff --git a/internal/pkg/openflow/feature.go b/internal/pkg/openflow/feature.go
new file mode 100644
index 0000000..706a84c
--- /dev/null
+++ b/internal/pkg/openflow/feature.go
@@ -0,0 +1,57 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package openflow
+
+import (
+ "context"
+ "encoding/json"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-protos/v2/go/common"
+)
+
+// handleFeatureRequest answers an OpenFlow features request. It fetches the
+// logical device for ofc.DeviceID from VOLTHA and sends a features reply
+// populated from that device's datapath id and switch features.
+//
+// It returns the error from the VOLTHA lookup, in which case no reply is
+// sent, or the error from SendMessage.
+func (ofc *OFClient) handleFeatureRequest(request *ofp.FeaturesRequest) error {
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(request)
+		logger.Debugw("handleFeatureRequest called",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"request":   js})
+	}
+	var id = common.ID{Id: ofc.DeviceID}
+	logicalDevice, err := ofc.VolthaClient.GetLogicalDevice(context.Background(), &id)
+	if err != nil {
+		// Without the device there is nothing truthful to report; do not
+		// answer with a reply built from zero values.
+		logger.Errorw("handleFeatureRequest failed getting logical device",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"error":     err})
+		return err
+	}
+	reply := ofp.NewFeaturesReply()
+	reply.SetVersion(4)
+	reply.SetXid(request.GetXid())
+	features := logicalDevice.GetSwitchFeatures()
+	reply.SetDatapathId(logicalDevice.GetDatapathId())
+	reply.SetNBuffers(features.GetNBuffers())
+	reply.SetNTables(uint8(features.GetNTables()))
+	reply.SetAuxiliaryId(uint8(features.GetAuxiliaryId()))
+	capabilities := features.GetCapabilities()
+	reply.SetCapabilities(ofp.Capabilities(capabilities))
+
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(reply)
+		logger.Debugw("handleFeatureRequestReturn",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"reply":     js})
+	}
+	return ofc.SendMessage(reply)
+}
diff --git a/internal/pkg/openflow/flowMod.go b/internal/pkg/openflow/flowMod.go
new file mode 100644
index 0000000..42f37f2
--- /dev/null
+++ b/internal/pkg/openflow/flowMod.go
@@ -0,0 +1,346 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package openflow
+
+import (
+ "context"
+ "encoding/json"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-protos/v2/go/openflow_13"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+)
+
+// oxmMap translates an OXM field name, as returned by GetOXMName, into the
+// numeric code that the flow handlers in this file convert to
+// voltha.OxmOfbFieldTypes. The handlers index the map without checking for
+// presence, so a name missing from the map yields 0, the code of "in_port".
+var oxmMap = map[string]int32{
+ "in_port": 0,
+ "in_phy_port": 1,
+ "metadata": 2,
+ "eth_dst": 3,
+ "eth_src": 4,
+ "eth_type": 5,
+ "vlan_vid": 6,
+ "vlan_pcp": 7,
+ "ip_dscp": 8,
+ "ip_ecn": 9,
+ "ip_proto": 10,
+ "ipv4_src": 11,
+ "ipv4_dst": 12,
+ "tcp_src": 13,
+ "tcp_dst": 14,
+ "udp_src": 15,
+ "udp_dst": 16,
+ "sctp_src": 17,
+ "sctp_dst": 18,
+ "icmpv4_type": 19,
+ "icmpv4_code": 20,
+ "arp_op": 21,
+ "arp_spa": 22,
+ "arp_tpa": 23,
+ "arp_sha": 24,
+ "arp_tha": 25,
+ "ipv6_src": 26,
+ "ipv6_dst": 27,
+ "ipv6_flabel": 28,
+ "icmpv6_type": 29,
+ "icmpv6_code": 30,
+ "ipv6_nd_target": 31,
+ "ipv6_nd_sll": 32,
+ "ipv6_nd_tll": 33,
+ "mpls_label": 34,
+ "mpls_tc": 35,
+ "mpls_bos": 36,
+ "pbb_isid": 37,
+ "tunnel_id": 38,
+ "ipv6_exthdr": 39,
+}
+
+// handleFlowAdd converts an OpenFlow flow-add message into a VOLTHA
+// FlowTableUpdate with command OFPFC_ADD and submits it through
+// UpdateLogicalDeviceFlowTable.
+//
+// The match fields and instructions of flowAdd are translated one by one.
+// Match field types and instruction types without a case below are still
+// forwarded, but with no value or data attached. A failure of the VOLTHA
+// call is logged; nothing is returned to the caller.
+func (ofc *OFClient) handleFlowAdd(flowAdd *ofp.FlowAdd) {
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(flowAdd)
+		logger.Debugw("handleFlowAdd called",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"params":    js})
+	}
+
+	// Construct the match
+	var oxmList []*voltha.OfpOxmField
+	for _, oxmField := range flowAdd.Match.GetOxmList() {
+		name := oxmMap[oxmField.GetOXMName()]
+		val := oxmField.GetOXMValue()
+		field := voltha.OfpOxmOfbField{Type: voltha.OxmOfbFieldTypes(name)}
+		// ofpOxmField holds a pointer to field, so the value assigned in
+		// the switch below is visible through it.
+		ofpOxmField := voltha.OfpOxmField{
+			OxmClass: ofp.OFPXMCOpenflowBasic,
+			Field:    &openflow_13.OfpOxmField_OfbField{OfbField: &field},
+		}
+		switch voltha.OxmOfbFieldTypes(name) {
+		case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT:
+			field.Value = &voltha.OfpOxmOfbField_Port{
+				Port: uint32(val.(ofp.Port)),
+			}
+		case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IN_PHY_PORT:
+			field.Value = &voltha.OfpOxmOfbField_PhysicalPort{
+				PhysicalPort: val.(uint32),
+			}
+		case voltha.OxmOfbFieldTypes_OFPXMT_OFB_METADATA:
+			field.Value = &voltha.OfpOxmOfbField_TableMetadata{
+				TableMetadata: val.(uint64),
+			}
+		case voltha.OxmOfbFieldTypes_OFPXMT_OFB_ETH_TYPE:
+			field.Value = &voltha.OfpOxmOfbField_EthType{
+				EthType: uint32(val.(ofp.EthernetType)),
+			}
+		case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IP_PROTO:
+			field.Value = &voltha.OfpOxmOfbField_IpProto{
+				IpProto: uint32(val.(ofp.IpPrototype)),
+			}
+		case voltha.OxmOfbFieldTypes_OFPXMT_OFB_UDP_SRC:
+			field.Value = &voltha.OfpOxmOfbField_UdpSrc{
+				UdpSrc: uint32(val.(uint16)),
+			}
+		case voltha.OxmOfbFieldTypes_OFPXMT_OFB_UDP_DST:
+			field.Value = &voltha.OfpOxmOfbField_UdpDst{
+				UdpDst: uint32(val.(uint16)),
+			}
+		case voltha.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID:
+			// Keep the low 12 bits and set bit 0x1000.
+			field.Value = &voltha.OfpOxmOfbField_VlanVid{
+				VlanVid: uint32((val.(uint16) & 0xfff) | 0x1000),
+			}
+		}
+		oxmList = append(oxmList, &ofpOxmField)
+	}
+
+	// Construct the instructions
+	var instructions []*voltha.OfpInstruction
+	for _, ofpInstruction := range flowAdd.GetInstructions() {
+		instructionType := ofpInstruction.GetType()
+		instruction := voltha.OfpInstruction{Type: uint32(instructionType)}
+		switch instructionType {
+		case ofp.OFPITGotoTable:
+			instruction.Data = &openflow_13.OfpInstruction_GotoTable{
+				GotoTable: &openflow_13.OfpInstructionGotoTable{
+					TableId: uint32(ofpInstruction.(ofp.IInstructionGotoTable).GetTableId()),
+				},
+			}
+		case ofp.OFPITWriteMetadata:
+			instruction.Data = &openflow_13.OfpInstruction_WriteMetadata{
+				WriteMetadata: &openflow_13.OfpInstructionWriteMetadata{
+					Metadata:     ofpInstruction.(ofp.IInstructionWriteMetadata).GetMetadata(),
+					MetadataMask: ofpInstruction.(ofp.IInstructionWriteMetadata).GetMetadataMask(),
+				},
+			}
+		case ofp.OFPITWriteActions:
+			var ofpActions []*openflow_13.OfpAction
+			for _, action := range ofpInstruction.(ofp.IInstructionWriteActions).GetActions() {
+				ofpActions = append(ofpActions, extractAction(action))
+			}
+			instruction.Data = &openflow_13.OfpInstruction_Actions{
+				Actions: &openflow_13.OfpInstructionActions{
+					Actions: ofpActions,
+				},
+			}
+		case ofp.OFPITApplyActions:
+			var ofpActions []*openflow_13.OfpAction
+			for _, action := range ofpInstruction.(ofp.IInstructionApplyActions).GetActions() {
+				ofpActions = append(ofpActions, extractAction(action))
+			}
+			instruction.Data = &openflow_13.OfpInstruction_Actions{
+				Actions: &openflow_13.OfpInstructionActions{
+					Actions: ofpActions,
+				},
+			}
+		case ofp.OFPITMeter:
+			instruction.Data = &openflow_13.OfpInstruction_Meter{
+				Meter: &openflow_13.OfpInstructionMeter{
+					MeterId: ofpInstruction.(ofp.IInstructionMeter).GetMeterId(),
+				},
+			}
+		}
+		instructions = append(instructions, &instruction)
+	}
+
+	// Construct the request
+	flowUpdate := openflow_13.FlowTableUpdate{
+		Id: ofc.DeviceID,
+		FlowMod: &voltha.OfpFlowMod{
+			Cookie:      flowAdd.Cookie,
+			CookieMask:  flowAdd.CookieMask,
+			TableId:     uint32(flowAdd.TableId),
+			Command:     voltha.OfpFlowModCommand_OFPFC_ADD,
+			IdleTimeout: uint32(flowAdd.IdleTimeout),
+			HardTimeout: uint32(flowAdd.HardTimeout),
+			Priority:    uint32(flowAdd.Priority),
+			BufferId:    flowAdd.BufferId,
+			OutPort:     uint32(flowAdd.OutPort),
+			OutGroup:    uint32(flowAdd.OutGroup),
+			Flags:       uint32(flowAdd.Flags),
+			Match: &voltha.OfpMatch{
+				Type:      voltha.OfpMatchType(flowAdd.Match.GetType()),
+				OxmFields: oxmList,
+			},
+
+			Instructions: instructions,
+		},
+	}
+	if logger.V(log.DebugLevel) {
+		flowUpdateJs, _ := json.Marshal(flowUpdate)
+		// Structured fields need the *w variant; Debugf would treat the
+		// fields as a stray format argument.
+		logger.Debugw("FlowUpdate being sent to Voltha",
+			log.Fields{
+				"device-id":        ofc.DeviceID,
+				"flow-mod-request": flowUpdateJs})
+	}
+	if _, err := ofc.VolthaClient.UpdateLogicalDeviceFlowTable(context.Background(), &flowUpdate); err != nil {
+		logger.Errorw("Error calling FlowUpdate ",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"error":     err})
+	}
+}
+
+// handleFlowMod is the placeholder for OpenFlow flow-modify messages. It
+// records the message at debug level and then logs an error stating that
+// the operation is not implemented.
+func (ofc *OFClient) handleFlowMod(flowMod *ofp.FlowMod) {
+	if logger.V(log.DebugLevel) {
+		encoded, _ := json.Marshal(flowMod)
+		debugFields := log.Fields{
+			"device-id": ofc.DeviceID,
+			"flow-mod":  encoded,
+		}
+		logger.Debugw("handleMod called", debugFields)
+	}
+
+	errorFields := log.Fields{"device-id": ofc.DeviceID}
+	logger.Errorw("handleFlowMod not implemented", errorFields)
+}
+
+// handleFlowModStrict is the placeholder for OpenFlow flow-modify-strict
+// messages. It records the message at debug level and then logs an error
+// stating that the operation is not implemented.
+func (ofc *OFClient) handleFlowModStrict(flowModStrict *ofp.FlowModifyStrict) {
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(flowModStrict)
+		logger.Debugw("handleFlowModStrict called",
+			log.Fields{
+				"device-id":       ofc.DeviceID,
+				"flow-mod-strict": js})
+	}
+	// Errorw, not Error: the device id is a structured field, as in
+	// handleFlowMod.
+	logger.Errorw("handleFlowModStrict not implemented",
+		log.Fields{"device-id": ofc.DeviceID})
+}
+
+// handleFlowDelete is the placeholder for OpenFlow flow-delete messages. It
+// records the message at debug level and then logs an error stating that
+// the operation is not implemented.
+func (ofc *OFClient) handleFlowDelete(flowDelete *ofp.FlowDelete) {
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(flowDelete)
+		logger.Debugw("handleFlowDelete called",
+			log.Fields{
+				"device-id":   ofc.DeviceID,
+				"flow-delete": js})
+	}
+	// Errorw, not Error: the device id is a structured field, as in
+	// handleFlowMod.
+	logger.Errorw("handleFlowDelete not implemented",
+		log.Fields{"device-id": ofc.DeviceID})
+}
+
+// handleFlowDeleteStrict converts an OpenFlow flow-delete-strict message
+// into a VOLTHA FlowTableUpdate with command OFPFC_DELETE_STRICT and
+// submits it through UpdateLogicalDeviceFlowTable.
+//
+// Only the match of flowDeleteStrict is translated; no instructions are
+// sent. Match field types without a case below are still forwarded, but
+// with no value attached. A failure of the VOLTHA call is logged; nothing
+// is returned to the caller.
+func (ofc *OFClient) handleFlowDeleteStrict(flowDeleteStrict *ofp.FlowDeleteStrict) {
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(flowDeleteStrict)
+		logger.Debugw("handleFlowDeleteStrict called",
+			log.Fields{
+				"device-id":          ofc.DeviceID,
+				"flow-delete-strict": js})
+	}
+
+	// Construct match
+	var oxmList []*voltha.OfpOxmField
+	for _, oxmField := range flowDeleteStrict.Match.GetOxmList() {
+		name := oxmMap[oxmField.GetOXMName()]
+		val := oxmField.GetOXMValue()
+		field := voltha.OfpOxmOfbField{Type: voltha.OxmOfbFieldTypes(name)}
+		// ofpOxmField holds a pointer to field, so the value assigned in
+		// the switch below is visible through it.
+		ofpOxmField := voltha.OfpOxmField{
+			OxmClass: ofp.OFPXMCOpenflowBasic,
+			Field:    &openflow_13.OfpOxmField_OfbField{OfbField: &field},
+		}
+
+		switch voltha.OxmOfbFieldTypes(name) {
+		case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT:
+			field.Value = &voltha.OfpOxmOfbField_Port{
+				Port: uint32(val.(ofp.Port)),
+			}
+		case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IN_PHY_PORT:
+			field.Value = &voltha.OfpOxmOfbField_PhysicalPort{
+				PhysicalPort: val.(uint32),
+			}
+		case voltha.OxmOfbFieldTypes_OFPXMT_OFB_METADATA:
+			field.Value = &voltha.OfpOxmOfbField_TableMetadata{
+				TableMetadata: val.(uint64),
+			}
+		case voltha.OxmOfbFieldTypes_OFPXMT_OFB_ETH_TYPE:
+			field.Value = &voltha.OfpOxmOfbField_EthType{
+				EthType: uint32(val.(ofp.EthernetType)),
+			}
+		case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IP_PROTO:
+			field.Value = &voltha.OfpOxmOfbField_IpProto{
+				IpProto: uint32(val.(ofp.IpPrototype)),
+			}
+		case voltha.OxmOfbFieldTypes_OFPXMT_OFB_UDP_SRC:
+			field.Value = &voltha.OfpOxmOfbField_UdpSrc{
+				UdpSrc: uint32(val.(uint16)),
+			}
+		case voltha.OxmOfbFieldTypes_OFPXMT_OFB_UDP_DST:
+			field.Value = &voltha.OfpOxmOfbField_UdpDst{
+				UdpDst: uint32(val.(uint16)),
+			}
+		case voltha.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID:
+			// NOTE(review): handleFlowAdd masks this value to 12 bits and
+			// sets bit 0x1000; here it is passed through unchanged. Confirm
+			// whether a strict delete should use the same encoding.
+			field.Value = &voltha.OfpOxmOfbField_VlanVid{
+				VlanVid: uint32(val.(uint16)),
+			}
+		}
+		oxmList = append(oxmList, &ofpOxmField)
+	}
+
+	// Construct request
+	flowUpdate := openflow_13.FlowTableUpdate{
+		Id: ofc.DeviceID,
+		FlowMod: &voltha.OfpFlowMod{
+			Cookie:      flowDeleteStrict.Cookie,
+			CookieMask:  flowDeleteStrict.CookieMask,
+			TableId:     uint32(flowDeleteStrict.TableId),
+			Command:     voltha.OfpFlowModCommand_OFPFC_DELETE_STRICT,
+			IdleTimeout: uint32(flowDeleteStrict.IdleTimeout),
+			HardTimeout: uint32(flowDeleteStrict.HardTimeout),
+			Priority:    uint32(flowDeleteStrict.Priority),
+			BufferId:    flowDeleteStrict.BufferId,
+			OutPort:     uint32(flowDeleteStrict.OutPort),
+			OutGroup:    uint32(flowDeleteStrict.OutGroup),
+			Flags:       uint32(flowDeleteStrict.Flags),
+			Match: &voltha.OfpMatch{
+				Type:      voltha.OfpMatchType(flowDeleteStrict.Match.GetType()),
+				OxmFields: oxmList,
+			},
+		},
+	}
+
+	if logger.V(log.DebugLevel) {
+		flowUpdateJs, _ := json.Marshal(flowUpdate)
+		// Structured fields need the *w variant; Debugf would treat the
+		// fields as a stray format argument.
+		logger.Debugw("FlowUpdate being sent to Voltha",
+			log.Fields{
+				"device-id":   ofc.DeviceID,
+				"flow-update": flowUpdateJs})
+	}
+	if _, err := ofc.VolthaClient.UpdateLogicalDeviceFlowTable(context.Background(), &flowUpdate); err != nil {
+		logger.Errorw("Error calling FlowUpdate ",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"error":     err})
+	}
+}
diff --git a/internal/pkg/openflow/getConfig.go b/internal/pkg/openflow/getConfig.go
new file mode 100644
index 0000000..6d373e9
--- /dev/null
+++ b/internal/pkg/openflow/getConfig.go
@@ -0,0 +1,45 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package openflow
+
+import (
+ "encoding/json"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+)
+
+// handleGetConfigRequest answers an OpenFlow get-config request with a
+// get-config reply that carries version 4, the transaction id of the
+// request and a miss-send length of ofp.OFPCMLNoBuffer. A failure to send
+// the reply is logged.
+func (ofc *OFClient) handleGetConfigRequest(request *ofp.GetConfigRequest) {
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(request)
+		logger.Debugw("handleGetConfigRequest called",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"request":   js})
+	}
+	reply := ofp.NewGetConfigReply()
+	reply.SetVersion(4)
+	reply.SetXid(request.GetXid())
+	reply.SetMissSendLen(ofp.OFPCMLNoBuffer)
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(reply)
+		logger.Debugw("handleGetConfigRequest reply",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"reply":     js})
+	}
+	// The handler has no caller to report to, so surface the error here
+	// instead of dropping it.
+	if err := ofc.SendMessage(reply); err != nil {
+		logger.Errorw("handleGetConfigRequest failed sending reply",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"error":     err})
+	}
+}
diff --git a/internal/pkg/openflow/meter.go b/internal/pkg/openflow/meter.go
new file mode 100644
index 0000000..d4843fa
--- /dev/null
+++ b/internal/pkg/openflow/meter.go
@@ -0,0 +1,94 @@
+/*
+Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package openflow
+
+import (
+ "encoding/json"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-protos/v2/go/openflow_13"
+ "golang.org/x/net/context"
+)
+
+// handleMeterModRequest converts an OpenFlow meter-mod message into a
+// VOLTHA MeterModUpdate and submits it through
+// UpdateLogicalDeviceMeterTable.
+//
+// Each band of the request becomes an OfpMeterBandHeader carrying type,
+// rate and burst size. The DSCP precedence level and the experimenter id
+// are read from the request but are not attached to the band. A failure of
+// the VOLTHA call is logged; nothing is returned to the caller.
+func (ofc *OFClient) handleMeterModRequest(request *ofp.MeterMod) {
+	if logger.V(log.DebugLevel) {
+		encoded, _ := json.Marshal(request)
+		logger.Debugw("handleMeterModRequest called",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"request":   encoded})
+	}
+
+	// Translate the bands; band types without a case stay zero-valued.
+	var bands []*openflow_13.OfpMeterBandHeader
+	for _, ofpBand := range request.GetMeters() {
+		band := new(openflow_13.OfpMeterBandHeader)
+		switch ofpBand.GetType() {
+		case ofp.OFPMBTDrop:
+			drop := ofpBand.(ofp.IMeterBandDrop)
+			band.Type = openflow_13.OfpMeterBandType_OFPMBT_DROP
+			band.Rate = drop.GetRate()
+			band.BurstSize = drop.GetBurstSize()
+		case ofp.OFPMBTDSCPRemark:
+			remark := ofpBand.(ofp.IMeterBandDscpRemark)
+			band.Type = openflow_13.OfpMeterBandType_OFPMBT_DSCP_REMARK
+			band.BurstSize = remark.GetBurstSize()
+			band.Rate = remark.GetRate()
+			// Built but intentionally left unattached: band.Data is not set.
+			_ = openflow_13.OfpMeterBandDscpRemark{
+				PrecLevel: uint32(remark.GetPrecLevel()),
+			}
+		case ofp.OFPMBTExperimenter:
+			exp := ofpBand.(ofp.IMeterBandExperimenter)
+			// Built but intentionally left unattached: band.Data is not set.
+			_ = openflow_13.OfpMeterBandExperimenter{
+				Experimenter: exp.GetExperimenter(),
+			}
+			band.Type = openflow_13.OfpMeterBandType_OFPMBT_EXPERIMENTER
+			band.BurstSize = exp.GetBurstSize()
+			band.Rate = exp.GetRate()
+		}
+		bands = append(bands, band)
+	}
+
+	// Assemble the request in one literal.
+	meterModUpdate := openflow_13.MeterModUpdate{
+		Id: ofc.DeviceID,
+		MeterMod: &openflow_13.OfpMeterMod{
+			MeterId: request.MeterId,
+			Flags:   uint32(request.Flags),
+			Command: openflow_13.OfpMeterModCommand(request.Command),
+			Bands:   bands,
+		},
+	}
+
+	if logger.V(log.DebugLevel) {
+		encoded, _ := json.Marshal(meterModUpdate)
+		logger.Debugw("handleMeterModUpdate sending request",
+			log.Fields{
+				"device-id":         ofc.DeviceID,
+				"meter-mod-request": encoded})
+	}
+
+	_, err := ofc.VolthaClient.UpdateLogicalDeviceMeterTable(context.Background(), &meterModUpdate)
+	if err == nil {
+		return
+	}
+	logger.Errorw("Error calling UpdateLogicalDeviceMeterTable",
+		log.Fields{
+			"device-id": ofc.DeviceID,
+			"error":     err})
+}
diff --git a/internal/pkg/openflow/packet.go b/internal/pkg/openflow/packet.go
new file mode 100644
index 0000000..dc26594
--- /dev/null
+++ b/internal/pkg/openflow/packet.go
@@ -0,0 +1,62 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package openflow
+
+import (
+ "encoding/json"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+)
+
+// handlePacketOut converts an OpenFlow packet-out message into a VOLTHA
+// PacketOut for ofc.DeviceID and places it on ofc.PacketOutChannel. The
+// buffer id, ingress port, payload and translated actions of the message
+// are carried over.
+func (ofc *OFClient) handlePacketOut(packetOut *ofp.PacketOut) {
+	if logger.V(log.DebugLevel) {
+		encoded, _ := json.Marshal(packetOut)
+		logger.Debugw("handlePacketOut called",
+			log.Fields{
+				"device-id":  ofc.DeviceID,
+				"packet-out": encoded})
+	}
+
+	// Translate every action of the incoming message.
+	var translated []*voltha.OfpAction
+	for _, ofAction := range packetOut.GetActions() {
+		translated = append(translated, extractAction(ofAction))
+	}
+
+	// Assemble the VOLTHA message: inner packet first, then its envelope.
+	inner := &voltha.OfpPacketOut{
+		BufferId: packetOut.GetBufferId(),
+		InPort:   uint32(packetOut.GetInPort()),
+		Actions:  translated,
+		Data:     packetOut.GetData(),
+	}
+	pbPacketOut := voltha.PacketOut{
+		Id:        ofc.DeviceID,
+		PacketOut: inner,
+	}
+
+	if logger.V(log.DebugLevel) {
+		encoded, _ := json.Marshal(pbPacketOut)
+		logger.Debugw("handlePacketOut sending",
+			log.Fields{
+				"device-id":  ofc.DeviceID,
+				"packet-out": encoded})
+	}
+
+	// Hand the message to whoever drains the packet-out channel.
+	ofc.PacketOutChannel <- &pbPacketOut
+}
diff --git a/internal/pkg/openflow/parseGrpcReturn.go b/internal/pkg/openflow/parseGrpcReturn.go
new file mode 100644
index 0000000..f4177d1
--- /dev/null
+++ b/internal/pkg/openflow/parseGrpcReturn.go
@@ -0,0 +1,216 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package openflow
+
+import (
+ "encoding/json"
+ "github.com/donNewtonAlpha/goloxi"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-protos/v2/go/openflow_13"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+)
+
+// parseOxm converts a VOLTHA protobuf OXM basic-class match field into the
+// corresponding goloxi OXM object.
+//
+// It returns the OXM together with the size in bytes of its value; the OXM
+// header is not included in that size. When ofbField is nil, its type is not
+// handled here, or the value carried by the message is missing or is not the
+// variant expected for the declared type, (nil, 0) is returned (with a
+// warning for the latter two cases) instead of panicking on a failed type
+// assertion. Callers must therefore be prepared for a nil OXM.
+func parseOxm(ofbField *openflow_13.OfpOxmOfbField, DeviceID string) (goloxi.IOxm, uint16) {
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(ofbField)
+		logger.Debugw("parseOxm called",
+			log.Fields{
+				"device-id": DeviceID,
+				"ofbField":  js})
+	}
+	if ofbField == nil {
+		return nil, 0
+	}
+
+	value := ofbField.GetValue()
+	switch ofbField.Type {
+	case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT:
+		if v, ok := value.(*openflow_13.OfpOxmOfbField_Port); ok && v != nil {
+			oxm := ofp.NewOxmInPort()
+			oxm.Value = ofp.Port(v.Port)
+			return oxm, 4
+		}
+	case voltha.OxmOfbFieldTypes_OFPXMT_OFB_ETH_TYPE:
+		if v, ok := value.(*openflow_13.OfpOxmOfbField_EthType); ok && v != nil {
+			oxm := ofp.NewOxmEthType()
+			oxm.Value = ofp.EthernetType(v.EthType)
+			return oxm, 2
+		}
+	case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IN_PHY_PORT:
+		if v, ok := value.(*openflow_13.OfpOxmOfbField_PhysicalPort); ok && v != nil {
+			oxm := ofp.NewOxmInPhyPort()
+			oxm.Value = ofp.Port(v.PhysicalPort)
+			return oxm, 4
+		}
+	case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IP_PROTO:
+		if v, ok := value.(*openflow_13.OfpOxmOfbField_IpProto); ok && v != nil {
+			oxm := ofp.NewOxmIpProto()
+			oxm.Value = ofp.IpPrototype(v.IpProto)
+			return oxm, 1
+		}
+	case voltha.OxmOfbFieldTypes_OFPXMT_OFB_UDP_SRC:
+		if v, ok := value.(*openflow_13.OfpOxmOfbField_UdpSrc); ok && v != nil {
+			oxm := ofp.NewOxmUdpSrc()
+			oxm.Value = uint16(v.UdpSrc)
+			return oxm, 2
+		}
+	case voltha.OxmOfbFieldTypes_OFPXMT_OFB_UDP_DST:
+		if v, ok := value.(*openflow_13.OfpOxmOfbField_UdpDst); ok && v != nil {
+			oxm := ofp.NewOxmUdpDst()
+			oxm.Value = uint16(v.UdpDst)
+			return oxm, 2
+		}
+	case voltha.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID:
+		// A VLAN match without a value is legal here and maps to 0; when a
+		// value is present 0x1000 is OR-ed in (presumably the OpenFlow
+		// OFPVID_PRESENT bit - confirm against the spec).
+		oxm := ofp.NewOxmVlanVid()
+		if v, ok := value.(*openflow_13.OfpOxmOfbField_VlanVid); ok && v != nil {
+			oxm.Value = uint16(v.VlanVid) | 0x1000
+		} else {
+			oxm.Value = uint16(0)
+		}
+		return oxm, 2
+	case voltha.OxmOfbFieldTypes_OFPXMT_OFB_METADATA:
+		if v, ok := value.(*openflow_13.OfpOxmOfbField_TableMetadata); ok && v != nil {
+			oxm := ofp.NewOxmMetadata()
+			oxm.Value = v.TableMetadata
+			return oxm, 8
+		}
+	}
+
+	// Reached for field types that are not handled above and for handled
+	// types whose value is missing or of an unexpected variant.
+	if logger.V(log.WarnLevel) {
+		js, _ := json.Marshal(ofbField)
+		logger.Warnw("ParseOXM Unhandled OxmField",
+			log.Fields{
+				"device-id": DeviceID,
+				"OfbField":  js})
+	}
+	return nil, 0
+}
+
+// parseInstructions converts a VOLTHA protobuf flow instruction into the
+// corresponding goloxi instruction.
+//
+// It returns the instruction and its length in bytes. Write-metadata, meter,
+// goto-table and apply-actions instructions are supported; any other type
+// yields (nil, 0). The instruction payload is read through the generated
+// nil-safe getters, so a message whose payload does not match its declared
+// type produces zero-valued fields rather than a panic.
+func parseInstructions(ofpInstruction *openflow_13.OfpInstruction, DeviceID string) (ofp.IInstruction, uint16) {
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(ofpInstruction)
+		logger.Debugw("parseInstructions called",
+			log.Fields{
+				"device-id":   DeviceID,
+				"Instruction": js})
+	}
+
+	switch ofpInstruction.Type {
+	case ofp.OFPITWriteMetadata:
+		instruction := ofp.NewInstructionWriteMetadata()
+		instruction.Len = 24
+		instruction.Metadata = uint64(ofpInstruction.GetWriteMetadata().GetMetadata())
+		return instruction, 24
+	case ofp.OFPITMeter:
+		instruction := ofp.NewInstructionMeter()
+		instruction.Len = 8
+		instruction.MeterId = ofpInstruction.GetMeter().GetMeterId()
+		return instruction, 8
+	case ofp.OFPITGotoTable:
+		instruction := ofp.NewInstructionGotoTable()
+		instruction.Len = 8
+		instruction.TableId = uint8(ofpInstruction.GetGotoTable().GetTableId())
+		return instruction, 8
+	case ofp.OFPITApplyActions:
+		instruction := ofp.NewInstructionApplyActions()
+		// 8 bytes of instruction header, plus the size of every action.
+		instructionSize := uint16(8)
+		var actions []goloxi.IAction
+		for _, ofpAction := range ofpInstruction.GetActions().GetActions() {
+			action, actionSize := parseAction(ofpAction, DeviceID)
+			if action == nil {
+				// parseAction already logged the unsupported action; a nil
+				// entry must not end up in the action list.
+				continue
+			}
+			actions = append(actions, action)
+			instructionSize += actionSize
+		}
+		instruction.Actions = actions
+		instruction.SetLen(instructionSize)
+		if logger.V(log.DebugLevel) {
+			js, _ := json.Marshal(instruction)
+			logger.Debugw("parseInstructions returning",
+				log.Fields{
+					"device-id":          DeviceID,
+					"size":               instructionSize,
+					"parsed-instruction": js})
+		}
+		return instruction, instructionSize
+	}
+
+	// Unsupported instruction type.
+	return nil, 0
+}
+
+// parseAction converts a VOLTHA protobuf action into the corresponding goloxi
+// action.
+//
+// It returns the action and its length in bytes. Output, push-VLAN, pop-VLAN
+// and set-field actions are supported; any other type is logged as a warning
+// and yields (nil, 0). Action payloads are read through the generated
+// nil-safe getters so that an action whose payload is absent does not cause a
+// nil-pointer dereference.
+func parseAction(ofpAction *openflow_13.OfpAction, DeviceID string) (goloxi.IAction, uint16) {
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(ofpAction)
+		logger.Debugw("parseAction called",
+			log.Fields{
+				"device-id": DeviceID,
+				"action":    js})
+	}
+	switch ofpAction.Type {
+	case openflow_13.OfpActionType_OFPAT_OUTPUT:
+		ofpOutputAction := ofpAction.GetOutput()
+		outputAction := ofp.NewActionOutput()
+		outputAction.Port = ofp.Port(ofpOutputAction.GetPort())
+		outputAction.MaxLen = uint16(ofpOutputAction.GetMaxLen())
+		outputAction.Len = 16
+		return outputAction, 16
+	case openflow_13.OfpActionType_OFPAT_PUSH_VLAN:
+		ofpPushVlanAction := ofp.NewActionPushVlan()
+		ofpPushVlanAction.Ethertype = uint16(ofpAction.GetPush().GetEthertype())
+		ofpPushVlanAction.Len = 8
+		return ofpPushVlanAction, 8
+	case openflow_13.OfpActionType_OFPAT_POP_VLAN:
+		ofpPopVlanAction := ofp.NewActionPopVlan()
+		ofpPopVlanAction.Len = 8
+		return ofpPopVlanAction, 8
+	case openflow_13.OfpActionType_OFPAT_SET_FIELD:
+		ofpActionSetField := ofpAction.GetSetField()
+		setFieldAction := ofp.NewActionSetField()
+
+		// The OXM size is ignored: the action length is fixed at 16 here.
+		// NOTE(review): parseOxm may return nil for unsupported fields, in
+		// which case Field is left nil - confirm the serializer copes.
+		iOxm, _ := parseOxm(ofpActionSetField.GetField().GetOfbField(), DeviceID)
+		setFieldAction.Field = iOxm
+		setFieldAction.Len = 16
+		return setFieldAction, 16
+	default:
+		if logger.V(log.WarnLevel) {
+			js, _ := json.Marshal(ofpAction)
+			logger.Warnw("parseAction unknow action",
+				log.Fields{
+					"device-id": DeviceID,
+					"action":    js})
+		}
+	}
+	return nil, 0
+}
+
+// parsePortStats builds an OpenFlow port-stats entry from a VOLTHA logical
+// port. The port number is taken from the port description and all counters
+// from the port statistics. Both are read through nil-safe getters, so a port
+// without description or statistics yields zero values instead of a panic.
+func parsePortStats(port *voltha.LogicalPort) *ofp.PortStatsEntry {
+	stats := port.GetOfpPortStats()
+	var entry ofp.PortStatsEntry
+	entry.SetPortNo(ofp.Port(port.GetOfpPort().GetPortNo()))
+	entry.SetRxPackets(stats.GetRxPackets())
+	entry.SetTxPackets(stats.GetTxPackets())
+	entry.SetRxBytes(stats.GetRxBytes())
+	entry.SetTxBytes(stats.GetTxBytes())
+	entry.SetRxDropped(stats.GetRxDropped())
+	entry.SetTxDropped(stats.GetTxDropped())
+	entry.SetRxErrors(stats.GetRxErrors())
+	entry.SetTxErrors(stats.GetTxErrors())
+	entry.SetRxFrameErr(stats.GetRxFrameErr())
+	entry.SetRxOverErr(stats.GetRxOverErr())
+	entry.SetRxCrcErr(stats.GetRxCrcErr())
+	entry.SetCollisions(stats.GetCollisions())
+	entry.SetDurationSec(stats.GetDurationSec())
+	entry.SetDurationNsec(stats.GetDurationNsec())
+	return &entry
+}
diff --git a/internal/pkg/openflow/role.go b/internal/pkg/openflow/role.go
new file mode 100644
index 0000000..3b7bf2f
--- /dev/null
+++ b/internal/pkg/openflow/role.go
@@ -0,0 +1,39 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package openflow
+
+import (
+ "encoding/json"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+)
+
+// handleRoleRequest answers a controller role request by echoing the
+// requested role and generation id back in a role reply. No role arbitration
+// is performed here. A failure to send the reply is logged, since this
+// handler has no error return through which to report it.
+func (ofc *OFClient) handleRoleRequest(request *ofp.RoleRequest) {
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(request)
+		logger.Debugw("handleRoleRequest called",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"request":   js})
+	}
+	reply := ofp.NewRoleReply()
+	reply.SetXid(request.GetXid())
+	reply.SetVersion(request.GetVersion())
+	reply.SetRole(request.GetRole())
+	reply.SetGenerationId(request.GetGenerationId())
+	if err := ofc.SendMessage(reply); err != nil {
+		logger.Errorw("handleRoleRequest failed to send reply",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"error":     err})
+	}
+}
diff --git a/internal/pkg/openflow/setConfig.go b/internal/pkg/openflow/setConfig.go
new file mode 100644
index 0000000..d1c52ac
--- /dev/null
+++ b/internal/pkg/openflow/setConfig.go
@@ -0,0 +1,34 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package openflow
+
+import (
+ "encoding/json"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+)
+
+// handleSetConfig receives a set-config message from the controller. The
+// request is only traced at debug level; nothing is applied or answered.
+func (ofc *OFClient) handleSetConfig(request *ofp.SetConfig) {
+	if !logger.V(log.DebugLevel) {
+		return
+	}
+	encoded, _ := json.Marshal(request)
+	fields := log.Fields{
+		"device-id": ofc.DeviceID,
+		"request":   encoded,
+	}
+	logger.Debugw("handleSetConfig called", fields)
+}
diff --git a/internal/pkg/openflow/stats.go b/internal/pkg/openflow/stats.go
new file mode 100644
index 0000000..68d2625
--- /dev/null
+++ b/internal/pkg/openflow/stats.go
@@ -0,0 +1,619 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package openflow
+
+import (
+ "context"
+ "encoding/json"
+ "github.com/donNewtonAlpha/goloxi"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-protos/v2/go/common"
+ "github.com/opencord/voltha-protos/v2/go/openflow_13"
+ "net"
+ "unsafe"
+)
+
+// handleStatsRequest dispatches an OpenFlow stats (multipart) request to the
+// handler for its stat type and sends the resulting reply.
+//
+// request must be the concrete request type that corresponds to statType; the
+// type assertions below panic otherwise. The returned error is either the one
+// produced by the type-specific handler or the one returned by SendMessage.
+// Stat types that are not listed are ignored and nil is returned.
+func (ofc *OFClient) handleStatsRequest(request ofp.IHeader, statType uint16) error {
+	if logger.V(log.DebugLevel) {
+		js, _ := json.Marshal(request)
+		logger.Debugw("handleStatsRequest called",
+			log.Fields{
+				"device-id": ofc.DeviceID,
+				"stat-type": statType,
+				"request":   js})
+	}
+
+	switch statType {
+	case ofp.OFPSTDesc:
+		statsReq := request.(*ofp.DescStatsRequest)
+		response, err := ofc.handleDescStatsRequest(statsReq)
+		if err != nil {
+			return err
+		}
+		ofc.logStatsExchange(statsReq, response)
+		return ofc.SendMessage(response)
+	case ofp.OFPSTFlow:
+		statsReq := request.(*ofp.FlowStatsRequest)
+		response, err := ofc.handleFlowStatsRequest(statsReq)
+		if err != nil {
+			return err
+		}
+		// NOTE(review): unsafe.Sizeof yields the in-memory size of the Go
+		// struct, not the encoded length of the reply - confirm that the
+		// serializer overwrites Length before this is relied upon.
+		response.Length = uint16(unsafe.Sizeof(*response))
+		ofc.logStatsExchange(statsReq, response)
+		return ofc.SendMessage(response)
+	case ofp.OFPSTAggregate:
+		statsReq := request.(*ofp.AggregateStatsRequest)
+		response, err := ofc.handleAggregateStatsRequest(statsReq)
+		if err != nil {
+			return err
+		}
+		ofc.logStatsExchange(statsReq, response)
+		return ofc.SendMessage(response)
+	case ofp.OFPSTTable:
+		statsReq := request.(*ofp.TableStatsRequest)
+		response, err := ofc.handleTableStatsRequest(statsReq)
+		if err != nil {
+			return err
+		}
+		ofc.logStatsExchange(statsReq, response)
+		return ofc.SendMessage(response)
+	case ofp.OFPSTPort:
+		statsReq := request.(*ofp.PortStatsRequest)
+		response, err := ofc.handlePortStatsRequest(statsReq)
+		if err != nil {
+			return err
+		}
+		ofc.logStatsExchange(statsReq, response)
+		return ofc.SendMessage(response)
+	case ofp.OFPSTQueue:
+		statsReq := request.(*ofp.QueueStatsRequest)
+		response, err := ofc.handleQueueStatsRequest(statsReq)
+		if err != nil {
+			return err
+		}
+		ofc.logStatsExchange(statsReq, response)
+		return ofc.SendMessage(response)
+	case ofp.OFPSTGroup:
+		statsReq := request.(*ofp.GroupStatsRequest)
+		response, err := ofc.handleGroupStatsRequest(statsReq)
+		if err != nil {
+			return err
+		}
+		ofc.logStatsExchange(statsReq, response)
+		// Propagate the send error like every other case does.
+		return ofc.SendMessage(response)
+	case ofp.OFPSTGroupDesc:
+		statsReq := request.(*ofp.GroupDescStatsRequest)
+		response, err := ofc.handleGroupStatsDescRequest(statsReq)
+		if err != nil {
+			return err
+		}
+		ofc.logStatsExchange(statsReq, response)
+		return ofc.SendMessage(response)
+	case ofp.OFPSTGroupFeatures:
+		statsReq := request.(*ofp.GroupFeaturesStatsRequest)
+		response, err := ofc.handleGroupFeatureStatsRequest(statsReq)
+		if err != nil {
+			return err
+		}
+		ofc.logStatsExchange(statsReq, response)
+		return ofc.SendMessage(response)
+	case ofp.OFPSTMeter:
+		statsReq := request.(*ofp.MeterStatsRequest)
+		response, err := ofc.handleMeterStatsRequest(statsReq)
+		if err != nil {
+			return err
+		}
+		ofc.logStatsExchange(statsReq, response)
+		return ofc.SendMessage(response)
+	case ofp.OFPSTMeterConfig:
+		statsReq := request.(*ofp.MeterConfigStatsRequest)
+		response, err := ofc.handleMeterConfigStatsRequest(statsReq)
+		if err != nil {
+			return err
+		}
+		ofc.logStatsExchange(statsReq, response)
+		return ofc.SendMessage(response)
+	case ofp.OFPSTMeterFeatures:
+		statsReq := request.(*ofp.MeterFeaturesStatsRequest)
+		response, err := ofc.handleMeterFeatureStatsRequest(statsReq)
+		if err != nil {
+			return err
+		}
+		ofc.logStatsExchange(statsReq, response)
+		return ofc.SendMessage(response)
+	case ofp.OFPSTTableFeatures:
+		statsReq := request.(*ofp.TableFeaturesStatsRequest)
+		response, err := ofc.handleTableFeaturesStatsRequest(statsReq)
+		if err != nil {
+			return err
+		}
+		ofc.logStatsExchange(statsReq, response)
+		return ofc.SendMessage(response)
+	case ofp.OFPSTPortDesc:
+		statsReq := request.(*ofp.PortDescStatsRequest)
+		response, err := ofc.handlePortDescStatsRequest(statsReq)
+		if err != nil {
+			return err
+		}
+		ofc.logStatsExchange(statsReq, response)
+		return ofc.SendMessage(response)
+	case ofp.OFPSTExperimenter:
+		statsReq := request.(*ofp.ExperimenterStatsRequest)
+		response, err := ofc.handleExperimenterStatsRequest(statsReq)
+		if err != nil {
+			return err
+		}
+		ofc.logStatsExchange(statsReq, response)
+		return ofc.SendMessage(response)
+	}
+	return nil
+}
+
+// logStatsExchange emits the debug-level "handle-stats-request" record that
+// pairs a stats request with the reply built for it. It does nothing unless
+// debug logging is enabled.
+func (ofc *OFClient) logStatsExchange(request, response interface{}) {
+	if !logger.V(log.DebugLevel) {
+		return
+	}
+	reqJs, _ := json.Marshal(request)
+	resJs, _ := json.Marshal(response)
+	logger.Debugw("handle-stats-request",
+		log.Fields{
+			"device-id": ofc.DeviceID,
+			"request":   reqJs,
+			"response":  resJs})
+}
+
+// handleDescStatsRequest builds the reply to a switch-description stats
+// request. The description strings come from the logical device as reported
+// by VOLTHA and are NUL-padded with PadString to the widths used below
+// (256 bytes, or 32 for the serial number).
+func (ofc *OFClient) handleDescStatsRequest(request *ofp.DescStatsRequest) (*ofp.DescStatsReply, error) {
+	reply := ofp.NewDescStatsReply()
+	reply.SetXid(request.GetXid())
+	reply.SetVersion(request.GetVersion())
+	reply.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+
+	deviceID := &common.ID{Id: ofc.DeviceID}
+	device, err := ofc.VolthaClient.GetLogicalDevice(context.Background(), deviceID)
+	if err != nil {
+		return nil, err
+	}
+
+	desc := device.GetDesc()
+	mfr := PadString(desc.GetMfrDesc(), 256)
+	reply.SetMfrDesc(mfr)
+	hw := PadString(desc.GetHwDesc(), 256)
+	reply.SetHwDesc(hw)
+	sw := PadString(desc.GetSwDesc(), 256)
+	reply.SetSwDesc(sw)
+	serial := PadString(desc.GetSerialNum(), 32)
+	reply.SetSerialNum(serial)
+	dp := PadString(desc.GetDpDesc(), 256)
+	reply.SetDpDesc(dp)
+	return reply, nil
+}
+
+// handleFlowStatsRequest builds the reply to a flow stats request from the
+// flows VOLTHA reports for this logical device.
+//
+// Every flow becomes one entry whose Length is computed by hand: 48 bytes of
+// fixed fields, the match padded to a multiple of 8 bytes, and the sizes of
+// all instructions. OXM fields that are not of the basic (OFB) class and OXM
+// fields or instructions that parseOxm / parseInstructions cannot translate
+// are left out of the entry rather than being added as nil elements.
+// An error is returned only when the VOLTHA call fails.
+func (ofc *OFClient) handleFlowStatsRequest(request *ofp.FlowStatsRequest) (*ofp.FlowStatsReply, error) {
+	response := ofp.NewFlowStatsReply()
+	response.SetXid(request.GetXid())
+	// NOTE(review): the version is hard-coded to 4 while the other stats
+	// handlers in this file echo request.GetVersion() - confirm this is
+	// intended.
+	response.SetVersion(4)
+	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+	resp, err := ofc.VolthaClient.ListLogicalDeviceFlows(context.Background(),
+		&common.ID{Id: ofc.DeviceID})
+	if err != nil {
+		return nil, err
+	}
+	var flow []*ofp.FlowStatsEntry
+	for _, item := range resp.GetItems() {
+		entry := ofp.NewFlowStatsEntry()
+		entry.SetTableId(uint8(item.GetTableId()))
+		entry.SetDurationSec(item.GetDurationSec())
+		entry.SetDurationNsec(item.GetDurationNsec())
+		entry.SetPriority(uint16(item.GetPriority()))
+		entry.SetIdleTimeout(uint16(item.GetIdleTimeout()))
+		entry.SetHardTimeout(uint16(item.GetHardTimeout()))
+		entry.SetFlags(ofp.FlowModFlags(item.GetFlags()))
+		entry.SetCookie(item.GetCookie())
+		entry.SetPacketCount(item.GetPacketCount())
+		entry.SetByteCount(item.GetByteCount())
+		entrySize := uint16(48)
+
+		match := ofp.NewMatchV3()
+		pbMatch := item.GetMatch()
+		match.SetType(uint16(pbMatch.GetType()))
+		matchSize := uint16(4) // match header
+		var fields []goloxi.IOxm
+		for _, oxmField := range pbMatch.GetOxmFields() {
+			wrapper, ok := oxmField.GetField().(*openflow_13.OfpOxmField_OfbField)
+			if !ok || wrapper == nil || wrapper.OfbField == nil {
+				logger.Warnw("handleFlowStatsRequest skipping non-OFB oxm field",
+					log.Fields{"device-id": ofc.DeviceID})
+				continue
+			}
+			iOxm, oxmSize := parseOxm(wrapper.OfbField, ofc.DeviceID)
+			if iOxm == nil {
+				// parseOxm already logged the unsupported field.
+				continue
+			}
+			fields = append(fields, iOxm)
+			if oxmSize > 0 {
+				matchSize += 4 //header for oxm
+			}
+			matchSize += oxmSize
+		}
+
+		match.OxmList = fields
+		// Length carries the unpadded size; the padding only counts towards
+		// the size of the enclosing entry.
+		match.Length = matchSize
+		//account for 8 byte alignment
+		if matchSize%8 != 0 {
+			matchSize = ((matchSize / 8) + 1) * 8
+		}
+		entrySize += matchSize
+		entry.SetMatch(*match)
+
+		var instructions []ofp.IInstruction
+		for _, ofpInstruction := range item.GetInstructions() {
+			instruction, instructionSize := parseInstructions(ofpInstruction, ofc.DeviceID)
+			if instruction == nil {
+				// Unsupported instruction type; it contributes no bytes.
+				continue
+			}
+			instructions = append(instructions, instruction)
+			entrySize += instructionSize
+		}
+		entry.Instructions = instructions
+		entry.Length = entrySize
+		flow = append(flow, entry)
+	}
+	response.SetEntries(flow)
+	return response, nil
+}
+
+// handleAggregateStatsRequest builds the reply to an aggregate stats request.
+// The reply echoes the request's version, xid and flags and always reports a
+// flow count of zero.
+func (ofc *OFClient) handleAggregateStatsRequest(request *ofp.AggregateStatsRequest) (*ofp.AggregateStatsReply, error) {
+	// TODO: fill in real values once voltha core provides them.
+	reply := ofp.NewAggregateStatsReply()
+	reply.SetVersion(request.GetVersion())
+	reply.SetXid(request.GetXid())
+	flags := ofp.StatsReplyFlags(request.GetFlags())
+	reply.SetFlags(flags)
+	reply.SetFlowCount(0)
+	return reply, nil
+}
+
+// handleGroupStatsRequest builds the reply to a group stats request. One
+// entry, including its per-bucket counters, is produced for every flow group
+// VOLTHA lists for this logical device. An error is returned only when the
+// VOLTHA call fails.
+func (ofc *OFClient) handleGroupStatsRequest(request *ofp.GroupStatsRequest) (*ofp.GroupStatsReply, error) {
+	response := ofp.NewGroupStatsReply()
+	response.SetVersion(request.GetVersion())
+	response.SetXid(request.GetXid())
+	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+
+	deviceID := &common.ID{Id: ofc.DeviceID}
+	groups, err := ofc.VolthaClient.ListLogicalDeviceFlowGroups(context.Background(), deviceID)
+	if err != nil {
+		return nil, err
+	}
+
+	var entries []*ofp.GroupStatsEntry
+	for _, group := range groups.GetItems() {
+		pbStats := group.GetStats()
+
+		entry := new(ofp.GroupStatsEntry)
+		entry.SetByteCount(pbStats.GetByteCount())
+		entry.SetPacketCount(pbStats.GetPacketCount())
+		entry.SetDurationNsec(pbStats.GetDurationNsec())
+		entry.SetDurationSec(pbStats.GetDurationSec())
+		entry.SetRefCount(pbStats.GetRefCount())
+		entry.SetGroupId(pbStats.GetGroupId())
+
+		var counters []*ofp.BucketCounter
+		for _, pbBucket := range pbStats.GetBucketStats() {
+			counter := new(ofp.BucketCounter)
+			counter.SetPacketCount(pbBucket.GetPacketCount())
+			counter.SetByteCount(pbBucket.GetByteCount())
+			counters = append(counters, counter)
+		}
+		entry.SetBucketStats(counters)
+
+		entries = append(entries, entry)
+	}
+	response.SetEntries(entries)
+	return response, nil
+}
+
+// handleGroupStatsDescRequest builds the reply to a group description stats
+// request. One entry is produced for every flow group VOLTHA lists for this
+// logical device; only the group id of each entry is filled in. An error is
+// returned only when the VOLTHA call fails.
+func (ofc *OFClient) handleGroupStatsDescRequest(request *ofp.GroupDescStatsRequest) (*ofp.GroupDescStatsReply, error) {
+	response := ofp.NewGroupDescStatsReply()
+	response.SetVersion(request.GetVersion())
+	response.SetXid(request.GetXid())
+	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+
+	deviceID := &common.ID{Id: ofc.DeviceID}
+	groups, err := ofc.VolthaClient.ListLogicalDeviceFlowGroups(context.Background(), deviceID)
+	if err != nil {
+		return nil, err
+	}
+
+	var entries []*ofp.GroupDescStatsEntry
+	for _, group := range groups.GetItems() {
+		// TODO: also populate the entry's buckets; they are not translated
+		// yet.
+		entry := new(ofp.GroupDescStatsEntry)
+		entry.SetGroupId(group.GetStats().GetGroupId())
+		entries = append(entries, entry)
+	}
+	response.SetEntries(entries)
+	return response, nil
+}
+
+// handleGroupFeatureStatsRequest builds the reply to a group features stats
+// request. The reply only echoes the request's version, xid and flags; no
+// feature data is filled in.
+func (ofc *OFClient) handleGroupFeatureStatsRequest(request *ofp.GroupFeaturesStatsRequest) (*ofp.GroupFeaturesStatsReply, error) {
+	// TODO: fill in real values once voltha core provides them.
+	reply := ofp.NewGroupFeaturesStatsReply()
+	reply.SetVersion(request.GetVersion())
+	reply.SetXid(request.GetXid())
+	flags := ofp.StatsReplyFlags(request.GetFlags())
+	reply.SetFlags(flags)
+	return reply, nil
+}
+
+// handleMeterStatsRequest builds the reply to a meter stats request from the
+// meters VOLTHA lists for this logical device.
+//
+// Each entry's Len is computed per meter as 40 bytes of fixed fields plus 16
+// bytes for every band of that meter. An error is returned only when the
+// VOLTHA call fails.
+func (ofc *OFClient) handleMeterStatsRequest(request *ofp.MeterStatsRequest) (*ofp.MeterStatsReply, error) {
+	const (
+		meterStatsFixedLen = 40 // meter stats entry without any band
+		meterBandStatsLen  = 16 // one band stats element
+	)
+
+	response := ofp.NewMeterStatsReply()
+	response.SetVersion(request.GetVersion())
+	response.SetXid(request.GetXid())
+	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+	resp, err := ofc.VolthaClient.ListLogicalDeviceMeters(context.Background(),
+		&common.ID{Id: ofc.DeviceID})
+	if err != nil {
+		return nil, err
+	}
+	var meterStats []*ofp.MeterStats
+	for _, item := range resp.Items {
+		meterStat := ofp.NewMeterStats()
+		stats := item.Stats
+		meterStat.DurationNsec = stats.DurationNsec
+		meterStat.DurationSec = stats.DurationSec
+		meterStat.ByteInCount = stats.ByteInCount
+		meterStat.FlowCount = stats.FlowCount
+		meterStat.MeterId = stats.MeterId
+		// NOTE(review): no packet-in counter is copied here - confirm
+		// whether that omission is intentional.
+
+		// The length restarts for every meter; it must not carry over the
+		// bands counted for previous meters.
+		size := uint16(meterStatsFixedLen)
+		var bandStats []*ofp.MeterBandStats
+		for _, bStat := range stats.BandStats {
+			bandStat := ofp.NewMeterBandStats()
+			bandStat.ByteBandCount = bStat.ByteBandCount
+			bandStat.PacketBandCount = bStat.PacketBandCount
+			bandStats = append(bandStats, bandStat)
+			size += meterBandStatsLen
+		}
+		meterStat.SetBandStats(bandStats)
+		meterStat.Len = size
+		meterStats = append(meterStats, meterStat)
+	}
+	response.SetEntries(meterStats)
+	return response, nil
+}
+
+// handleMeterConfigStatsRequest builds the reply to a meter config stats
+// request. The reply only echoes the request's version, xid and flags; no
+// meter configuration is filled in.
+func (ofc *OFClient) handleMeterConfigStatsRequest(request *ofp.MeterConfigStatsRequest) (*ofp.MeterConfigStatsReply, error) {
+	// TODO: fill in real values once voltha core provides them.
+	reply := ofp.NewMeterConfigStatsReply()
+	reply.SetVersion(request.GetVersion())
+	reply.SetXid(request.GetXid())
+	flags := ofp.StatsReplyFlags(request.GetFlags())
+	reply.SetFlags(flags)
+	return reply, nil
+}
+
+// handleTableFeaturesStatsRequest builds the reply to a table features stats
+// request. The reply only echoes the request's version, xid and flags; no
+// table features are filled in.
+func (ofc *OFClient) handleTableFeaturesStatsRequest(request *ofp.TableFeaturesStatsRequest) (*ofp.TableFeaturesStatsReply, error) {
+	// TODO: fill in real values once voltha core provides them.
+	reply := ofp.NewTableFeaturesStatsReply()
+	reply.SetVersion(request.GetVersion())
+	reply.SetXid(request.GetXid())
+	flags := ofp.StatsReplyFlags(request.GetFlags())
+	reply.SetFlags(flags)
+	return reply, nil
+}
+
+// handleTableStatsRequest builds the reply to a table stats request. The
+// reply only echoes the request's version, xid and flags; no table entries
+// are filled in. The returned error is always nil.
+func (ofc *OFClient) handleTableStatsRequest(request *ofp.TableStatsRequest) (*ofp.TableStatsReply, error) {
+	response := ofp.NewTableStatsReply()
+	response.SetVersion(request.GetVersion())
+	response.SetXid(request.GetXid())
+	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+	return response, nil
+}
+
+// handleQueueStatsRequest builds the reply to a queue stats request. The
+// reply only echoes the request's version, xid and flags; no queue entries
+// are filled in.
+func (ofc *OFClient) handleQueueStatsRequest(request *ofp.QueueStatsRequest) (*ofp.QueueStatsReply, error) {
+	// TODO: fill in real values once voltha core provides them.
+	reply := ofp.NewQueueStatsReply()
+	reply.SetVersion(request.GetVersion())
+	reply.SetXid(request.GetXid())
+	flags := ofp.StatsReplyFlags(request.GetFlags())
+	reply.SetFlags(flags)
+	return reply, nil
+}
+
+// handlePortStatsRequest builds the reply to a port stats request from the
+// logical ports VOLTHA lists for this device. A requested port number of
+// 0xffffffff selects every port; any other value selects the ports whose
+// statistics carry that port number. An error is returned only when the
+// VOLTHA call fails.
+func (ofc *OFClient) handlePortStatsRequest(request *ofp.PortStatsRequest) (*ofp.PortStatsReply, error) {
+	response := ofp.NewPortStatsReply()
+	response.SetXid(request.GetXid())
+	response.SetVersion(request.GetVersion())
+	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+
+	deviceID := &common.ID{Id: ofc.DeviceID}
+	ports, err := ofc.VolthaClient.ListLogicalDevicePorts(context.Background(), deviceID)
+	if err != nil {
+		return nil, err
+	}
+
+	wanted := request.GetPortNo()
+	allPorts := wanted == 0xffffffff
+	var entries []*ofp.PortStatsEntry
+	for _, port := range ports.GetItems() {
+		if !allPorts && port.GetOfpPortStats().GetPortNo() != uint32(wanted) {
+			continue
+		}
+		entries = append(entries, parsePortStats(port))
+	}
+	response.SetEntries(entries)
+	return response, nil
+}
+
+// handlePortDescStatsRequest builds the reply to a port description stats
+// request. One entry is produced for every port of the logical device as
+// reported by VOLTHA; the port name is NUL-padded to 16 bytes with PadString.
+// An error is returned only when the VOLTHA call fails.
+func (ofc *OFClient) handlePortDescStatsRequest(request *ofp.PortDescStatsRequest) (*ofp.PortDescStatsReply, error) {
+	response := ofp.NewPortDescStatsReply()
+	response.SetVersion(request.GetVersion())
+	response.SetXid(request.GetXid())
+	response.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+
+	deviceID := &common.ID{Id: ofc.DeviceID}
+	device, err := ofc.VolthaClient.GetLogicalDevice(context.Background(), deviceID)
+	if err != nil {
+		return nil, err
+	}
+
+	var entries []*ofp.PortDesc
+	for _, logicalPort := range device.GetPorts() {
+		pbPort := logicalPort.GetOfpPort()
+		desc := new(ofp.PortDesc)
+		desc.SetPortNo(ofp.Port(pbPort.GetPortNo()))
+
+		// The protobuf carries the hardware address one octet per element;
+		// narrow each element to a byte.
+		var mac net.HardwareAddr
+		for _, octet := range pbPort.GetHwAddr() {
+			mac = append(mac, byte(octet))
+		}
+		desc.SetHwAddr(mac)
+
+		desc.SetName(PadString(pbPort.GetName(), 16))
+		desc.SetConfig(ofp.PortConfig(pbPort.GetConfig()))
+		desc.SetState(ofp.PortState(pbPort.GetState()))
+		desc.SetCurr(ofp.PortFeatures(pbPort.GetCurr()))
+		desc.SetAdvertised(ofp.PortFeatures(pbPort.GetAdvertised()))
+		desc.SetSupported(ofp.PortFeatures(pbPort.GetSupported()))
+		desc.SetPeer(ofp.PortFeatures(pbPort.GetPeer()))
+		desc.SetCurrSpeed(pbPort.GetCurrSpeed())
+		desc.SetMaxSpeed(pbPort.GetMaxSpeed())
+
+		entries = append(entries, desc)
+	}
+
+	response.SetEntries(entries)
+	return response, nil
+}
+
+// handleMeterFeatureStatsRequest builds the reply to a meter features stats
+// request. The advertised features are fixed values set below (kbps
+// capability, drop band type and the maximum meter/band/color limits); no
+// call to VOLTHA is made.
+func (ofc *OFClient) handleMeterFeatureStatsRequest(request *ofp.MeterFeaturesStatsRequest) (*ofp.MeterFeaturesStatsReply, error) {
+	features := ofp.NewMeterFeatures()
+	features.MaxMeter = 0xffffffff
+	features.MaxBands = 0xff
+	features.MaxColor = 0xff
+	features.Capabilities = ofp.OFPMFKbps
+	features.BandTypes = ofp.OFPMBTDrop
+
+	reply := ofp.NewMeterFeaturesStatsReply()
+	reply.SetXid(request.GetXid())
+	reply.SetVersion(request.GetVersion())
+	reply.SetFlags(ofp.StatsReplyFlags(request.GetFlags()))
+	reply.Features = *features
+	return reply, nil
+}
+
+// handleExperimenterStatsRequest builds the reply to an experimenter stats
+// request. The reply is created for the request's experimenter id and only
+// echoes the request's version, xid and flags.
+func (ofc *OFClient) handleExperimenterStatsRequest(request *ofp.ExperimenterStatsRequest) (*ofp.ExperimenterStatsReply, error) {
+	// TODO: fill in real values once voltha core provides them.
+	experimenter := request.GetExperimenter()
+	reply := ofp.NewExperimenterStatsReply(experimenter)
+	reply.SetVersion(request.GetVersion())
+	reply.SetXid(request.GetXid())
+	flags := ofp.StatsReplyFlags(request.GetFlags())
+	reply.SetFlags(flags)
+	return reply, nil
+}
diff --git a/internal/pkg/openflow/utils.go b/internal/pkg/openflow/utils.go
new file mode 100644
index 0000000..f2d72e0
--- /dev/null
+++ b/internal/pkg/openflow/utils.go
@@ -0,0 +1,115 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package openflow
+
+import (
+ "fmt"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/opencord/voltha-protos/v2/go/openflow_13"
+ "strings"
+ "sync"
+)
+
+var (
+	// mu serialises access to xid.
+	mu sync.Mutex
+	// xid is the most recently issued transaction id; GetXid increments it
+	// before handing it out.
+	xid uint32 = 1
+)
+
+// GetXid increments the package-level transaction id counter while holding mu
+// and returns the new value. Because the counter starts at 1, the first value
+// returned is 2.
+func GetXid() uint32 {
+	mu.Lock()
+	xid++
+	next := xid
+	mu.Unlock()
+	return next
+}
+
+// PadString returns value followed by as many NUL ('\000') bytes as are
+// needed to make the result padSize bytes long. Lengths are counted in bytes,
+// not runes.
+//
+// A value that is already padSize bytes or longer (including the case of a
+// zero or negative padSize) is returned unchanged: it is never truncated.
+func PadString(value string, padSize int) string {
+	nullsNeeded := padSize - len(value)
+	if nullsNeeded <= 0 {
+		// strings.Repeat panics on a negative count, so an over-long value
+		// must not reach it.
+		return value
+	}
+	return fmt.Sprintf("%s%s", value, strings.Repeat("\x00", nullsNeeded))
+}
+
+func extractAction(action ofp.IAction) *openflow_13.OfpAction {
+ var ofpAction openflow_13.OfpAction
+ switch action.GetType() {
+ case ofp.OFPATOutput:
+ var outputAction openflow_13.OfpAction_Output
+ loxiOutputAction := action.(*ofp.ActionOutput)
+ var output openflow_13.OfpActionOutput
+ output.Port = uint32(loxiOutputAction.GetPort())
+ /*
+ var maxLen uint16
+ maxLen = loxiOutputAction.GetMaxLen()
+ output.MaxLen = uint32(maxLen)
+
+ */
+ output.MaxLen = 0
+ outputAction.Output = &output
+ ofpAction.Action = &outputAction
+ ofpAction.Type = openflow_13.OfpActionType_OFPAT_OUTPUT
+ case ofp.OFPATCopyTtlOut: //CopyTtltOut
+ case ofp.OFPATCopyTtlIn: //CopyTtlIn
+ case ofp.OFPATSetMplsTtl: //SetMplsTtl
+ case ofp.OFPATDecMplsTtl: //DecMplsTtl
+ case ofp.OFPATPushVLAN: //PushVlan
+ var pushVlan openflow_13.OfpAction_Push
+ loxiPushAction := action.(*ofp.ActionPushVlan)
+ var push openflow_13.OfpActionPush
+ push.Ethertype = uint32(loxiPushAction.Ethertype) //TODO This should be available in the fields
+ pushVlan.Push = &push
+ ofpAction.Type = openflow_13.OfpActionType_OFPAT_PUSH_VLAN
+ ofpAction.Action = &pushVlan
+ case ofp.OFPATPopVLAN: //PopVlan
+ ofpAction.Type = openflow_13.OfpActionType_OFPAT_POP_VLAN
+ case ofp.OFPATPushMpls: //PushMpls
+ case ofp.OFPATPopMpls: //PopMpls
+ case ofp.OFPATSetQueue: //SetQueue
+ case ofp.OFPATGroup: //ActionGroup
+ case ofp.OFPATSetNwTtl: //SetNwTtl
+ case ofp.OFPATDecNwTtl: //DecNwTtl
+ case ofp.OFPATSetField: //SetField
+ ofpAction.Type = openflow_13.OfpActionType_OFPAT_SET_FIELD
+ var ofpAction_SetField openflow_13.OfpAction_SetField
+ var ofpActionSetField openflow_13.OfpActionSetField
+ var ofpOxmField openflow_13.OfpOxmField
+ ofpOxmField.OxmClass = openflow_13.OfpOxmClass_OFPXMC_OPENFLOW_BASIC
+ var ofpOxmField_OfbField openflow_13.OfpOxmField_OfbField
+ var ofpOxmOfbField openflow_13.OfpOxmOfbField
+ loxiSetField := action.(*ofp.ActionSetField)
+ oxmName := loxiSetField.Field.GetOXMName()
+ switch oxmName {
+ //TODO handle set field sith other fields
+ case "vlan_vid":
+ ofpOxmOfbField.Type = openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID
+ var vlanVid openflow_13.OfpOxmOfbField_VlanVid
+ var VlanVid = loxiSetField.Field.GetOXMValue().(uint16)
+ vlanVid.VlanVid = uint32(VlanVid)
+
+ ofpOxmOfbField.Value = &vlanVid
+ }
+ ofpOxmField_OfbField.OfbField = &ofpOxmOfbField
+ ofpOxmField.Field = &ofpOxmField_OfbField
+ ofpActionSetField.Field = &ofpOxmField
+ ofpAction_SetField.SetField = &ofpActionSetField
+ ofpAction.Action = &ofpAction_SetField
+
+ case ofp.OFPATPushPbb: //PushPbb
+ case ofp.OFPATPopPbb: //PopPbb
+ case ofp.OFPATExperimenter: //Experimenter
+
+ }
+ return &ofpAction
+
+}