initial add - go fmt on grpc
Change-Id: Ib0afadd2fe5571d1456a091f94f5644458f7d3f4
diff --git a/grpc/changeEvent.go b/grpc/changeEvent.go
new file mode 100644
index 0000000..275700d
--- /dev/null
+++ b/grpc/changeEvent.go
@@ -0,0 +1,89 @@
+/*
+ Copyright 2017 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package grpc
+
+import (
+ "context"
+ "encoding/json"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/ofagent-go/openflow"
+ pb "github.com/opencord/voltha-protos/go/voltha"
+ ofp "github.com/skydive-project/goloxi/of13"
+ "google.golang.org/grpc"
+ "log"
+ "net"
+ "time"
+)
+
+func receiveChangeEvent(client pb.VolthaServiceClient) {
+ opt := grpc.EmptyCallOption{}
+ stream, err := client.ReceiveChangeEvents(context.Background(), &empty.Empty{}, opt)
+ if err != nil {
+ log.Fatalln("Unable to establish stream")
+ }
+ for {
+ changeEvent, err := stream.Recv()
+ if err != nil {
+ log.Printf("Error receiving change event %v", err)
+ }
+ js, _ := json.Marshal(changeEvent)
+
+ log.Printf("Received Change Event \n\n\n %s \n\n\n", js)
+ deviceId := changeEvent.GetId()
+ portStatus := changeEvent.GetPortStatus()
+ if portStatus == nil {
+ jsonMessage, _ := json.Marshal(changeEvent.GetEvent())
+ log.Printf("Received change event that was not port status %v", jsonMessage)
+ break
+ }
+ log.Println("???????????????????????????????????????????????????????????????????")
+ ofPortStatus := ofp.NewPortStatus()
+ ofPortStatus.SetXid(openflow.GetXid())
+ ofPortStatus.SetVersion(4)
+
+ ofReason := ofp.PortReason(portStatus.GetReason())
+ ofPortStatus.SetReason(ofReason)
+ ofDesc := ofp.NewPortDesc()
+
+ desc := portStatus.GetDesc()
+ ofDesc.SetAdvertised(ofp.PortFeatures(desc.GetAdvertised()))
+ ofDesc.SetConfig(ofp.PortConfig(0))
+ ofDesc.SetCurr(ofp.PortFeatures(desc.GetAdvertised()))
+ ofDesc.SetCurrSpeed(desc.GetCurrSpeed())
+ intArray := desc.GetHwAddr()
+ var octets []byte
+ for i := 0; i < len(intArray); i++ {
+ octets = append(octets, byte(intArray[i]))
+ }
+ addr := net.HardwareAddr(octets)
+ ofDesc.SetHwAddr(addr)
+ ofDesc.SetMaxSpeed(desc.GetMaxSpeed())
+ ofDesc.SetName(openflow.PadString(desc.GetName(), 16))
+ ofDesc.SetPeer(ofp.PortFeatures(desc.GetPeer()))
+ ofDesc.SetPortNo(ofp.Port(desc.GetPortNo()))
+ ofDesc.SetState(ofp.PortState(desc.GetState()))
+ ofDesc.SetSupported(ofp.PortFeatures(desc.GetSupported()))
+ ofPortStatus.SetDesc(*ofDesc)
+ var client = clientMap[deviceId]
+ if client == nil {
+ client = addClient(deviceId)
+
+ time.Sleep(2 * time.Second)
+ }
+ client.SendMessage(ofPortStatus)
+ }
+}
diff --git a/grpc/client.go b/grpc/client.go
new file mode 100644
index 0000000..acb2304
--- /dev/null
+++ b/grpc/client.go
@@ -0,0 +1,101 @@
+/*
+ Copyright 2017 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package grpc
+
+import (
+ "context"
+ "time"
+
+ "github.com/golang/protobuf/ptypes/empty"
+
+ "github.com/opencord/ofagent-go/openflow"
+
+ "fmt"
+ "log"
+
+ pb "github.com/opencord/voltha-protos/go/voltha"
+ "google.golang.org/grpc"
+)
+
+var client pb.VolthaServiceClient
+var clientMap map[string]*openflow.Client
+var ofAddress string
+var ofPort uint16
+
+func StartClient(endpointAddress string, endpointPort uint16, openFlowAddress string, openFlowPort uint16) {
+ ofAddress = openFlowAddress
+ ofPort = openFlowPort
+ clientMap = make(map[string]*openflow.Client)
+ var opts []grpc.DialOption
+ opts = append(opts, grpc.WithInsecure())
+ endpoint := fmt.Sprintf("%s:%d", endpointAddress, endpointPort)
+ conn, err := grpc.Dial(endpoint, opts...)
+ defer conn.Close()
+ if err != nil {
+ log.Fatalf("fail to dial: %v", err)
+ }
+ client = pb.NewVolthaServiceClient(conn)
+
+ go receivePacketIn(client)
+ go receiveChangeEvent(client)
+ go streamPacketOut(client)
+
+ openflow.SetGrpcClient(&client)
+ for {
+ log.Println("entering device refresh pull")
+ deviceList, err := client.ListLogicalDevices(context.Background(), &empty.Empty{})
+ if err != nil {
+ log.Printf("ERROR GET DEVICE LIST %v", err)
+ }
+ devices := deviceList.GetItems()
+ refreshDeviceList(devices)
+ time.Sleep(time.Minute)
+ log.Println("waking up")
+ }
+}
+func refreshDeviceList(devices []*pb.LogicalDevice) {
+ //first find the new ones
+ var toAdd []string
+ var toDel []string
+ var deviceIdMap = make(map[string]string)
+ for i := 0; i < len(devices); i++ {
+ log.Printf("Device ID %s", devices[i].GetId())
+ deviceId := devices[i].GetId()
+ deviceIdMap[deviceId] = deviceId
+ if clientMap[deviceId] == nil {
+ toAdd = append(toAdd, deviceId)
+ }
+ }
+ for key, _ := range clientMap {
+ if deviceIdMap[key] == "" {
+ toDel = append(toDel, key)
+ }
+ }
+ for i := 0; i < len(toAdd); i++ {
+ var client = addClient(toAdd[i])
+ go client.Start()
+ }
+ for i := 0; i < len(toDel); i++ {
+ clientMap[toDel[i]].End()
+ delete(clientMap, toDel[i])
+ }
+}
+func addClient(deviceId string) *openflow.Client {
+ client := openflow.NewClient(ofAddress, ofPort, deviceId, true)
+ clientMap[deviceId] = client
+ return client
+}
diff --git a/grpc/packetIn.go b/grpc/packetIn.go
new file mode 100644
index 0000000..938dabe
--- /dev/null
+++ b/grpc/packetIn.go
@@ -0,0 +1,71 @@
+/*
+ Copyright 2017 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package grpc
+
+import (
+ "context"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/ofagent-go/openflow"
+ pb "github.com/opencord/voltha-protos/go/voltha"
+ ofp "github.com/skydive-project/goloxi/of13"
+ "google.golang.org/grpc"
+ "log"
+)
+
+func receivePacketIn(client pb.VolthaServiceClient) {
+ opt := grpc.EmptyCallOption{}
+ stream, err := client.ReceivePacketsIn(context.Background(), &empty.Empty{}, opt)
+ if err != nil {
+ log.Fatalln("Unable to establish stream")
+ }
+ for {
+ packet, err := stream.Recv()
+ packetIn := packet.GetPacketIn()
+
+ if err != nil {
+ log.Fatalf("error on stream.Rec %v", err)
+ }
+ ofPacketIn := ofp.NewPacketIn()
+ ofPacketIn.SetVersion(uint8(4))
+ ofPacketIn.SetXid(openflow.GetXid())
+ ofPacketIn.SetBufferId(packetIn.GetBufferId())
+ ofPacketIn.SetCookie(packetIn.GetCookie())
+ ofPacketIn.SetData(packetIn.GetData())
+ var outMatch ofp.Match
+ inMatch := packetIn.GetMatch()
+ outMatch.SetType(uint16(inMatch.GetType()))
+ /*
+ TODO not sure if anything further is needed
+ fields := inMatch.GetOxmFields()
+ var outFields []ofp.Oxm
+ for i:=0;i< len(fields);i++{
+ field := fields[i]
+ outField := ofp.Oxm{}
+ outField.SetTypeLen(field.OxmClass.)
+ outField.SetTypeLen(field)
+ }
+ outMatch.SetOxmList(inMatch.GetOxmFields())
+ */
+
+ ofPacketIn.SetMatch(outMatch)
+ ofPacketIn.SetReason(uint8(packetIn.GetReason()))
+ ofPacketIn.SetTableId(uint8(packetIn.GetTableId()))
+ ofPacketIn.SetTotalLen(uint16(len(ofPacketIn.GetData())))
+ //ofPacketIn.
+ }
+
+}
diff --git a/grpc/packetOut.go b/grpc/packetOut.go
new file mode 100644
index 0000000..02920ae
--- /dev/null
+++ b/grpc/packetOut.go
@@ -0,0 +1,39 @@
+/*
+ Copyright 2017 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package grpc
+
+import (
+ pb "github.com/opencord/voltha-protos/go/voltha"
+ "google.golang.org/grpc"
+ "log"
+)
+import "context"
+
+func streamPacketOut(client pb.VolthaServiceClient) {
+ opt := grpc.EmptyCallOption{}
+ outClient, err := client.StreamPacketsOut(context.Background(), opt)
+ if err != nil {
+
+ log.Printf("Error creating packetout stream %v", err)
+ }
+ packetOutChannel := make(chan pb.PacketOut)
+ for {
+ ofPacketOut := <-packetOutChannel
+ outClient.Send(&ofPacketOut)
+ }
+
+}