VOL-4788: Library in voltha-lib-go to support Prometheus counters in VOLTHA

Change-Id: I1a4a81f775595b89dbc2a5e4411e84034e30e1af
diff --git a/pkg/stats/README.md b/pkg/stats/README.md
new file mode 100644
index 0000000..47f94ba
--- /dev/null
+++ b/pkg/stats/README.md
@@ -0,0 +1,53 @@
+# Helm Chart configuration
+
+## Prometheus based stats server
+
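+Below is the (abbreviated) Helm chart configuration for exposing the statistics on port 8081; selectors, labels and images are omitted.
+A Prometheus server running in Kubernetes that honours the `prometheus.io/*` pod annotations will then scrape the statistics from the pods of this deployment.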
+
+### deployment.yaml
+
+``` yaml
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: test
+spec:
+  template:
+    metadata:
+      {{- with .Values.deployment.annotations }}
+      annotations: {{- toYaml . | nindent 8 }}
+      {{- end }}
+    spec:
+      containers:
+        - name: test-container
+          ports:
+          - name: promserver
+            containerPort: 8081
+```
+
+### values.yaml
+
+``` yaml
+deployment:
+  annotations:
+    prometheus.io/path: "/metrics"
+    prometheus.io/port: "8081"
+    prometheus.io/scrape: "true"
+```
+
+### service.yaml
+
+``` yaml
+apiVersion: v1
+kind: Service
+metadata:
+  name: test
+spec:
+  type: ClusterIP
+  ports:
+    - port: 8081
+      targetPort: promserver
+      protocol: TCP
+      name: promserver
+```
diff --git a/pkg/stats/common.go b/pkg/stats/common.go
new file mode 100644
index 0000000..781c7b9
--- /dev/null
+++ b/pkg/stats/common.go
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package stats
+
+import (
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+)
+
+var logger log.CLogger // package logger, assigned by init below
+
+// init registers this package with the voltha log library so that its log level can be modified at run time.
+func init() {
+	l, err := log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
+	if err != nil {
+		panic(err)
+	}
+	logger = l
+}
diff --git a/pkg/stats/manager.go b/pkg/stats/manager.go
new file mode 100644
index 0000000..fdc8cc0
--- /dev/null
+++ b/pkg/stats/manager.go
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package stats
+
+import (
+	"context"
+	"time"
+)
+
+// CollectorName identifies the VOLTHA component whose statistics are being collected.
+type CollectorName string
+
+const (
+	OltAdapter CollectorName = "adapter_olt"
+	OnuAdapter CollectorName = "adapter_onu"
+	VCore      CollectorName = "rw_core"
+)
+
+// String returns the collector name (PromStatsServer uses it to build the "voltha_<name>" metric
+// namespace), or "unknown" for a value that is not one of the constants above.
+func (s CollectorName) String() string {
+	switch s {
+	case OltAdapter, OnuAdapter, VCore:
+		return string(s)
+	default:
+		return "unknown"
+	}
+}
+
+// StatsManager is implemented by the statistics collectors of this package (PromStatsServer, NullStatsServer).
+type StatsManager interface {
+	// Start starts the statistics manager with name and makes the collected stats available at port p.
+	Start(ctx context.Context, p int, name CollectorName)
+	// CountForDevice increments counterName by one for device devId with serial number sn.
+	CountForDevice(devId, sn string, counterName DeviceCounter)
+	// AddForDevice adds val to counter for device devId with serial number sn.
+	AddForDevice(devId, sn string, counter DeviceCounter, val float64)
+	// CollectDurationForDevice records the time from startTime to time.Now() for device devID with serial number sn.
+	// dName is a DeviceDuration, matching the signature of PromStatsServer.CollectDurationForDevice.
+	CollectDurationForDevice(devID, sn string, dName DeviceDuration, startTime time.Time)
+	// Count increments counter by one.
+	Count(counter NonDeviceCounter)
+	// Add adds val to counter.
+	Add(counter NonDeviceCounter, val float64)
+	// CollectDuration records the time from startTime to time.Now().
+	CollectDuration(dName NonDeviceDuration, startTime time.Time)
+}
+
+// Compile-time checks that both implementations satisfy StatsManager.
+var _ StatsManager = (*PromStatsServer)(nil)
+var _ StatsManager = (*NullStatsServer)(nil)
+
+// NullStatsServer is a StatsManager whose methods all do nothing.
+type NullStatsServer struct{}
+
+func (n *NullStatsServer) Start(ctx context.Context, p int, name CollectorName)              {}
+func (n *NullStatsServer) CountForDevice(devId, sn string, counterName DeviceCounter)        {}
+func (n *NullStatsServer) AddForDevice(devId, sn string, counter DeviceCounter, val float64) {}
+func (n *NullStatsServer) CollectDurationForDevice(devID, sn string, dName DeviceDuration, startTime time.Time) {
+}
+func (n *NullStatsServer) Count(counter NonDeviceCounter)                               {}
+func (n *NullStatsServer) Add(counter NonDeviceCounter, val float64)                    {}
+func (n *NullStatsServer) CollectDuration(dName NonDeviceDuration, startTime time.Time) {}
diff --git a/pkg/stats/promserver.go b/pkg/stats/promserver.go
new file mode 100644
index 0000000..1fbfd38
--- /dev/null
+++ b/pkg/stats/promserver.go
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package stats
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"time"
+
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+const (
+	// cPrefix is joined with "_" to the collector name to form the namespace of every metric.
+	cPrefix = "voltha"
+)
+
+// PromStatsServer records statistics in Prometheus collectors and serves them over HTTP once Start
+// has been called; before that its collectors are nil and the recording methods do nothing.
+type PromStatsServer struct {
+	devCounters    *prometheus.CounterVec   // counters tied to a specific device
+	otherCounters  *prometheus.CounterVec   // counters NOT tied to a specific device
+	devDurations   *prometheus.HistogramVec // durations tied to a specific device
+	otherDurations *prometheus.HistogramVec // durations NOT tied to a specific device
+}
+
+// StatsServer is the package-level PromStatsServer instance.
+var StatsServer = PromStatsServer{}
+
+// Start creates the collectors for name and serves them on port p at "/metrics"; every metric is prefixed
+// with "voltha_(name)_". The HTTP listener runs in the background and Start returns no error (listener
+// failures are only logged); the listener is shut down when ctx is cancelled.
+func (ps *PromStatsServer) Start(ctx context.Context, p int, name CollectorName) {
+	ps.initializeCollectors(ctx, name)
+	logger.Infow(ctx, "Starting Statistics HTTP Server", log.Fields{"listeningPort": p})
+
+	// Use a dedicated mux instead of http.DefaultServeMux so that handlers registered elsewhere in the
+	// process are not exposed on this port, and "/metrics" is not registered globally.
+	mux := http.NewServeMux()
+	mux.Handle("/metrics", promhttp.Handler())
+	server := &http.Server{Addr: fmt.Sprintf(":%d", p), Handler: mux, ReadHeaderTimeout: 10 * time.Second}
+
+	go func() {
+		<-ctx.Done()
+		logger.Infof(ctx, "Shutting down the Statistics HTTP server")
+		// ctx is already cancelled here; give Shutdown its own deadline so in-flight scrapes can finish.
+		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
+		if err := server.Shutdown(shutdownCtx); err != nil {
+			logger.Errorw(ctx, "Statistics server shutting down failure", log.Fields{"error": err})
+		}
+	}()
+
+	go func() {
+		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+			logger.Errorw(ctx, "Starting Statistics HTTP server error", log.Fields{"error": err})
+		}
+	}()
+	logger.Infow(ctx, "Started Prometheus listener for statistics on port", log.Fields{"port": p})
+}
+
+func (ps *PromStatsServer) initializeCollectors(ctx context.Context, name CollectorName) {
+	logger.Infof(ctx, "Initializing statistics collector")
+
+	collectorName := cPrefix + "_" + name.String()
+
+	var (
+		// in milliseconds
+		defBuckets = []float64{2, 5, 10, 25, 50, 100, 300, 1000, 5000}
+	)
+
+	ps.devCounters = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: collectorName,
+			Name:      "device_counters",
+			Help:      "Device specific counters",
+		},
+		[]string{"device_id", "serial_no", "counter"},
+	)
+
+	ps.otherCounters = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: collectorName,
+			Name:      "counters",
+			Help:      "Non device counters",
+		},
+		[]string{"counter"},
+	)
+
+	ps.devDurations = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: collectorName,
+			Name:      "device_durations",
+			Help:      "Time taken in ms to complete a specific task for a specific device",
+			Buckets:   defBuckets,
+		},
+		[]string{"device_id", "serial_no", "duration"},
+	)
+
+	ps.otherDurations = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: collectorName,
+			Name:      "durations",
+			Help:      "Time taken in ms to complete a specific task not tied to a device",
+			Buckets:   defBuckets,
+		},
+		[]string{"duration"},
+	)
+
+	prometheus.MustRegister(ps.devCounters)
+	prometheus.MustRegister(ps.otherCounters)
+	prometheus.MustRegister(ps.devDurations)
+	prometheus.MustRegister(ps.otherDurations)
+}
+
+// CountForDevice increments counterName by one for device devId with serial number sn; no-op before Start.
+func (ps *PromStatsServer) CountForDevice(devId, sn string, counterName DeviceCounter) {
+	if vec := ps.devCounters; vec != nil {
+		vec.WithLabelValues(devId, sn, counterName.String()).Inc()
+	}
+}
+
+// AddForDevice adds val (must be >= 0; Prometheus counters panic otherwise) to counter for device devId/sn; no-op before Start.
+func (ps *PromStatsServer) AddForDevice(devId, sn string, counter DeviceCounter, val float64) {
+	if vec := ps.devCounters; vec != nil {
+		vec.WithLabelValues(devId, sn, counter.String()).Add(val)
+	}
+}
+
+// CollectDurationForDevice records the time from startTime to now, in fractional milliseconds, for device devID with serial number sn; no-op before Start.
+func (ps *PromStatsServer) CollectDurationForDevice(devID, sn string, dName DeviceDuration, startTime time.Time) {
+	if ps.devDurations != nil { // guard the vector that is actually used below
+		ms := float64(time.Since(startTime)) / float64(time.Millisecond) // keep sub-ms precision: 2.5ms must not land in le="2"
+		ps.devDurations.WithLabelValues(devID, sn, dName.String()).Observe(ms)
+	}
+}
+
+// Count increments the non-device counter by one; it does nothing before Start has run.
+func (ps *PromStatsServer) Count(counter NonDeviceCounter) {
+	if vec := ps.otherCounters; vec != nil {
+		vec.WithLabelValues(counter.String()).Inc()
+	}
+}
+
+// Add increases the non-device counter by val (must be >= 0; Prometheus counters panic otherwise); no-op before Start.
+func (ps *PromStatsServer) Add(counter NonDeviceCounter, val float64) {
+	if vec := ps.otherCounters; vec != nil {
+		vec.WithLabelValues(counter.String()).Add(val)
+	}
+}
+
+// CollectDuration records the time from startTime to now, in fractional milliseconds, for the non-device duration dName; no-op before Start.
+func (ps *PromStatsServer) CollectDuration(dName NonDeviceDuration, startTime time.Time) {
+	if ps.otherDurations != nil {
+		ms := float64(time.Since(startTime)) / float64(time.Millisecond) // keep sub-ms precision: 2.5ms must not land in le="2"
+		ps.otherDurations.WithLabelValues(dName.String()).Observe(ms)
+	}
+}
diff --git a/pkg/stats/promserver_test.go b/pkg/stats/promserver_test.go
new file mode 100644
index 0000000..5c5da7d
--- /dev/null
+++ b/pkg/stats/promserver_test.go
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package stats
+
+import (
+	"context"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// TestPromStatsServer_Start starts StatsServer, records values through each of its methods and checks
+// the resulting series on the /metrics endpoint.
+// TODO: Check how to reset the prom counters and histogram
+func TestPromStatsServer_Start(t *testing.T) {
+	serverCtx, serverCancel := context.WithCancel(context.Background())
+	defer serverCancel()
+
+	testPort := 34201
+	StatsServer.Start(serverCtx, testPort, VCore)
+
+	//give time to the prom server to start
+	time.Sleep(time.Millisecond * 300)
+
+	StatsServer.Count(NumErrorsWritingToBus)
+	StatsServer.Count(NumErrorsWritingToBus)
+
+	StatsServer.CountForDevice("dev1", "serial1", NumOnusActivated)
+	StatsServer.CountForDevice("dev1", "serial1", NumOnusActivated)
+	StatsServer.CountForDevice("dev1", "serial1", NumOnusActivated)
+
+	StatsServer.Add(NumCoreRpcErrors, 4.0)
+
+	StatsServer.AddForDevice("dev2", "serial2", NumDiscoveriesReceived, 56)
+
+	startTime := time.Now()
+	time.Sleep(100 * time.Millisecond)
+	StatsServer.CollectDurationForDevice("dev3", "sn3", OnuDiscoveryProcTime, startTime)
+	StatsServer.CollectDuration(DBWriteTime, startTime)
+
+	clientCtx, clientCancel := context.WithTimeout(context.Background(), time.Second)
+	defer clientCancel()
+
+	req, err := http.NewRequestWithContext(clientCtx, http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/metrics", testPort), nil)
+	require.NoError(t, err)
+
+	res, err := http.DefaultClient.Do(req)
+	require.NoError(t, err)
+	defer res.Body.Close()
+
+	assert.Equal(t, http.StatusOK, res.StatusCode)
+
+	bodyBytes, err := ioutil.ReadAll(res.Body)
+	require.NoError(t, err)
+	body := string(bodyBytes)
+
+	assert.Contains(t, body, `voltha_rw_core_counters{counter="bus_write_errors_total"} 2`)
+	assert.Contains(t, body, `voltha_rw_core_device_counters{counter="activated_onus_total",device_id="dev1",serial_no="serial1"} 3`)
+	assert.Contains(t, body, `voltha_rw_core_counters{counter="core_rpc_errors_total"} 4`)
+	assert.Contains(t, body, `voltha_rw_core_device_counters{counter="discoveries_received_total",device_id="dev2",serial_no="serial2"} 56`)
+	assert.Contains(t, body, `voltha_rw_core_device_durations_bucket{device_id="dev3",duration="onu_discovery_proc_time",serial_no="sn3",le="300"} 1`)
+	// The non-device histogram is checked via its _count series, which does not depend on timing.
+	assert.Contains(t, body, `voltha_rw_core_durations_count{duration="db_write_time"} 1`)
+}
diff --git a/pkg/stats/stats.go b/pkg/stats/stats.go
new file mode 100644
index 0000000..c507e55
--- /dev/null
+++ b/pkg/stats/stats.go
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package stats
+
+// DeviceCounter names a counter that is recorded per device (labelled with device id and serial number).
+type DeviceCounter string
+
+// NonDeviceCounter names a counter that is not tied to a specific device.
+type NonDeviceCounter string
+
+// NonDeviceDuration names a duration that is not tied to a specific device.
+type NonDeviceDuration string
+
+// DeviceDuration names a duration that is recorded per device (labelled with device id and serial number).
+type DeviceDuration string
+
+// Names of the individual statistics. Each value is exposed verbatim as the "counter" or "duration"
+// label of the corresponding Prometheus metric. All the time based metrics are in milliseconds.
+
+const (
+	// OLT Device stats
+	//-----------------
+	// Number of ONU Discovery messages received by the OLT adapter
+	NumDiscoveriesReceived DeviceCounter = "discoveries_received_total"
+	// Number of ONUs successfully activated by the OLT adapter
+	NumOnusActivated DeviceCounter = "activated_onus_total"
+
+	// ONU Device stats
+	//-----------------
+	// Number of times transmission retries for OMCI messages were done for a specific ONU
+	OmciCCTxRetries DeviceCounter = "omci_cc_tx_retries_total"
+	// Number of times transmission timeouts for OMCI messages happened for a specific ONU
+	OmciCCTxTimeouts DeviceCounter = "omci_cc_tx_timeouts_total"
+
+	// Other not device specific stats
+	//--------------------------------
+	// Number of times writing to the message bus failed, could be collected by adapters as well as vCore
+	NumErrorsWritingToBus NonDeviceCounter = "bus_write_errors_total"
+	// Number of times rpc calls to the vCore resulted in errors at the adapters
+	NumCoreRpcErrors NonDeviceCounter = "core_rpc_errors_total"
+	// Number of times rpc calls to the adapters resulted in errors at the vCore
+	NumAdapterRpcErrors NonDeviceCounter = "adapter_rpc_errors_total"
+
+	// OLT Device durations
+	//---------------------
+	// Time taken at the OLT adapter to successfully process a received ONU Discovery message
+	OnuDiscoveryProcTime DeviceDuration = "onu_discovery_proc_time"
+	// Time taken at the OLT adapter to successfully activate an ONU
+	OnuDiscToActivateTime DeviceDuration = "onu_discovery_to_activate_time"
+	// Time taken at the OLT adapter from the time the ONU Discovery was received to the first flow being pushed for the ONU
+	OnuDiscToFlowsPushedTime DeviceDuration = "onu_disc_to_flows_pushed_time"
+
+	// ONU Device durations
+	//---------------------
+
+	// Other not device specific durations
+	//------------------------------------
+	// Time taken to write an entry to the database, could be collected by all the three users of the database
+	DBWriteTime NonDeviceDuration = "db_write_time"
+	// Time taken to read an entry from the database, could be collected by all the three users of the database
+	DBReadTime NonDeviceDuration = "db_read_time"
+)
+
+// String returns the value used for the "counter" label, or "unknown" if s is not
+// one of the DeviceCounter constants declared above.
+func (s DeviceCounter) String() string {
+	switch s {
+	case NumDiscoveriesReceived, NumOnusActivated, OmciCCTxRetries, OmciCCTxTimeouts:
+		// Return the constant's own value so each metric name is spelled in exactly one place.
+		return string(s)
+	}
+	return "unknown"
+}
+
+// String returns the value used for the "counter" label, or "unknown" if s is not
+// one of the NonDeviceCounter constants declared above.
+func (s NonDeviceCounter) String() string {
+	switch s {
+	case NumErrorsWritingToBus, NumCoreRpcErrors, NumAdapterRpcErrors:
+		return string(s)
+	}
+	return "unknown"
+}
+
+// String returns the value used for the "duration" label, or "unknown" if s is not
+// one of the NonDeviceDuration constants declared above.
+func (s NonDeviceDuration) String() string {
+	switch s {
+	case DBWriteTime, DBReadTime:
+		return string(s)
+	}
+	return "unknown"
+}
+
+// String returns the value used for the "duration" label, or "unknown" if s is not
+// one of the DeviceDuration constants declared above.
+func (s DeviceDuration) String() string {
+	switch s {
+	case OnuDiscoveryProcTime, OnuDiscToActivateTime, OnuDiscToFlowsPushedTime:
+		return string(s)
+	}
+	return "unknown"
+}