blob: c69f721bbed072a6597d0302db96b3a08da31e7c [file] [log] [blame]
onkarkundargi72cfd362020-02-27 12:34:37 +05301/*
2 * Copyright 2018-present Open Networking Foundation
Scott Bakerf04b6392020-03-20 08:29:46 -07003 *
onkarkundargi72cfd362020-02-27 12:34:37 +05304 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
Scott Bakerf04b6392020-03-20 08:29:46 -07007 *
onkarkundargi72cfd362020-02-27 12:34:37 +05308 * http://www.apache.org/licenses/LICENSE-2.0
Scott Bakerf04b6392020-03-20 08:29:46 -07009 *
onkarkundargi72cfd362020-02-27 12:34:37 +053010 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Scott Bakerf04b6392020-03-20 08:29:46 -070017// Implements a client for nem-ondemend-proxy.
18package proxy
onkarkundargi72cfd362020-02-27 12:34:37 +053019
20import (
21 "context"
22 "encoding/json"
23 "fmt"
24 "github.com/Shopify/sarama"
25 "github.com/golang/protobuf/proto"
26 "github.com/google/uuid"
27 pb "github.com/opencord/voltha-protos/v3/go/voltha"
28 "google.golang.org/grpc"
29 "log"
30 "time"
31)
32
Scott Bakerf04b6392020-03-20 08:29:46 -070033/*
34 * TODO: Consider refactoring so that the kafka and grpc clients are
35 * initialized once rather than for each request that is handled.
36 *
37 */
onkarkundargi72cfd362020-02-27 12:34:37 +053038
Scott Bakerf04b6392020-03-20 08:29:46 -070039type OnDemandHandler struct {
40}
41
42func NewOnDemandHandler() *OnDemandHandler {
43 var handler OnDemandHandler
44 return &handler
45}
46
47func (handler *OnDemandHandler) HandleRequest(device_id *string) (*pb.Event, error) {
onkarkundargi72cfd362020-02-27 12:34:37 +053048 // Set up a connection to the server.
Scott Bakerf04b6392020-03-20 08:29:46 -070049 log.Printf("voltha grpc client started, address=%s ...", GlobalConfig.Server)
50 conn, err := grpc.Dial(GlobalConfig.Server, grpc.WithInsecure(), grpc.WithBlock())
onkarkundargi72cfd362020-02-27 12:34:37 +053051 if err != nil {
52 log.Printf("did not connect: %v", err)
53 return nil, err
54 }
55 defer conn.Close()
56 c := pb.NewVolthaServiceClient(conn)
57 id, err := uuid.NewUUID()
58 log.Printf("ID: %s", id.String())
59 if err != nil {
60 log.Printf("did not generate uuid: %v", err)
61 return nil, err
62 }
63 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
64 defer cancel()
65 log.Printf("Calling StartOmciTestAction")
66 r, err := c.StartOmciTestAction(ctx, &pb.OmciTestRequest{Id: *device_id, Uuid: id.String()})
67 if err != nil {
68 return nil, fmt.Errorf("start-omci-test-action-failed: %v", err)
69 }
70 log.Printf("Result: %s", r.Result)
71 djson, _ := json.Marshal(r.Result)
72 result := &pb.Event{}
73 if string(djson) == "0" {
74 config := sarama.NewConfig()
75 config.ClientID = "go-kafka-consumer"
76 config.Consumer.Return.Errors = true
Scott Bakerf04b6392020-03-20 08:29:46 -070077
78 brokers := []string{GlobalConfig.Kafka}
onkarkundargi72cfd362020-02-27 12:34:37 +053079 // Create new consumer
80 master, err := sarama.NewConsumer(brokers, config)
81 if err != nil {
82 panic(err)
83 }
84 defer func() {
85 if err := master.Close(); err != nil {
86 panic(err)
87 }
88 }()
89
90 topic := "voltha.events"
91 consumer, err := master.ConsumePartition(topic, 0, sarama.OffsetNewest)
92 if err != nil {
93 panic(err)
94 }
Scott Bakerf04b6392020-03-20 08:29:46 -070095
onkarkundargi72cfd362020-02-27 12:34:37 +053096 // Get signnal for finish
97 doneCh := make(chan struct{})
98 go func() {
Scott Bakerf04b6392020-03-20 08:29:46 -070099 // TODO: Needs a timeout in here
onkarkundargi72cfd362020-02-27 12:34:37 +0530100 for {
101 select {
102 case err := <-consumer.Errors():
103 fmt.Println(err)
104 case msg := <-consumer.Messages():
105 unpackResult := &pb.Event{}
106 var err error
107 if err = proto.Unmarshal(msg.Value, unpackResult); err != nil {
108 fmt.Println("Error while doing unmarshal", err)
109 }
110 kpi_event2 := unpackResult.GetKpiEvent2()
111 if (kpi_event2 != nil) && (kpi_event2.SliceData[0].Metadata.Uuid == id.String()) {
112 result = unpackResult
113 close(doneCh)
114 return
115 }
116 }
117 }
118 }()
119 <-doneCh
120 log.Printf("Result: %s", result)
121 }
122 return result, nil
123}