SEBA-964 Remove hardcoded hostnames and replace with config system;
Move code to canonical golang directory structure;
Update licensing;
Update README.md
Change-Id: I4b00301eef4e87e81fc731a5af20cf7b7ed68678
diff --git a/internal/pkg/proxy/config.go b/internal/pkg/proxy/config.go
new file mode 100755
index 0000000..9d1b949
--- /dev/null
+++ b/internal/pkg/proxy/config.go
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Implements global configuration for nem-ondemand-proxy
+package proxy
+
+import (
+ "fmt"
+ flags "github.com/jessevdk/go-flags"
+ "gopkg.in/yaml.v2"
+ "io/ioutil"
+ "log"
+ "os"
+ "path"
+ "path/filepath"
+ "strings"
+ "time"
+)
+
+// OutputType is a named uint8. Nothing in the visible source references it.
+// NOTE(review): looks like a leftover from code this file was adapted from;
+// it is exported, so confirm there are no outside users before removing it.
+type OutputType uint8
+
+// GrpcConfigSpec holds the settings stored under the "grpc" key of the
+// configuration file.
+type GrpcConfigSpec struct {
+ // Timeout is parsed from the "timeout" key. ProcessGlobalOptions replaces
+ // it with the --timeout command-line value when that option is given.
+ Timeout time.Duration `yaml:"timeout"`
+}
+
+// TlsConfigSpec holds the settings stored under the "tls" key of the
+// configuration file.
+// NOTE(review): the gRPC client in this package dials with
+// grpc.WithInsecure(), so these values appear to be unused for now — confirm.
+type TlsConfigSpec struct {
+ // UseTls is read from the "useTls" key.
+ UseTls bool `yaml:"useTls"`
+ // CACert is read from the "caCert" key; presumably a CA certificate path,
+ // matching the --tlscacert option — TODO confirm.
+ CACert string `yaml:"caCert"`
+ // Cert is read from the "cert" key; presumably a certificate path.
+ Cert string `yaml:"cert"`
+ // Key is read from the "key" key; presumably a private key path.
+ Key string `yaml:"key"`
+ // Verify is read from the "verify" key. It is a string here, whereas the
+ // matching --tlsverify command-line option is a bool.
+ Verify string `yaml:"verify"`
+}
+
+// GlobalConfigSpec is the top-level layout of the proxy configuration. It is
+// the type of GlobalConfig and the target of the YAML config file decode.
+type GlobalConfigSpec struct {
+ // Server is the host:port of the VOLTHA gRPC server dialed by
+ // OnDemandHandler.HandleRequest.
+ Server string `yaml:"server"`
+ // Kafka is the host:port of the Kafka broker that HandleRequest consumes
+ // events from.
+ Kafka string `yaml:"kafka"`
+ // Local is the address the proxy's own gRPC server listens on.
+ Local string `yaml:"local"`
+ // Tls holds the "tls" section.
+ Tls TlsConfigSpec `yaml:"tls"`
+ // Grpc holds the "grpc" section.
+ Grpc GrpcConfigSpec `yaml:"grpc"`
+}
+
+var (
+	// CharReplacer rewrites the two-character escape sequences `\t` and `\n`
+	// into a real tab and a real newline.
+	CharReplacer = strings.NewReplacer("\\t", "\t", "\\n", "\n")
+
+	// GlobalConfig is the effective proxy configuration. It starts out with
+	// the built-in defaults below; ProcessGlobalOptions then overlays the
+	// config file and any command-line overrides.
+	GlobalConfig = GlobalConfigSpec{
+		Server: "voltha-rw-core.voltha:50057",
+		Kafka:  "voltha-kafka.voltha:9092",
+		Local:  "0.0.0.0:50052",
+		Tls: TlsConfigSpec{
+			UseTls: false,
+		},
+		Grpc: GrpcConfigSpec{
+			Timeout: time.Minute * 5,
+		},
+	}
+
+	// GlobalCommandOptions is an initially empty two-level string map.
+	// Nothing in the visible source reads or writes it.
+	GlobalCommandOptions = make(map[string]map[string]string)
+
+	// GlobalOptions receives the parsed command-line flags; the struct tags
+	// are interpreted by go-flags (see ParseCommandLine). An empty string
+	// means "not given on the command line".
+	GlobalOptions struct {
+		Config string `short:"c" long:"config" env:"PROXYCONFIG" value-name:"FILE" default:"" description:"Location of proxy config file"`
+		Server string `short:"s" long:"server" default:"" value-name:"SERVER:PORT" description:"IP/Host and port of VOLTHA"`
+		Kafka  string `short:"k" long:"kafka" default:"" value-name:"SERVER:PORT" description:"IP/Host and port of Kafka"`
+		Local  string `short:"l" long:"local" default:"" value-name:"SERVER:PORT" description:"IP/Host and port to listen on"`
+
+		// The following are not necessarily implemented yet.
+		Debug   bool   `short:"d" long:"debug" description:"Enable debug mode"`
+		Timeout string `short:"t" long:"timeout" description:"API call timeout duration" value-name:"DURATION" default:""`
+		UseTLS  bool   `long:"tls" description:"Use TLS"`
+		CACert  string `long:"tlscacert" value-name:"CA_CERT_FILE" description:"Trust certs signed only by this CA"`
+		Cert    string `long:"tlscert" value-name:"CERT_FILE" description:"Path to TLS certificate file"`
+		Key     string `long:"tlskey" value-name:"KEY_FILE" description:"Path to TLS key file"`
+		Verify  bool   `long:"tlsverify" description:"Use TLS and verify the remote"`
+	}
+
+	// Leveled loggers: Debug and Info write to stdout, Warn and Error to
+	// stderr. Flags are 0, so no timestamp is prepended.
+	Debug = log.New(os.Stdout, "DEBUG: ", 0)
+	Info  = log.New(os.Stdout, "INFO: ", 0)
+	Warn  = log.New(os.Stderr, "WARN: ", 0)
+	Error = log.New(os.Stderr, "ERROR: ", 0)
+)
+
+// ParseCommandLine parses os.Args into GlobalOptions using go-flags.
+//
+// A help request prints the usage text to stdout and exits with status 0.
+// Any other parse error is printed to stderr and the process exits with
+// status 1. Failing to register the option group panics.
+func ParseCommandLine() {
+	progName := path.Base(os.Args[0])
+	parser := flags.NewNamedParser(progName,
+		flags.HelpFlag|flags.PassDoubleDash|flags.PassAfterNonOption)
+	if _, err := parser.AddGroup("Global Options", "", &GlobalOptions); err != nil {
+		panic(err)
+	}
+
+	_, err := parser.ParseArgs(os.Args[1:])
+	if err == nil {
+		return
+	}
+
+	// go-flags reports a help request as an error of type ErrHelp whose
+	// message is the usage text.
+	if flagsErr, ok := err.(*flags.Error); ok && flagsErr.Type == flags.ErrHelp {
+		os.Stdout.WriteString(err.Error() + "\n")
+		os.Exit(0)
+	}
+
+	fmt.Fprintf(os.Stderr, "%s: %s\n", os.Args[0], err.Error())
+	os.Exit(1)
+}
+
+// ProcessGlobalOptions builds the effective GlobalConfig.
+//
+// Order of precedence, lowest to highest: built-in defaults, the YAML config
+// file, command-line options. When no config path was given, the file
+// defaults to <home>/.nem/config. A config path that does not exist or is a
+// directory is skipped silently; a file that cannot be read or parsed, or an
+// unparsable --timeout value, terminates the process via Error.Fatalf.
+func ProcessGlobalOptions() {
+	if GlobalOptions.Config == "" {
+		homeDir, err := os.UserHomeDir()
+		if err != nil {
+			Warn.Printf("Unable to discover the user's home directory: %s", err)
+			homeDir = "~"
+		}
+		GlobalOptions.Config = filepath.Join(homeDir, ".nem", "config")
+	}
+
+	info, statErr := os.Stat(GlobalOptions.Config)
+	if statErr == nil && !info.IsDir() {
+		contents, err := ioutil.ReadFile(GlobalOptions.Config)
+		if err != nil {
+			Error.Fatalf("Unable to read the configuration file '%s': %s",
+				GlobalOptions.Config, err.Error())
+		}
+		if err := yaml.Unmarshal(contents, &GlobalConfig); err != nil {
+			Error.Fatalf("Unable to parse the configuration file '%s': %s",
+				GlobalOptions.Config, err.Error())
+		}
+	}
+
+	// Override from command line: a non-empty option replaces the value that
+	// came from the defaults or the config file.
+	overrides := []struct {
+		value  string
+		target *string
+	}{
+		{GlobalOptions.Server, &GlobalConfig.Server},
+		{GlobalOptions.Kafka, &GlobalConfig.Kafka},
+		{GlobalOptions.Local, &GlobalConfig.Local},
+	}
+	for _, o := range overrides {
+		if o.value != "" {
+			*o.target = o.value
+		}
+	}
+
+	if spec := GlobalOptions.Timeout; spec != "" {
+		parsed, err := time.ParseDuration(spec)
+		if err != nil {
+			Error.Fatalf("Unable to parse specified timeout duration '%s': %s",
+				GlobalOptions.Timeout, err.Error())
+		}
+		GlobalConfig.Grpc.Timeout = parsed
+	}
+}
+
+// ShowGlobalOptions logs the VOLTHA server, Kafka and listen addresses from
+// GlobalConfig through the standard logger, one entry per log line.
+func ShowGlobalOptions() {
+	log.Printf("Configuration:")
+	for _, entry := range []struct {
+		format string
+		value  string
+	}{
+		{" Voltha gRPC Server: %v", GlobalConfig.Server},
+		{" Kafka: %v", GlobalConfig.Kafka},
+		{" Listen Address: %v", GlobalConfig.Local},
+	} {
+		log.Printf(entry.format, entry.value)
+	}
+}
diff --git a/internal/pkg/proxy/grpc_client.go b/internal/pkg/proxy/grpc_client.go
new file mode 100644
index 0000000..c69f721
--- /dev/null
+++ b/internal/pkg/proxy/grpc_client.go
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Implements a client for nem-ondemand-proxy.
+package proxy
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "github.com/Shopify/sarama"
+ "github.com/golang/protobuf/proto"
+ "github.com/google/uuid"
+ pb "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc"
+ "log"
+ "time"
+)
+
+/*
+ * TODO: Consider refactoring so that the kafka and grpc clients are
+ * initialized once rather than for each request that is handled.
+ *
+ */
+
+// OnDemandHandler services on-demand test requests (see HandleRequest). It
+// has no fields; every request creates its own gRPC and Kafka clients.
+type OnDemandHandler struct {
+}
+
+func NewOnDemandHandler() *OnDemandHandler {
+ var handler OnDemandHandler
+ return &handler
+}
+
+// HandleRequest starts an OMCI test on the device named by device_id and
+// returns the event that reports its outcome.
+//
+// It dials the VOLTHA gRPC server at GlobalConfig.Server, generates a UUID
+// for the request and calls StartOmciTestAction. When the returned result
+// encodes as 0, it waits on the Kafka topic "voltha.events" for the event
+// whose first slice carries that UUID and returns it. For any other result an
+// empty event is returned with a nil error.
+//
+// Errors are returned rather than raised as panics: a nil device_id, dial or
+// UUID failures, a failed StartOmciTestAction call, Kafka consumer failures,
+// and a timeout while waiting for the event.
+func (handler *OnDemandHandler) HandleRequest(device_id *string) (*pb.Event, error) {
+	if device_id == nil {
+		return nil, fmt.Errorf("device id must not be nil")
+	}
+
+	// Set up a connection to the server.
+	log.Printf("voltha grpc client started, address=%s ...", GlobalConfig.Server)
+	conn, err := grpc.Dial(GlobalConfig.Server, grpc.WithInsecure(), grpc.WithBlock())
+	if err != nil {
+		log.Printf("did not connect: %v", err)
+		return nil, err
+	}
+	defer conn.Close()
+	c := pb.NewVolthaServiceClient(conn)
+
+	// Check the error before using the UUID; it is only meaningful on success.
+	id, err := uuid.NewUUID()
+	if err != nil {
+		log.Printf("did not generate uuid: %v", err)
+		return nil, err
+	}
+	testUUID := id.String()
+	log.Printf("ID: %s", testUUID)
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	defer cancel()
+	log.Printf("Calling StartOmciTestAction")
+	r, err := c.StartOmciTestAction(ctx, &pb.OmciTestRequest{Id: *device_id, Uuid: testUUID})
+	if err != nil {
+		return nil, fmt.Errorf("start-omci-test-action-failed: %v", err)
+	}
+	log.Printf("Result: %s", r.Result)
+
+	// Only a result that encodes as 0 is followed by an event worth waiting
+	// for; anything else yields an empty event.
+	djson, err := json.Marshal(r.Result)
+	if err != nil {
+		return nil, fmt.Errorf("encoding omci test result: %v", err)
+	}
+	if string(djson) != "0" {
+		return &pb.Event{}, nil
+	}
+
+	result, err := awaitOmciTestEvent(testUUID)
+	if err != nil {
+		return nil, err
+	}
+	log.Printf("Result: %s", result)
+	return result, nil
+}
+
+// awaitOmciTestEvent consumes partition 0 of the "voltha.events" topic from
+// the broker at GlobalConfig.Kafka until it sees an event whose first KPI
+// slice carries testUUID, and returns that event. The wait is bounded by
+// GlobalConfig.Grpc.Timeout; a non-positive timeout waits indefinitely.
+//
+// NOTE(review): the consumer subscribes at OffsetNewest after the test has
+// already been started, so an event published before the subscription is
+// established would be missed — confirm whether that can happen in practice.
+func awaitOmciTestEvent(testUUID string) (*pb.Event, error) {
+	config := sarama.NewConfig()
+	config.ClientID = "go-kafka-consumer"
+	config.Consumer.Return.Errors = true
+
+	// Create new consumer
+	master, err := sarama.NewConsumer([]string{GlobalConfig.Kafka}, config)
+	if err != nil {
+		return nil, fmt.Errorf("creating kafka consumer for %s: %v", GlobalConfig.Kafka, err)
+	}
+	defer func() {
+		if err := master.Close(); err != nil {
+			log.Printf("closing kafka consumer: %v", err)
+		}
+	}()
+
+	topic := "voltha.events"
+	consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetNewest)
+	if err != nil {
+		return nil, fmt.Errorf("consuming partition 0 of topic %s: %v", topic, err)
+	}
+	// Deferred after master.Close, so it runs first: the partition consumer
+	// is closed before its parent.
+	defer func() {
+		if err := consumer.Close(); err != nil {
+			log.Printf("closing kafka partition consumer: %v", err)
+		}
+	}()
+
+	// A nil channel never fires, which keeps the wait unbounded when no
+	// positive timeout is configured.
+	var deadline <-chan time.Time
+	if wait := GlobalConfig.Grpc.Timeout; wait > 0 {
+		timer := time.NewTimer(wait)
+		defer timer.Stop()
+		deadline = timer.C
+	}
+
+	for {
+		select {
+		case consumeErr, ok := <-consumer.Errors():
+			if !ok {
+				return nil, fmt.Errorf("kafka error stream closed while waiting for omci test event %s", testUUID)
+			}
+			log.Printf("kafka consumer error: %v", consumeErr)
+		case msg, ok := <-consumer.Messages():
+			if !ok {
+				return nil, fmt.Errorf("kafka message stream closed while waiting for omci test event %s", testUUID)
+			}
+			event := &pb.Event{}
+			if err := proto.Unmarshal(msg.Value, event); err != nil {
+				// Skip undecodable messages instead of inspecting a
+				// partially populated event.
+				log.Printf("Error while doing unmarshal: %v", err)
+				continue
+			}
+			kpiEvent := event.GetKpiEvent2()
+			if kpiEvent == nil || len(kpiEvent.SliceData) == 0 {
+				continue
+			}
+			// Generated getters are nil-safe, so missing metadata compares
+			// as an empty UUID instead of panicking.
+			if kpiEvent.SliceData[0].GetMetadata().GetUuid() == testUUID {
+				return event, nil
+			}
+		case <-deadline:
+			return nil, fmt.Errorf("timed out after %s waiting for omci test event %s",
+				GlobalConfig.Grpc.Timeout, testUUID)
+		}
+	}
+}
diff --git a/internal/pkg/proxy/nem_grpc_server.go b/internal/pkg/proxy/nem_grpc_server.go
new file mode 100644
index 0000000..75ef634
--- /dev/null
+++ b/internal/pkg/proxy/nem_grpc_server.go
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Implements a server for nem-ondemand-proxy
+package proxy
+
+import (
+ "context"
+ "github.com/opencord/nem-ondemand-proxy/protos/nem_ondemand_api"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/reflection"
+ "log"
+ "net"
+)
+
+// server implements the gRPC server for the proxy. It is registered as the
+// NemService implementation in NewOnDemandServer.
+type server struct {
+ // handler performs the work for each OmciTest request.
+ handler *OnDemandHandler
+ // grpcServer is the gRPC server this service is registered on.
+ grpcServer *grpc.Server
+}
+
+// NewOnDemandServer creates a gRPC server that delegates requests to handler.
+// The returned server has the NemService and the gRPC reflection service
+// registered but is not yet listening; call StartServing to accept
+// connections.
+func NewOnDemandServer(handler *OnDemandHandler) *server {
+	srv := &server{
+		handler:    handler,
+		grpcServer: grpc.NewServer(),
+	}
+
+	nem_ondemand_api.RegisterNemServiceServer(srv.grpcServer, srv)
+	reflection.Register(srv.grpcServer)
+
+	return srv
+}
+
+// StartServing listens on the TCP address GlobalConfig.Local and serves gRPC
+// requests on it, blocking until the server stops.
+//
+// It returns the listen error when the address cannot be bound, or the error
+// reported by the gRPC server's Serve call. A listen failure is returned to
+// the caller instead of terminating the process, so the caller decides how to
+// react.
+func (s *server) StartServing() error {
+	lis, err := net.Listen("tcp", GlobalConfig.Local)
+	if err != nil {
+		return err
+	}
+
+	return s.grpcServer.Serve(lis)
+}
+
+// OmciTest handles the NemService OmciTest RPC. It passes the ONU id to the
+// handler and returns the resulting event rendered as text. When the handler
+// returns an event whose text form is empty, the result is "FAILURE". A
+// handler error is logged and returned to the gRPC caller.
+func (s *server) OmciTest(ctx context.Context, id *nem_ondemand_api.OnuID) (*nem_ondemand_api.ResponseTest, error) {
+	log.Printf("Request Received from operator client: %s", id.Id)
+
+	event, err := s.handler.HandleRequest(&id.Id)
+	if err != nil {
+		log.Printf("%s", err)
+		return nil, err
+	}
+
+	text := event.String()
+	log.Printf("Result received from voltha-grpc-client: %s", text)
+	if text == "" {
+		return &nem_ondemand_api.ResponseTest{Result: "FAILURE"}, nil
+	}
+	return &nem_ondemand_api.ResponseTest{Result: text}, nil
+}