[VOL:3643] support of some OLT device metrics over the Device Management Interface
1. The following metrics are supported:
   METRIC_FAN_SPEED
   METRIC_CPU_USAGE_PERCENTAGE
   METRIC_RAM_USAGE_PERCENTAGE
   METRIC_DISK_USAGE_PERCENTAGE
   METRIC_INNER_SURROUNDING_TEMP
2. The following DMI APIs are implemented:
   ListMetrics
   UpdateMetricsConfiguration
   GetMetric
   SetMsgBusEndpoint
   GetMsgBusEndpoint
3. Updated docs/source/DMI_Server_README.md

Change-Id: I11f988ff972b8a8682012c7aeea88ba61afb82ba
diff --git a/internal/bbsim/dmiserver/dmi_hw_mgmt.go b/internal/bbsim/dmiserver/dmi_hw_mgmt.go
old mode 100644
new mode 100755
index ba5a0c4..30c27f0
--- a/internal/bbsim/dmiserver/dmi_hw_mgmt.go
+++ b/internal/bbsim/dmiserver/dmi_hw_mgmt.go
@@ -20,6 +20,8 @@
 	"context"
 	"fmt"
 
+	"github.com/Shopify/sarama"
+
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/golang/protobuf/ptypes/timestamp"
 	"github.com/opencord/bbsim/internal/bbsim/devices"
@@ -94,11 +96,44 @@
 		components = append(components, &cage)
 	}
 
-	dms.components = components
+	// create the fans
+	numFans := 2
+	fans := make([]*dmi.Component, numFans)
+
+	for i := 0; i < numFans; i++ {
+		fans[i] = createFanComponent(i + 1)
+	}
+	components = append(components, fans...)
+
+	// Create 1 disk, 1 Processor and 1 ram
+	components = append(components, createDiskComponent(0))
+	components = append(components, createProcessorComponent(0))
+	components = append(components, createMemoryComponent(0))
+	components = append(components, createInnerSurroundingTempComponentSensor(0))
+
+	// create the root component
+	dms.root = &dmi.Component{
+		Name:         dms.deviceName,
+		Class:        0,
+		Description:  "",
+		Parent:       "",
+		ParentRelPos: 0,
+		Children:     components,
+		SerialNum:    dms.deviceSerial,
+		MfgName:      common.Config.Olt.Vendor,
+		IsFru:        false,
+		Uri: &dmi.Uri{
+			Uri: dms.ipAddress,
+		},
+		Uuid: &dmi.Uuid{
+			Uuid: dms.uuid,
+		},
+		State: &dmi.ComponentState{},
+	}
 
 	logger.Debugf("Generated UUID for the uri %s is %s", dms.ipAddress, dms.uuid)
 	response := &dmi.StartManagingDeviceResponse{
-		Status: dmi.Status_OK,
+		Status: dmi.Status_OK_STATUS,
 		DeviceUuid: &dmi.Uuid{
 			Uuid: dms.uuid,
 		},
@@ -113,6 +148,101 @@
 	return nil
 }
 
+func createFanComponent(fanIdx int) *dmi.Component {
+	fanName := fmt.Sprintf("Thermal/Fans/System Fan/%d", fanIdx)
+	fanSerial := fmt.Sprintf("bbsim-fan-serial-%d", fanIdx)
+	return &dmi.Component{
+		Name:         fanName,
+		Class:        dmi.ComponentType_COMPONENT_TYPE_FAN,
+		Description:  "bbsim-fan",
+		Parent:       "",
+		ParentRelPos: 0,
+		SerialNum:    fanSerial,
+		MfgName:      "bbsim-fan",
+		IsFru:        false,
+		Uuid: &dmi.Uuid{
+			Uuid: getUUID(fanName),
+		},
+		State: &dmi.ComponentState{},
+	}
+}
+
+func createProcessorComponent(cpuIdx int) *dmi.Component {
+	cpuName := fmt.Sprintf("Systems/1/Processors/%d", cpuIdx)
+	cpuSerial := fmt.Sprintf("bbsim-cpu-serial-%d", cpuIdx)
+	return &dmi.Component{
+		Name:         cpuName,
+		Class:        dmi.ComponentType_COMPONENT_TYPE_CPU,
+		Description:  "bbsim-cpu",
+		Parent:       "",
+		ParentRelPos: 0,
+		SerialNum:    cpuSerial,
+		MfgName:      "bbsim-cpu",
+		IsFru:        false,
+		Uuid: &dmi.Uuid{
+			Uuid: getUUID(cpuName),
+		},
+		State: &dmi.ComponentState{},
+	}
+}
+
+func createMemoryComponent(memIdx int) *dmi.Component {
+	memName := fmt.Sprintf("Systems/1/Memory/%d", memIdx)
+	memSerial := fmt.Sprintf("bbsim-ram-serial-%d", memIdx)
+	return &dmi.Component{
+		Name:         memName,
+		Class:        dmi.ComponentType_COMPONENT_TYPE_MEMORY,
+		Description:  "bbsim-ram",
+		Parent:       "",
+		ParentRelPos: 0,
+		SerialNum:    memSerial,
+		MfgName:      "bbsim-ram",
+		IsFru:        false,
+		Uuid: &dmi.Uuid{
+			Uuid: getUUID(memName),
+		},
+		State: &dmi.ComponentState{},
+	}
+}
+
+func createDiskComponent(diskIdx int) *dmi.Component {
+	diskName := fmt.Sprintf("Systems/1/Disk/%d", diskIdx)
+	diskSerial := fmt.Sprintf("bbsim-disk-serial-%d", diskIdx)
+	return &dmi.Component{
+		Name:         diskName,
+		Class:        dmi.ComponentType_COMPONENT_TYPE_STORAGE,
+		Description:  "bbsim-disk",
+		Parent:       "",
+		ParentRelPos: 0,
+		SerialNum:    diskSerial,
+		MfgName:      "bbsim-disk",
+		IsFru:        false,
+		Uuid: &dmi.Uuid{
+			Uuid: getUUID(diskName),
+		},
+		State: &dmi.ComponentState{},
+	}
+}
+
+func createInnerSurroundingTempComponentSensor(sensorIdx int) *dmi.Component {
+	sensorName := fmt.Sprintf("Systems/1/Sensor/%d", sensorIdx)
+	sensorSerial := fmt.Sprintf("bbsim-sensor-istemp-serial-%d", sensorIdx)
+	return &dmi.Component{
+		Name:         sensorName,
+		Class:        dmi.ComponentType_COMPONENT_TYPE_SENSOR,
+		Description:  "bbsim-istemp",
+		Parent:       "",
+		ParentRelPos: 0,
+		SerialNum:    sensorSerial,
+		MfgName:      "bbsim-istemp",
+		IsFru:        false,
+		Uuid: &dmi.Uuid{
+			Uuid: getUUID(sensorName),
+		},
+		State: &dmi.ComponentState{},
+	}
+}
+
 //StopManagingDevice stops management of a device and cleans up any context and caches for that device
 func (dms *DmiAPIServer) StopManagingDevice(context.Context, *dmi.StopManagingDeviceRequest) (*dmi.StopManagingDeviceResponse, error) {
 	return nil, status.Errorf(codes.Unimplemented, "rpc StopManagingDevice not implemented")
@@ -138,7 +268,7 @@
 		logger.Errorf("Requested uuid =%s, uuid of existing device = %s", req.DeviceUuid.Uuid, dms.uuid)
 		// Wrong uuid, return error
 		errResponse := &dmi.PhysicalInventoryResponse{
-			Status:    dmi.Status_ERROR,
+			Status:    dmi.Status_ERROR_STATUS,
 			Reason:    dmi.Reason_UNKNOWN_DEVICE,
 			Inventory: &dmi.Hardware{},
 		}
@@ -147,30 +277,13 @@
 	}
 
 	response := &dmi.PhysicalInventoryResponse{
-		Status: dmi.Status_OK,
+		Status: dmi.Status_OK_STATUS,
 		Inventory: &dmi.Hardware{
 			LastChange: &timestamp.Timestamp{
 				Seconds: 0,
 				Nanos:   0,
 			},
-			Root: &dmi.Component{
-				Name:         dms.deviceName,
-				Class:        0,
-				Description:  "",
-				Parent:       "",
-				ParentRelPos: 0,
-				Children:     dms.components,
-				SerialNum:    dms.deviceSerial,
-				MfgName:      common.Config.Olt.Vendor,
-				IsFru:        false,
-				Uri: &dmi.Uri{
-					Uri: dms.ipAddress,
-				},
-				Uuid: &dmi.Uuid{
-					Uuid: dms.uuid,
-				},
-				State: &dmi.ComponentState{},
-			},
+			Root: dms.root,
 		},
 	}
 	return sendResponseBackOnStream(stream, response)
@@ -179,7 +292,6 @@
 //Contains tells whether arr contains element.
 func Contains(arr []string, element string) bool {
 	for _, item := range arr {
-		logger.Debugf("Checking in Contains slice elem = %v str = %s", item, element)
 		if element == item {
 			return true
 		}
@@ -188,29 +300,47 @@
 }
 
 func findComponent(l []*dmi.Component, compUUID string) *dmi.Component {
+	var foundComp *dmi.Component
+
 	for _, comp := range l {
 		logger.Debugf("findComponent slice comp = %v compUUID = %s", comp, compUUID)
 		if comp.Uuid.Uuid == compUUID {
 			return comp
 		}
 
-		for _, child := range comp.GetChildren() {
-			logger.Debugf("findComponent Child slice comp = %v compUUID = %s", comp, compUUID)
-			if child.Uuid.Uuid == compUUID {
-				return child
-			}
+		foundComp = findComponent(comp.GetChildren(), compUUID)
+		if foundComp != nil {
+			return foundComp
 		}
 	}
 
 	return nil
 }
 
+func findComponentsOfType(l []*dmi.Component, compType dmi.ComponentType) []*dmi.Component {
+	var comps []*dmi.Component
+	findComponents(l, compType, &comps)
+	return comps
+}
+
+func findComponents(l []*dmi.Component, compType dmi.ComponentType, collector *[]*dmi.Component) {
+
+	for _, comp := range l {
+		if comp.Class == compType {
+			*collector = append(*collector, comp)
+			//logger.Debugf("Added collector = %v", *collector)
+		}
+
+		findComponents(comp.GetChildren(), compType, collector)
+	}
+}
+
 func sendGetHWComponentResponse(c *dmi.Component, stream dmi.NativeHWManagementService_GetHWComponentInfoServer) error {
-	apiStatus := dmi.Status_OK
+	apiStatus := dmi.Status_OK_STATUS
 	reason := dmi.Reason_UNDEFINED_REASON
 
 	if c == nil {
-		apiStatus = dmi.Status_ERROR
+		apiStatus = dmi.Status_ERROR_STATUS
 		reason = dmi.Reason_UNKNOWN_DEVICE
 	}
 
@@ -240,18 +370,9 @@
 		return status.Errorf(codes.Internal, "stream to send is nil, can not send response from gRPC server")
 	}
 
-	componentFound := Contains(dms.ponTransceiverUuids, req.ComponentUuid.Uuid)
-	if !componentFound {
-		componentFound = Contains(dms.ponTransceiverCageUuids, req.ComponentUuid.Uuid)
-	}
-
-	if req.DeviceUuid.Uuid != dms.uuid || !componentFound {
-		// Wrong uuid, return error
-		return sendGetHWComponentResponse(nil, stream)
-	}
-
 	// Search for the component and return it
-	c := findComponent(dms.components, req.ComponentUuid.Uuid)
+	c := findComponent(dms.root.Children, req.ComponentUuid.Uuid)
+
 	return sendGetHWComponentResponse(c, stream)
 }
 
@@ -266,18 +387,55 @@
 }
 
 //GetLoggingEndpoint gets the configured location to which the logs are being shipped
-func (dms *DmiAPIServer) GetLoggingEndpoint(context.Context, *dmi.Uuid) (*dmi.GetLoggingEndpointResponse, error) {
+func (dms *DmiAPIServer) GetLoggingEndpoint(context.Context, *dmi.HardwareID) (*dmi.GetLoggingEndpointResponse, error) {
 	return nil, status.Errorf(codes.Unimplemented, "rpc GetLoggingEndpoint not implemented")
 }
 
 //SetMsgBusEndpoint sets the location of the Message Bus to which events and metrics are shipped
-func (dms *DmiAPIServer) SetMsgBusEndpoint(context.Context, *dmi.SetMsgBusEndpointRequest) (*dmi.SetRemoteEndpointResponse, error) {
-	return nil, status.Errorf(codes.Unimplemented, "rpc SetMsgBusEndpoint not implemented")
+func (dms *DmiAPIServer) SetMsgBusEndpoint(ctx context.Context, request *dmi.SetMsgBusEndpointRequest) (*dmi.SetRemoteEndpointResponse, error) {
+	logger.Debugf("SetMsgBusEndpoint() invoked with request: %+v and context: %v", request, ctx)
+	if request == nil || request.MsgbusEndpoint == "" {
+		return &dmi.SetRemoteEndpointResponse{Status: dmi.Status_ERROR_STATUS, Reason: dmi.Reason_KAFKA_ENDPOINT_ERROR},
+			status.Errorf(codes.FailedPrecondition, "request is nil")
+	}
+	olt := devices.GetOLT()
+	dms.kafkaEndpoint = request.MsgbusEndpoint
+
+	// close the old publisher
+	if dms.mPublisherCancelFunc != nil {
+		dms.mPublisherCancelFunc()
+	}
+
+	// initialize a new publisher
+	var nCtx context.Context
+	nCtx, dms.mPublisherCancelFunc = context.WithCancel(context.Background())
+	// initialize a publisher
+	if err := InitializeDMKafkaPublishers(sarama.NewAsyncProducer, olt.ID, dms.kafkaEndpoint); err == nil {
+		// start a go routine which will read from channel and publish on kafka
+		go DMKafkaPublisher(nCtx, dms.metricChannel, "dm.metrics")
+	} else {
+		logger.Errorf("Failed to start kafka publisher: %v", err)
+		return &dmi.SetRemoteEndpointResponse{Status: dmi.Status_ERROR_STATUS, Reason: dmi.Reason_KAFKA_ENDPOINT_ERROR}, err
+	}
+
+	return &dmi.SetRemoteEndpointResponse{Status: dmi.Status_OK_STATUS, Reason: dmi.Reason_UNDEFINED_REASON}, nil
 }
 
 //GetMsgBusEndpoint gets the configured location to which the events and metrics are being shipped
 func (dms *DmiAPIServer) GetMsgBusEndpoint(context.Context, *empty.Empty) (*dmi.GetMsgBusEndpointResponse, error) {
-	return nil, status.Errorf(codes.Unimplemented, "rpc GetMsgBusEndpoint not implemented")
+	logger.Debugf("GetMsgBusEndpoint() invoked")
+	if dms.kafkaEndpoint != "" {
+		return &dmi.GetMsgBusEndpointResponse{
+			Status:         dmi.Status_OK_STATUS,
+			Reason:         dmi.Reason_UNDEFINED_REASON,
+			MsgbusEndpoint: dms.kafkaEndpoint,
+		}, nil
+	}
+	return &dmi.GetMsgBusEndpointResponse{
+		Status:         dmi.Status_ERROR_STATUS,
+		Reason:         dmi.Reason_KAFKA_ENDPOINT_ERROR,
+		MsgbusEndpoint: "",
+	}, nil
 }
 
 //GetManagedDevices returns an object containing a list of devices managed by this entity
@@ -297,3 +455,35 @@
 
 	return &retResponse, nil
 }
+
+//GetLogLevel Gets the configured log level for a certain entity on a certain device.
+func (dms *DmiAPIServer) GetLogLevel(context.Context, *dmi.GetLogLevelRequest) (*dmi.GetLogLevelResponse, error) {
+	return &dmi.GetLogLevelResponse{
+		Status: dmi.Status_OK_STATUS,
+		DeviceUuid: &dmi.Uuid{
+			Uuid: dms.uuid,
+		},
+		LogLevels: []*dmi.EntitiesLogLevel{},
+	}, nil
+}
+
+// SetLogLevel Sets the log level of the device, for each given entity to a certain level.
+func (dms *DmiAPIServer) SetLogLevel(context.Context, *dmi.SetLogLevelRequest) (*dmi.SetLogLevelResponse, error) {
+	return &dmi.SetLogLevelResponse{
+		Status: dmi.Status_OK_STATUS,
+		DeviceUuid: &dmi.Uuid{
+			Uuid: dms.uuid,
+		},
+	}, nil
+}
+
+// GetLoggableEntities Gets the entities of a device on which log can be configured.
+func (dms *DmiAPIServer) GetLoggableEntities(context.Context, *dmi.GetLoggableEntitiesRequest) (*dmi.GetLogLevelResponse, error) {
+	return &dmi.GetLogLevelResponse{
+		Status: dmi.Status_OK_STATUS,
+		DeviceUuid: &dmi.Uuid{
+			Uuid: dms.uuid,
+		},
+		LogLevels: []*dmi.EntitiesLogLevel{},
+	}, nil
+}