blob: 31e0f552634fb9a02946d260aec3895329bf54b9 [file] [log] [blame]
# Copyright 2020-present Open Networking Foundation
# Original copyright 2020-present ADTRAN, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
from concurrent import futures
import grpc
import dmi.hw_events_mgmt_service_pb2
import dmi.hw_events_mgmt_service_pb2_grpc
import dmi.hw_management_service_pb2
import dmi.hw_management_service_pb2_grpc
import dmi.hw_metrics_mgmt_service_pb2
import dmi.hw_metrics_mgmt_service_pb2_grpc
import dmi.sw_management_service_pb2
import dmi.sw_management_service_pb2_grpc
import dmi.commons_pb2
import dmi.sw_image_pb2
class DmiEventsManagementServiceServicer(dmi.hw_events_mgmt_service_pb2_grpc.NativeEventsManagementServiceServicer):
def ListEvents(self, request, context):
return dmi.hw_events_mgmt_service_pb2.ListEventsResponse(status=dmi.commons_pb2.OK_STATUS)
def UpdateEventsConfiguration(self, request, context):
return dmi.hw_events_mgmt_service_pb2.EventsConfigurationResponse(status=dmi.commons_pb2.OK_STATUS)
def StreamEvents(self, request, context):
for _ in range(0, 1):
yield dmi.hw_events_mgmt_service_pb2.Event(event_id=dmi.hw_events_mgmt_service_pb2.EVENT_NAME_UNDEFINED)
class DmiHwManagementServiceServicer(dmi.hw_management_service_pb2_grpc.NativeHWManagementServiceServicer):
def StartManagingDevice(self, request, context):
for _ in range(0, 1):
yield dmi.hw_management_service_pb2.StartManagingDeviceResponse(status=dmi.commons_pb2.OK_STATUS)
def StopManagingDevice(self, request, context):
return dmi.hw_management_service_pb2.StopManagingDeviceResponse(status=dmi.commons_pb2.OK_STATUS)
def GetManagedDevices(self, request, context):
return dmi.hw_management_service_pb2.ManagedDevicesResponse()
def GetPhysicalInventory(self, request, context):
for _ in range(0, 1):
yield dmi.hw_management_service_pb2.PhysicalInventoryResponse(status=dmi.commons_pb2.OK_STATUS)
def GetHWComponentInfo(self, request, context):
for _ in range(0, 1):
yield dmi.hw_management_service_pb2.HWComponentInfoGetResponse(status=dmi.commons_pb2.OK_STATUS)
def SetHWComponentInfo(self, request, context):
return dmi.hw_management_service_pb2.HWComponentInfoSetResponse(status=dmi.commons_pb2.OK_STATUS)
def SetLoggingEndpoint(self, request, context):
return dmi.hw_management_service_pb2.SetRemoteEndpointResponse(status=dmi.commons_pb2.OK_STATUS)
def GetLoggingEndpoint(self, request, context):
return dmi.hw_management_service_pb2.GetLoggingEndpointResponse(status=dmi.commons_pb2.OK_STATUS)
def SetMsgBusEndpoint(self, request, context):
return dmi.hw_management_service_pb2.SetRemoteEndpointResponse(status=dmi.commons_pb2.OK_STATUS)
def GetMsgBusEndpoint(self, request, context):
return dmi.hw_management_service_pb2.GetMsgBusEndpointResponse(status=dmi.commons_pb2.OK_STATUS)
def GetLoggableEntities(self, request, context):
return dmi.hw_management_service_pb2.GetLogLevelResponse(status=dmi.commons_pb2.OK_STATUS)
def SetLogLevel(self, request, context):
return dmi.hw_management_service_pb2.SetLogLevelResponse()
def GetLogLevel(self, request, context):
return dmi.hw_management_service_pb2.GetLogLevelResponse(status=dmi.commons_pb2.OK_STATUS)
def HeartbeatCheck(self, request, context):
return dmi.hw_management_service_pb2.Heartbeat(heartbeat_signature=0)
def RebootDevice(self, request, context):
return dmi.hw_management_service_pb2.RebootDeviceResponse(status=dmi.commons_pb2.OK_STATUS)
class DmiMetricsManagementServiceServicer(dmi.hw_metrics_mgmt_service_pb2_grpc.NativeMetricsManagementServiceServicer):
def ListMetrics(self, request, context):
return dmi.hw_metrics_mgmt_service_pb2.ListMetricsResponse(status=dmi.commons_pb2.OK_STATUS)
def UpdateMetricsConfiguration(self, request, context):
return dmi.hw_metrics_mgmt_service_pb2.MetricsConfigurationResponse(status=dmi.commons_pb2.OK_STATUS)
def GetMetric(self, request, context):
return dmi.hw_metrics_mgmt_service_pb2.GetMetricResponse(status=dmi.commons_pb2.OK_STATUS)
def StreamMetrics(self, request, context):
for _ in range(0, 1):
yield dmi.hw_metrics_mgmt_service_pb2.Metric(metric_id=dmi.hw_metrics_mgmt_service_pb2.METRIC_NAME_UNDEFINED)
class DmiSoftwareManagementServiceServicer(dmi.sw_management_service_pb2_grpc.NativeSoftwareManagementServiceServicer):
def GetSoftwareVersion(self, request, context):
return dmi.sw_management_service_pb2.GetSoftwareVersionInformationResponse(status=dmi.commons_pb2.OK_STATUS)
def DownloadImage(self, request, context):
for _ in range(0, 1):
yield dmi.sw_image_pb2.ImageStatus(status=dmi.commons_pb2.OK_STATUS)
def ActivateImage(self, request, context):
for _ in range(0, 1):
yield dmi.sw_image_pb2.ImageStatus(status=dmi.commons_pb2.OK_STATUS)
def RevertToStandbyImage(self, request, context):
for _ in range(0, 1):
yield dmi.sw_image_pb2.ImageStatus(status=dmi.commons_pb2.OK_STATUS)
def UpdateStartupConfiguration(self, request, context):
for _ in range(0, 1):
yield dmi.sw_management_service_pb2.ConfigResponse(status=dmi.commons_pb2.OK_STATUS)
def GetStartupConfigurationInfo(self, request, context):
return dmi.sw_management_service_pb2.StartupConfigInfoResponse(status=dmi.commons_pb2.OK_STATUS)
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
dmi.hw_events_mgmt_service_pb2_grpc.add_NativeEventsManagementServiceServicer_to_server(DmiEventsManagementServiceServicer(), server)
dmi.hw_management_service_pb2_grpc.add_NativeHWManagementServiceServicer_to_server(DmiHwManagementServiceServicer(), server)
dmi.hw_metrics_mgmt_service_pb2_grpc.add_NativeMetricsManagementServiceServicer_to_server(DmiMetricsManagementServiceServicer(), server)
dmi.sw_management_service_pb2_grpc.add_NativeSoftwareManagementServiceServicer_to_server(DmiSoftwareManagementServiceServicer(), server)
server.add_insecure_port('127.0.01:50051')
server.start()
server.wait_for_termination()
if __name__ == '__main__':
serve()