VOL-291 : PON simulator refactoring for cluster integration
- Added ponsim build target in Makefile
- Added new option to vcore to select comm type with ponsim
- Modified all proto files to include destination go package
Amendments:
- Clean up based on review comments
- Properly close GRPC connections in ponsim_olt adapter
- Added voltha namespace to some k8s templates
Change-Id: I2f349fa7b3550a8a8cc8fc676cc896f33fbb9372
diff --git a/ponsim/v2/grpc/grpc_security.go b/ponsim/v2/grpc/grpc_security.go
new file mode 100644
index 0000000..6dab468
--- /dev/null
+++ b/ponsim/v2/grpc/grpc_security.go
@@ -0,0 +1,7 @@
+package grpc
+
+type GrpcSecurity struct {
+ KeyFile string
+ CertFile string
+ CaFile string
+}
diff --git a/ponsim/v2/grpc/grpc_server.go b/ponsim/v2/grpc/grpc_server.go
new file mode 100644
index 0000000..aaa8a4d
--- /dev/null
+++ b/ponsim/v2/grpc/grpc_server.go
@@ -0,0 +1,162 @@
+package grpc
+
+import (
+ "net"
+
+ "context"
+ "github.com/opencord/voltha/ponsim/v2/common"
+ "github.com/opencord/voltha/ponsim/v2/core"
+ "github.com/opencord/voltha/ponsim/v2/grpc/nbi"
+ "github.com/opencord/voltha/ponsim/v2/grpc/sbi"
+ "github.com/opencord/voltha/protos/go/bal"
+ "github.com/opencord/voltha/protos/go/ponsim"
+ "github.com/opencord/voltha/protos/go/voltha"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "strconv"
+ "strings"
+)
+
+type GrpcServer struct {
+ gs *grpc.Server
+ address string
+ port int32
+ secure bool
+ services []func(*grpc.Server)
+
+ *GrpcSecurity
+}
+
+/*
+Instantiate a GRPC server data structure
+*/
+func NewGrpcServer(
+ address string,
+ port int32,
+ certs *GrpcSecurity,
+ secure bool,
+) *GrpcServer {
+ server := &GrpcServer{
+ address: address,
+ port: port,
+ secure: secure,
+ GrpcSecurity: certs,
+ }
+ return server
+}
+
+/*
+Start prepares the GRPC server and starts servicing requests
+*/
+func (s *GrpcServer) Start(ctx context.Context) {
+ host := strings.Join([]string{
+ s.address,
+ strconv.Itoa(int(s.port)),
+ }, ":")
+
+ lis, err := net.Listen("tcp", host)
+ if err != nil {
+ common.Logger().Fatalf("failed to listen: %v", err)
+ }
+
+ if s.secure {
+ creds, err := credentials.NewServerTLSFromFile(s.CertFile, s.KeyFile)
+ if err != nil {
+ common.Logger().Fatalf("could not load TLS keys: %s", err)
+ }
+ s.gs = grpc.NewServer(grpc.Creds(creds))
+
+ } else {
+ common.Logger().Println("In DEFAULT\n")
+ s.gs = grpc.NewServer()
+ }
+
+ // Register all required services
+ for _, service := range s.services {
+ service(s.gs)
+ }
+
+ if err := s.gs.Serve(lis); err != nil {
+ common.Logger().Fatalf("failed to serve: %v\n", err)
+ }
+}
+
+/*
+Stop servicing GRPC requests
+*/
+func (s *GrpcServer) Stop() {
+ s.gs.Stop()
+}
+
+/*
+AddService appends a generic service request function
+*/
+func (s *GrpcServer) AddService(
+ registerFunction func(*grpc.Server, interface{}),
+ handler interface{},
+) {
+ s.services = append(s.services, func(gs *grpc.Server) { registerFunction(gs, handler) })
+}
+
+/*
+AddPonSimService appends service request functions for PonSim devices
+*/
+func (s *GrpcServer) AddPonSimService(device core.PonSimInterface) {
+ s.services = append(
+ s.services,
+ func(gs *grpc.Server) {
+ voltha.RegisterPonSimServer(gs, nbi.NewPonSimHandler(device))
+ },
+ )
+}
+
+/*
+AddCommonService appends service request functions common to all PonSim devices
+*/
+func (s *GrpcServer) AddCommonService(device core.PonSimInterface) {
+ s.services = append(
+ s.services,
+ func(gs *grpc.Server) {
+ ponsim.RegisterPonSimCommonServer(gs, sbi.NewPonSimCommonHandler(device))
+ },
+ )
+}
+
+/*
+AddOltService appends service request functions specific to OLT devices
+*/
+func (s *GrpcServer) AddOltService(device core.PonSimInterface) {
+ s.services = append(
+ s.services,
+ func(gs *grpc.Server) {
+ ponsim.RegisterPonSimOltServer(
+ gs,
+ sbi.NewPonSimOltHandler(device.(*core.PonSimOltDevice)),
+ )
+ },
+ )
+}
+
+/*
+AddXPonService appends service request functions specific to XPonSim
+*/
+func (s *GrpcServer) AddXPonService() {
+ s.services = append(
+ s.services,
+ func(gs *grpc.Server) {
+ voltha.RegisterXPonSimServer(gs, nbi.NewXPonSimHandler())
+ },
+ )
+}
+
+/*
+AddBalService appends service request functions specific to BAL
+*/
+func (s *GrpcServer) AddBalService() {
+ s.services = append(
+ s.services,
+ func(gs *grpc.Server) {
+ bal.RegisterBalServer(gs, nbi.NewBalHandler())
+ },
+ )
+}
diff --git a/ponsim/v2/grpc/nbi/bal_handler.go b/ponsim/v2/grpc/nbi/bal_handler.go
new file mode 100644
index 0000000..5cbafb3
--- /dev/null
+++ b/ponsim/v2/grpc/nbi/bal_handler.go
@@ -0,0 +1,82 @@
+package nbi
+
+import (
+ "context"
+ "github.com/opencord/voltha/ponsim/v2/common"
+ "github.com/opencord/voltha/protos/go/bal"
+)
+
+// TODO: fix BAL function parameters and returns
+
+type BalHandler struct {
+}
+
+func NewBalHandler() *BalHandler {
+ var handler *BalHandler
+ handler = &BalHandler{}
+ return handler
+}
+
+func (handler *BalHandler) BalApiInit(
+ ctx context.Context,
+ request *bal.BalInit,
+) (*bal.BalErr, error) {
+ common.Logger().Info("BalApiInit Called", ctx, request)
+ return &bal.BalErr{Err: bal.BalErrno_BAL_ERR_OK}, nil
+}
+
+func (handler *BalHandler) BalApiFinish(
+ ctx context.Context,
+ request *bal.BalCfg,
+) (*bal.BalErr, error) {
+ common.Logger().Info("BalApiFinish Called", ctx, request)
+ return &bal.BalErr{Err: bal.BalErrno_BAL_ERR_OK}, nil
+}
+
+func (handler *BalHandler) BalCfgSet(
+ ctx context.Context,
+ request *bal.BalCfg,
+) (*bal.BalErr, error) {
+ common.Logger().Info("BalCfgSet Called", ctx, request)
+ return &bal.BalErr{Err: bal.BalErrno_BAL_ERR_OK}, nil
+}
+
+func (handler *BalHandler) BalCfgClear(
+ ctx context.Context,
+ request *bal.BalKey,
+) (*bal.BalErr, error) {
+ common.Logger().Info("BalCfgClear Called", ctx, request)
+ return &bal.BalErr{Err: bal.BalErrno_BAL_ERR_OK}, nil
+}
+
+func (handler *BalHandler) BalCfgGet(
+ ctx context.Context,
+ request *bal.BalKey,
+) (*bal.BalCfg, error) {
+ common.Logger().Info("BalCfgGet Called", ctx, request)
+ return &bal.BalCfg{}, nil
+}
+
+func (handler *BalHandler) BalApiReboot(
+ ctx context.Context,
+ request *bal.BalReboot,
+) (*bal.BalErr, error) {
+ common.Logger().Info("BalApiReboot Called", ctx, request)
+ return &bal.BalErr{Err: bal.BalErrno_BAL_ERR_OK}, nil
+}
+
+func (handler *BalHandler) BalApiHeartbeat(
+ ctx context.Context,
+ request *bal.BalHeartbeat,
+) (*bal.BalRebootState, error) {
+ common.Logger().Info("BalApiHeartbeat Called", ctx, request)
+ return &bal.BalRebootState{}, nil
+}
+
+func (handler *BalHandler) BalCfgStatGet(
+ ctx context.Context,
+ request *bal.BalInterfaceKey,
+) (*bal.BalInterfaceStat, error) {
+ common.Logger().Info("BalCfgStatGet Called", ctx, request)
+ return &bal.BalInterfaceStat{}, nil
+}
diff --git a/ponsim/v2/grpc/nbi/ponsim_handler.go b/ponsim/v2/grpc/nbi/ponsim_handler.go
new file mode 100644
index 0000000..763f4bc
--- /dev/null
+++ b/ponsim/v2/grpc/nbi/ponsim_handler.go
@@ -0,0 +1,327 @@
+package nbi
+
+import (
+ "context"
+ "crypto/tls"
+ "errors"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/google/gopacket"
+ "github.com/google/gopacket/layers"
+ "github.com/opencord/voltha/ponsim/v2/common"
+ "github.com/opencord/voltha/ponsim/v2/core"
+ "github.com/opencord/voltha/protos/go/voltha"
+ "github.com/sirupsen/logrus"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "strconv"
+ "strings"
+)
+
+// TODO: Cleanup GRPC security config
+// TODO: Pass-in the certificate information as a structure parameter
+
+type PonSimHandler struct {
+ device core.PonSimInterface
+}
+
+/*
+NewPonSimHandler instantiates a handler for a PonSim device
+*/
+func NewPonSimHandler(device core.PonSimInterface) *PonSimHandler {
+ var handler *PonSimHandler
+ handler = &PonSimHandler{device: device}
+ return handler
+}
+
+/*
+SendFrame handles and forwards EGRESS packets (i.e. VOLTHA to OLT)
+*/
+func (handler *PonSimHandler) SendFrame(ctx context.Context, data *voltha.PonSimFrame) (*empty.Empty, error) {
+ frame := gopacket.NewPacket(data.Payload, layers.LayerTypeEthernet, gopacket.Default)
+
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "frame": frame.Dump(),
+ }).Info("Constructed frame")
+
+ handler.device.Forward(context.Background(), 2, frame)
+
+ out := new(empty.Empty)
+ return out, nil
+}
+
+/*
+ReceiveFrames handles a stream of INGRESS packets (i.e. OLT to VOLTHA)
+*/
+func (handler *PonSimHandler) ReceiveFrames(empty *empty.Empty, stream voltha.PonSim_ReceiveFramesServer) error {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ }).Info("start-receiving-frames")
+
+ if _, ok := (handler.device).(*core.PonSimOltDevice); ok {
+ var data []byte
+ var ok bool
+
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "device": (handler.device).(*core.PonSimOltDevice),
+ }).Info("receiving-frames-from-olt-device")
+
+ for {
+ select {
+ case data, ok = <-(handler.device).(*core.PonSimOltDevice).GetOutgoing():
+ if ok {
+ frame := gopacket.NewPacket(data, layers.LayerTypeEthernet, gopacket.Default)
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "frame": frame,
+ }).Info("Received incoming data")
+
+ frameBytes := &voltha.PonSimFrame{Id: handler.device.GetAddress(), Payload: data}
+ if err := stream.Send(frameBytes); err != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "frame": frame,
+ "error": err,
+ }).Error("Failed to send incoming data")
+ return err
+ }
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "frame": frame,
+ }).Info("Sent incoming data")
+
+ } else {
+ return errors.New("incoming data channel has closed")
+ }
+ }
+ }
+
+ } else {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ }).Error("Not handling an OLT device")
+ }
+
+ return nil
+}
+
+/*
+GetDeviceInfo returns information of a PonSim device (OLT or ONU)
+*/
+func (handler *PonSimHandler) GetDeviceInfo(
+ ctx context.Context,
+ empty *empty.Empty,
+) (*voltha.PonSimDeviceInfo, error) {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ }).Info("Getting device information")
+
+ var out *voltha.PonSimDeviceInfo
+
+ // Check which device type we're currently handling
+ if _, ok := (handler.device).(*core.PonSimOltDevice); ok {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ }).Debug("Handling OLT device")
+ keys := make([]int32, 0, len((handler.device).(*core.PonSimOltDevice).GetOnus()))
+ for k := range (handler.device).(*core.PonSimOltDevice).GetOnus() {
+ keys = append(keys, k)
+ }
+ out = &voltha.PonSimDeviceInfo{NniPort: 0, UniPorts: []int32(keys)}
+
+ } else {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ }).Debug("Handling ONU/OTHER device")
+
+ out = &voltha.PonSimDeviceInfo{}
+ }
+
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "result": out,
+ }).Info("Device information")
+
+ return out, nil
+}
+
+/*
+UpdateFlowTable populates and cleans up the flows for a PonSim device
+*/
+func (handler *PonSimHandler) UpdateFlowTable(
+ ctx context.Context,
+ table *voltha.FlowTable,
+) (*empty.Empty, error) {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "table": table,
+ }).Info("Updating flows")
+
+ if _, ok := (handler.device).(*core.PonSimOltDevice); ok {
+ if table.Port == 0 {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "port": table.Port,
+ }).Debug("Updating OLT flows")
+
+ if err := (handler.device).(*core.PonSimOltDevice).InstallFlows(ctx, table.Flows); err != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "error": err.Error(),
+ "flows": table.Flows,
+ }).Error("Problem updating flows on OLT")
+ } else {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ }).Debug("Updated OLT flows")
+ }
+
+ } else {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "port": table.Port,
+ }).Debug("Updating ONU flows")
+
+ if child, ok := (handler.device).(*core.PonSimOltDevice).GetOnus()[table.Port]; ok {
+ // TODO: make it secure
+ ta := credentials.NewTLS(&tls.Config{
+ InsecureSkipVerify: true,
+ })
+
+ host := strings.Join([]string{
+ child.Device.Address,
+ strconv.Itoa(int(child.Device.Port)),
+ }, ":")
+
+ conn, err := grpc.Dial(
+ host,
+ grpc.WithTransportCredentials(ta),
+ )
+ if err != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "error": err.Error(),
+ }).Error("GRPC Connection problem")
+ }
+ defer conn.Close()
+ client := voltha.NewPonSimClient(conn)
+
+ if _, err = client.UpdateFlowTable(ctx, table); err != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "host": host,
+ "error": err.Error(),
+ }).Error("Problem forwarding update request to ONU")
+ }
+ } else {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "port": table.Port,
+ }).Warn("Unable to find ONU")
+ }
+
+ }
+ } else if _, ok := (handler.device).(*core.PonSimOnuDevice); ok {
+ if err := (handler.device).(*core.PonSimOnuDevice).InstallFlows(ctx, table.Flows); err != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "error": err.Error(),
+ "flows": table.Flows,
+ }).Error("Problem updating flows on ONU")
+ } else {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ }).Debug("Updated ONU flows")
+ }
+ } else {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "port": table.Port,
+ }).Warn("Unknown device")
+ }
+
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "table": table,
+ }).Info("Updated flows")
+
+ out := new(empty.Empty)
+ return out, nil
+}
+
+/*
+GetStats retrieves statistics for a PonSim device
+*/
+func (handler *PonSimHandler) GetStats(
+ ctx context.Context,
+ empty *empty.Empty,
+) (*voltha.PonSimMetrics, error) {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ }).Info("Retrieving stats")
+
+ var metrics *voltha.PonSimMetrics = new(voltha.PonSimMetrics)
+
+ if olt, ok := (handler.device).(*core.PonSimOltDevice); ok {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "olt": olt,
+ }).Debug("Retrieving stats for OLT")
+
+ // Get stats for current device
+
+ // Loop through each onus to get stats from those as well?
+ // send grpc request to each onu
+ for _, child := range (handler.device).(*core.PonSimOltDevice).GetOnus() {
+ // TODO: make it secure
+ ta := credentials.NewTLS(&tls.Config{
+ InsecureSkipVerify: true,
+ })
+
+ host := strings.Join([]string{child.Device.Address, strconv.Itoa(int(child.Device.Port))}, ":")
+ conn, err := grpc.Dial(
+ host,
+ grpc.WithTransportCredentials(ta),
+ )
+ if err != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "error": err.Error(),
+ }).Error("GRPC Connection problem")
+ }
+ defer conn.Close()
+ client := voltha.NewPonSimClient(conn)
+
+ if _, err = client.GetStats(ctx, empty); err != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "host": host,
+ "error": err.Error(),
+ }).Error("Problem forwarding stats request to ONU")
+ }
+ }
+ metrics = (handler.device).(*core.PonSimOltDevice).Counter.MakeProto()
+
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "metrics": metrics,
+ }).Debug("OLT Metrics")
+
+ } else if onu, ok := (handler.device).(*core.PonSimOnuDevice); ok {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "onu": onu,
+ }).Debug("Retrieving stats for ONU")
+ } else {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ }).Warn("Unknown device")
+ }
+
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ }).Info("Retrieved stats")
+
+ return metrics, nil
+}
diff --git a/ponsim/v2/grpc/nbi/xponsim_handler.go b/ponsim/v2/grpc/nbi/xponsim_handler.go
new file mode 100644
index 0000000..fbc63eb
--- /dev/null
+++ b/ponsim/v2/grpc/nbi/xponsim_handler.go
@@ -0,0 +1,124 @@
+package nbi
+
+import (
+ "context"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha/ponsim/v2/core"
+ "github.com/opencord/voltha/protos/go/voltha"
+)
+
+type XPonSimHandler struct {
+ device *core.XPonSimDevice
+}
+
+func NewXPonSimHandler() *XPonSimHandler {
+ var handler *XPonSimHandler
+ handler = &XPonSimHandler{}
+ return handler
+}
+
+func (handler *XPonSimHandler) CreateInterface(
+ ctx context.Context,
+ config *voltha.InterfaceConfig,
+) (*empty.Empty, error) {
+ handler.device.CreateInterface(ctx, config)
+ return &empty.Empty{}, nil
+}
+func (handler *XPonSimHandler) UpdateInterface(
+ ctx context.Context,
+ config *voltha.InterfaceConfig,
+) (*empty.Empty, error) {
+ handler.device.UpdateInterface(ctx, config)
+ return &empty.Empty{}, nil
+}
+func (handler *XPonSimHandler) RemoveInterface(
+ ctx context.Context,
+ config *voltha.InterfaceConfig,
+) (*empty.Empty, error) {
+ handler.device.RemoveInterface(ctx, config)
+ return &empty.Empty{}, nil
+}
+func (handler *XPonSimHandler) CreateTcont(
+ ctx context.Context,
+ config *voltha.TcontInterfaceConfig,
+) (*empty.Empty, error) {
+ handler.device.CreateTcont(ctx, config.TcontsConfigData, config.TrafficDescriptorProfileConfigData)
+ return &empty.Empty{}, nil
+}
+func (handler *XPonSimHandler) UpdateTcont(
+ ctx context.Context,
+ config *voltha.TcontInterfaceConfig,
+) (*empty.Empty, error) {
+ handler.device.UpdateTcont(ctx, config.TcontsConfigData, config.TrafficDescriptorProfileConfigData)
+ return &empty.Empty{}, nil
+}
+func (handler *XPonSimHandler) RemoveTcont(
+ ctx context.Context,
+ config *voltha.TcontInterfaceConfig,
+) (*empty.Empty, error) {
+ handler.device.RemoveTcont(ctx, config.TcontsConfigData, config.TrafficDescriptorProfileConfigData)
+ return &empty.Empty{}, nil
+}
+func (handler *XPonSimHandler) CreateGemport(
+ ctx context.Context,
+ config *voltha.InterfaceConfig,
+) (*empty.Empty, error) {
+ handler.device.CreateGemport(ctx, config)
+ return &empty.Empty{}, nil
+}
+func (handler *XPonSimHandler) UpdateGemport(
+ ctx context.Context,
+ config *voltha.InterfaceConfig,
+) (*empty.Empty, error) {
+ handler.device.UpdateGemport(ctx, config)
+ return &empty.Empty{}, nil
+}
+func (handler *XPonSimHandler) RemoveGemport(
+ ctx context.Context,
+ config *voltha.InterfaceConfig,
+) (*empty.Empty, error) {
+ handler.device.RemoveGemport(ctx, config)
+ return &empty.Empty{}, nil
+}
+func (handler *XPonSimHandler) CreateMulticastGemport(
+ ctx context.Context,
+ config *voltha.InterfaceConfig,
+) (*empty.Empty, error) {
+ handler.device.CreateMulticastGemport(ctx, config)
+ return &empty.Empty{}, nil
+}
+func (handler *XPonSimHandler) UpdateMulticastGemport(
+ ctx context.Context,
+ config *voltha.InterfaceConfig,
+) (*empty.Empty, error) {
+ handler.device.UpdateMulticastGemport(ctx, config)
+ return &empty.Empty{}, nil
+}
+func (handler *XPonSimHandler) RemoveMulticastGemport(
+ ctx context.Context,
+ config *voltha.InterfaceConfig,
+) (*empty.Empty, error) {
+ handler.device.RemoveMulticastGemport(ctx, config)
+ return &empty.Empty{}, nil
+}
+func (handler *XPonSimHandler) CreateMulticastDistributionSet(
+ ctx context.Context,
+ config *voltha.InterfaceConfig,
+) (*empty.Empty, error) {
+ handler.device.CreateMulticastDistributionSet(ctx, config)
+ return &empty.Empty{}, nil
+}
+func (handler *XPonSimHandler) UpdateMulticastDistributionSet(
+ ctx context.Context,
+ config *voltha.InterfaceConfig,
+) (*empty.Empty, error) {
+ handler.device.UpdateMulticastDistributionSet(ctx, config)
+ return &empty.Empty{}, nil
+}
+func (handler *XPonSimHandler) RemoveMulticastDistributionSet(
+ ctx context.Context,
+ config *voltha.InterfaceConfig,
+) (*empty.Empty, error) {
+ handler.device.RemoveMulticastDistributionSet(ctx, config)
+ return &empty.Empty{}, nil
+}
diff --git a/ponsim/v2/grpc/sbi/common_handler.go b/ponsim/v2/grpc/sbi/common_handler.go
new file mode 100644
index 0000000..4c59af6
--- /dev/null
+++ b/ponsim/v2/grpc/sbi/common_handler.go
@@ -0,0 +1,72 @@
+package sbi
+
+import (
+ "context"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/google/gopacket"
+ "github.com/google/gopacket/layers"
+ "github.com/opencord/voltha/ponsim/v2/common"
+ "github.com/opencord/voltha/ponsim/v2/core"
+ "github.com/opencord/voltha/protos/go/ponsim"
+ "github.com/sirupsen/logrus"
+ "io"
+)
+
+type PonSimCommonHandler struct {
+ device core.PonSimInterface
+}
+
+/*
+NewPonSimCommonHandler instantiates a handler for common GRPC servicing methods
+*/
+func NewPonSimCommonHandler(device core.PonSimInterface) *PonSimCommonHandler {
+ var handler *PonSimCommonHandler
+
+ handler = &PonSimCommonHandler{device: device}
+
+ return handler
+}
+
+/*
+ProcessData handles and forwards streaming INGRESS/EGRESS packets
+*/
+func (h *PonSimCommonHandler) ProcessData(stream ponsim.PonSimCommon_ProcessDataServer) error {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": h,
+ }).Debug("Processing data")
+
+ var err error
+ var data *ponsim.IncomingData
+
+ for {
+
+ if data, err = stream.Recv(); err == io.EOF {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": h,
+ }).Warn("Streaming channel was closed")
+ return stream.SendAndClose(&empty.Empty{})
+ } else if err != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": h,
+ "error": err.Error(),
+ }).Warn("Error occurred with stream")
+ return err
+ }
+
+ frame := gopacket.NewPacket(data.Payload, layers.LayerTypeEthernet, gopacket.Default)
+
+ h.device.Forward(
+ context.Background(),
+ int(data.Port),
+ frame,
+ )
+ common.Logger().WithFields(logrus.Fields{
+ "handler": h,
+ "frame": frame,
+ "port": data.Port,
+ }).Debug("Retrieved and forwarded packet")
+
+ }
+
+ return nil
+}
diff --git a/ponsim/v2/grpc/sbi/olt_handler.go b/ponsim/v2/grpc/sbi/olt_handler.go
new file mode 100644
index 0000000..bb058ca
--- /dev/null
+++ b/ponsim/v2/grpc/sbi/olt_handler.go
@@ -0,0 +1,62 @@
+package sbi
+
+import (
+ "context"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha/ponsim/v2/common"
+ "github.com/opencord/voltha/ponsim/v2/core"
+ "github.com/opencord/voltha/protos/go/ponsim"
+ "github.com/sirupsen/logrus"
+)
+
+type PonSimOltHandler struct {
+ olt *core.PonSimOltDevice
+}
+
+func NewPonSimOltHandler(olt *core.PonSimOltDevice) *PonSimOltHandler {
+ var handler *PonSimOltHandler
+
+ handler = &PonSimOltHandler{olt: olt}
+
+ return handler
+}
+
+func (h *PonSimOltHandler) Register(
+ ctx context.Context,
+ request *ponsim.RegistrationRequest,
+) (*ponsim.RegistrationReply, error) {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": h,
+ }).Info("Registering device")
+
+ onu := &core.PonSimOnuDevice{
+ PonSimDevice: core.PonSimDevice{
+ Address: request.Address, Port: request.Port, //GrpcSecurity: h.olt.GrpcSecurity,
+ }}
+
+ if assignedPort, err := h.olt.AddOnu(onu); assignedPort == -1 || err != nil {
+ return &ponsim.RegistrationReply{
+ Id: uuid.New().String(),
+ Status: ponsim.RegistrationReply_FAILED,
+ StatusMessage: "Failed to register ONU",
+ ParentAddress: common.GetInterfaceIP(h.olt.ExternalIf),
+ ParentPort: h.olt.Port,
+ AssignedPort: assignedPort,
+ }, err
+ } else {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": h,
+ "onus": h.olt.GetOnus(),
+ }).Debug("ONU Added")
+
+ return &ponsim.RegistrationReply{
+ Id: uuid.New().String(),
+ Status: ponsim.RegistrationReply_REGISTERED,
+ StatusMessage: "Successfully registered ONU",
+ ParentAddress: common.GetInterfaceIP(h.olt.ExternalIf),
+ ParentPort: h.olt.Port,
+ AssignedPort: assignedPort,
+ }, nil
+
+ }
+}