blob: 178060c60b9b90593377cf35df8fef9dee59919a [file] [log] [blame]
/******************************************************************************
*
* <:copyright-BRCM:2016:DUAL/GPL:standard
*
* Copyright (c) 2016 Broadcom
* All Rights Reserved
*
* Unless you and Broadcom execute a separate written software license
* agreement governing use of this software, this software is licensed
* to you under the terms of the GNU General Public License version 2
* (the "GPL"), available at http://www.broadcom.com/licenses/GPLv2.php,
* with the following added to such license:
*
* As a special exception, the copyright holders of this software give
* you permission to link this software with independent modules, and
* to copy and distribute the resulting executable under terms of your
* choice, provided that you also meet, for each linked independent
* module, the terms and conditions of the license of that module.
* An independent module is a module which is not derived from this
* software. The special exception does not apply to any modifications
* of the software.
*
* Not withstanding the above, under no circumstances may you combine
* this software in any way with any other Broadcom software provided
* under a license other than the GPL, without Broadcom's express prior
* written consent.
*
* :>
*
*****************************************************************************/
/**
* @file bal_api_worker.c
* @brief Main processing loop for the worker thread that handles INDications
* sent from the core to the BAL public API
*
*/
/*
* We need access to the BAL subsystem names
*/
#define BAL_SUBSYSTEM_STR_REQ
#include <bcmos_system.h>
#include <bal_msg.h>
#include <bal_obj_msg_pack_unpack.h>
#include <bal_osmsg.h>
#include <bal_api.h>
#include "bal_api_worker.h"
#ifdef BAL_MONOLITHIC
#include <bal_worker.h>
#endif
#ifdef ENABLE_LOG
#include <bcm_dev_log.h>
#endif
/* This rx thread and worker thread are used to process indications from the core
*/
static bcmos_task api_ind_rx_thread;
static bcmos_task api_ind_worker_thread;
/* Local function declarations */
static int _bal_ipc_api_ind_rx_handler(long data);
static void bal_ipc_api_indication_handler(bcmos_module_id module_id, bcmos_msg *msg);
typedef struct indication_subscription_inst indication_subscription_inst;
struct indication_subscription_inst
{
bcmbal_cb_cfg cb_cfg;
/**< TAILQ link for list management */
TAILQ_ENTRY(indication_subscription_inst) indication_subscription_inst_next;
};
TAILQ_HEAD(indication_subscription_list_head, indication_subscription_inst) indication_subscription_list;
/*
* The queue through which the core converses with
* the backend of the Public API for indications.
*/
static bcmos_msg_queue bal_api_ind_backend_queue;
bcmos_msg_queue *p_bal_api_ind_queue;
/* Create API backend message queue */
bcmos_errno bal_api_ind_msg_queue_create(mgmt_queue_addr_ports *mgmt_queue_info)
{
bcmos_msg_queue_parm msg_q_p = {};
bcmos_errno ret = BCM_ERR_OK;
do
{
/* Create BAL API indication receive queue - the core sends IND messages
* to the BAL public API using this queue.
*/
msg_q_p.name = "api_ind_rx_q";
if (NULL != mgmt_queue_info->balapi_mgmt_ip_port)
{
uint16_t portnum;
char *p_ind_portnum_str;
char balapi_ind_port_str[256];
/*
* make a copy of the user chosen bal api mgmt port
*/
strcpy(balapi_ind_port_str, mgmt_queue_info->balapi_mgmt_ip_port);
/* Find the port number */
p_ind_portnum_str = strchr(balapi_ind_port_str, ':') + 1;
/* convert to an integer and increment it by one */
portnum = atoi(p_ind_portnum_str) + 1;
/* create the new string defining the BAL API indication port */
sprintf(p_ind_portnum_str,"%d", portnum);
/* Set up the BAL API indication IP:port access parameter
*/
msg_q_p.local_ep_address = balapi_ind_port_str;
msg_q_p.remote_ep_address = NULL;
msg_q_p.ep_type = BCMOS_MSG_QUEUE_EP_UDP_SOCKET;
}
else
{
msg_q_p.ep_type = BCMOS_MSG_QUEUE_EP_LOCAL;
}
ret = bcmos_msg_queue_create(&bal_api_ind_backend_queue, &msg_q_p);
if (BCM_ERR_OK != ret)
{
BCM_LOG(ERROR, log_id_public_api, "Couldn't BAL API rx indication queue\n");
break;
}
p_bal_api_ind_queue = &bal_api_ind_backend_queue;
#ifdef BAL_MONOLITHIC
if (NULL == mgmt_queue_info->balapi_mgmt_ip_port)
p_bal_core_to_api_ind_queue = p_bal_api_ind_queue;
#endif
} while (0);
return ret;
}
/* Worker module init function.
* Register for messages this module is expected to receive
*/
static bcmos_errno _bal_worker_module_bal_api_init(long data)
{
bcmos_task_parm task_p = {};
bcmos_errno ret = BCM_ERR_OK;
BUG_ON(0 == data);
do
{
/* Create BAL API indication RX thread */
task_p.name = "ipc_api_ind_rx_thread";
task_p.priority = TASK_PRIORITY_IPC_RX;
task_p.handler = _bal_ipc_api_ind_rx_handler;
task_p.data = (long)&bal_api_ind_backend_queue;
ret = bcmos_task_create(&api_ind_rx_thread, &task_p);
if (ret)
{
BCM_LOG(ERROR, log_id_public_api, "Couldn't create BAL API indication RX thread\n");
break;
}
/* Register the message types to be handled by the mgmt module
*/
bcmos_msg_register(BCMBAL_MGMT_API_IND_MSG,
0,
BCMOS_MODULE_ID_WORKER_API_IND, bal_ipc_api_indication_handler);
}
while(0);
return ret;
}
static int _bal_ipc_api_ind_rx_handler(long data)
{
bcmos_msg_queue *rxq = (bcmos_msg_queue *)data;
bcmos_task *my_task = bcmos_task_current();
bcmos_msg *msg;
bcmos_errno ret = BCM_ERR_OK;
void *payload;
while (!my_task->destroy_request)
{
payload = NULL;
ret = bcmbal_msg_recv(rxq, BCMOS_WAIT_FOREVER, &payload);
if (ret)
{
/* Unexpected failure */
BCM_LOG(ERROR, log_id_public_api, "bcmbal_msg_recv() -> %s\n", bcmos_strerror(ret));
continue;
}
/* Message received */
BCM_LOG(DEBUG, log_id_public_api, "bcmbal_msg_recv(%p) -> %s\n", payload, bcmos_strerror(ret));
/*
* Got a message, so now dispatch it. This will result in one
* of the modules (registered for the message being processed)
* executing its message callback handler.
*/
msg = bcmbal_bcmos_hdr_get(payload);
ret = bcmos_msg_dispatch(msg, BCMOS_MSG_SEND_AUTO_FREE);
if (ret)
{
BCM_LOG(ERROR, log_id_public_api,
"Couldn't dispatch message %d:%d\n",
(int)msg->type, (int)msg->instance);
}
}
my_task->destroyed = BCMOS_TRUE;
return (BCM_ERR_OK == ret) ? 0 : -EINVAL;
}
/* Message wrapper that is called when indication is delivered in application module's context */
static void _int_deliver_wrapper_cb(bcmos_module_id module_id, bcmos_msg *msg)
{
void *msg_payload = bcmbal_payload_ptr_get(bcmbal_bal_hdr_get_by_bcmos_hdr(msg));
f_bcmbal_ind_handler handler = (f_bcmbal_ind_handler)bcmbal_scratchpad_get(msg_payload);
handler((bcmbal_obj *)msg_payload);
bcmbal_msg_free((bcmbal_obj *)msg_payload);
}
static void process_listener_callbacks(bcmbal_obj *obj)
{
indication_subscription_inst *current_entry, *p_temp_entry;
BCM_LOG(DEBUG, log_id_public_api, "inspecting registered callback for object %d\n",
obj->obj_type);
TAILQ_FOREACH_SAFE(current_entry,
&indication_subscription_list,
indication_subscription_inst_next,
p_temp_entry)
{
BCM_LOG(DEBUG, log_id_public_api, "entry objtype %d\n", current_entry->cb_cfg.obj_type);
if((BCMBAL_OBJ_ID_ANY == current_entry->cb_cfg.obj_type) ||
(current_entry->cb_cfg.obj_type == obj->obj_type))
{
BCM_LOG(DEBUG, log_id_public_api,
"Calling registered callback for object %d\n",
obj->obj_type);
/* call the registered function directly or in the target module's context */
if (BCMOS_MODULE_ID_NONE == current_entry->cb_cfg.module)
{
current_entry->cb_cfg.ind_cb_hdlr(obj);
}
else
{
bcmbal_obj *clone = bcmbal_msg_clone(obj);
bcmos_errno err;
if (NULL == clone)
{
BCM_LOG(ERROR, log_id_public_api,
"Couldn't clone message for object %d\n",
obj->obj_type);
continue;
}
bcmbal_scratchpad_set(clone, current_entry->cb_cfg.ind_cb_hdlr);
err = bcmbal_msg_call(clone,
current_entry->cb_cfg.module,
_int_deliver_wrapper_cb,
BCMOS_MSG_SEND_AUTO_FREE);
if (BCM_ERR_OK != err)
{
BCM_LOG(ERROR, log_id_public_api,
"Couldn't deliver message for object %d to module %d. Error %s\n",
obj->obj_type, current_entry->cb_cfg.module, bcmos_strerror(err));
}
}
}
}
return;
}
/*
* This is the handler for indication messages received by the BAL Public API
* backend from the core. We need to see who has subscribed for these messages,
* and call those registered functions.
*/
static void bal_ipc_api_indication_handler(bcmos_module_id module_id, bcmos_msg *msg)
{
void *msg_payload;
msg_payload = bcmbal_payload_ptr_get(bcmbal_bal_hdr_get_by_bcmos_hdr(msg));
/*
* TO-DO
* validate the message major and minor version is correct
*/
do
{
if(BAL_SUBSYSTEM_CORE != bcmbal_sender_get(msg_payload))
{
BCM_LOG(ERROR, log_id_public_api, "Mgmt IND message received from wrong subsystem (%s)\n",
subsystem_str[bcmbal_sender_get(msg_payload)]);
break;
}
if(BCMBAL_MGMT_API_IND_MSG != bcmbal_type_major_get(msg_payload))
{
BCM_LOG(ERROR, log_id_public_api,"Mgmt IND message received with wrong major type (%d)\n",
bcmbal_type_major_get(msg_payload));
break;
}
/* Look through the list of registered subscribers for this indication
* and call them.
*/
BCM_LOG(DEBUG, log_id_public_api,
"Processing indication listeners\n");
process_listener_callbacks((bcmbal_obj *)msg_payload);
}
while(0);
bcmbal_msg_free(msg_payload);
return;
}
void enable_bal_api_indications(const char *balapi_mgmt_ip_port)
{
bcmos_task_parm task_p = {};
bcmos_module_parm module_p = {};
bcmos_errno ret = BCM_ERR_OK;
mgmt_queue_addr_ports mgmt_queue_info;
TAILQ_INIT(&indication_subscription_list);
do
{
/* Create quues for communication between BAL API and the core */
mgmt_queue_info.balapi_mgmt_ip_port = balapi_mgmt_ip_port;
ret = bal_api_ind_msg_queue_create(&mgmt_queue_info);
if (BCM_ERR_OK != ret)
{
BCM_LOG(ERROR, log_id_public_api, "Couldn't create BAL API indication queue\n");
break;
}
module_p.qparm.name = "bal_api_ind_worker_module";
module_p.init = _bal_worker_module_bal_api_init;
module_p.data = (long)&mgmt_queue_info; /* IP address and port information */
/* Create worker thread & modules for BAL indication messages from the core */
task_p.name = "bal_api_ind_worker";
task_p.priority = TASK_PRIORITY_WORKER;
ret = bcmos_task_create(&api_ind_worker_thread, &task_p);
if (BCM_ERR_OK != ret)
{
BCM_LOG(ERROR, log_id_public_api, "Couldn't create BAL API indication worker thread\n");
break;
}
ret = bcmos_module_create(BCMOS_MODULE_ID_WORKER_API_IND, &api_ind_worker_thread, &module_p);
if (ret)
{
BCM_LOG(ERROR, log_id_public_api, "Couldn't create BAL API indication worker module\n");
break;
}
BCM_LOG(DEBUG, log_id_public_api, "BAL API indication handler registered\n");
}
while(0);
return;
}
/** NOTE: when cb_cfg->obj_type is BCMBAL_OBJ_ID_ANY AND type is UN-SUBSCRIBE, then this is unsubscribe all */
bcmos_errno _manage_api_ind_listener(bcmbal_ind_cb_management_type type, bcmbal_cb_cfg *cb_cfg)
{
bcmos_errno ret = BCM_ERR_OK;
indication_subscription_inst *new_ind_entry;
indication_subscription_inst *current_entry, *p_temp_entry;
bcmos_bool is_unsubscribe;
BUG_ON(NULL == cb_cfg);
BUG_ON(NULL == cb_cfg->ind_cb_hdlr);
is_unsubscribe = (IND_CB_UNSUBSCRIBE == type);
BCM_LOG(DEBUG,log_id_public_api,
"%s: %s for BAL API indications\n",
__FUNCTION__,
is_unsubscribe ? "Unsubscribing" : "Subscribing");
do
{
bcmos_bool b_found_existing_entry = BCMOS_FALSE;
TAILQ_FOREACH_SAFE(current_entry,
&indication_subscription_list,
indication_subscription_inst_next,
p_temp_entry)
{
if(
((is_unsubscribe && (BCMBAL_OBJ_ID_ANY == cb_cfg->obj_type)) ? BCMOS_TRUE :
(current_entry->cb_cfg.obj_type == cb_cfg->obj_type)) &&
(current_entry->cb_cfg.ind_cb_hdlr == cb_cfg->ind_cb_hdlr) &&
(current_entry->cb_cfg.module == cb_cfg->module))
{
BCM_LOG(DEBUG,log_id_public_api,
"Found existing registration\n");
if(is_unsubscribe)
{
BCM_LOG(DEBUG,log_id_public_api,
"Removing registration\n");
TAILQ_REMOVE(&indication_subscription_list, current_entry, indication_subscription_inst_next);
bcmos_free(current_entry);
}
b_found_existing_entry = BCMOS_TRUE;
/* Don't stop looking for matches if we are unsubscribing from ANY indications, there could be many
* assigned to a single callback function */
if(!(is_unsubscribe && (BCMBAL_OBJ_ID_ANY == cb_cfg->obj_type))) break;
}
}
/* If we are subscribing to indication and we have already recorded
* this subscription, OR we are un-subscribing (whether or not we
* found a subscription to remove), then return right away with OK
*/
if((BCMOS_TRUE == b_found_existing_entry) || is_unsubscribe)
{
break;
}
BCM_LOG(DEBUG,log_id_public_api,
"Registering NEW subscriber for BAL API indications\n");
/* This is a new subscription */
new_ind_entry = bcmos_calloc(sizeof(indication_subscription_inst));
if (NULL == new_ind_entry)
{
BCM_LOG(FATAL, log_id_public_api,
"Failed to register api indication subscription\n");
ret = BCM_ERR_NOMEM;
break;
}
new_ind_entry->cb_cfg = *cb_cfg;
TAILQ_INSERT_TAIL(&indication_subscription_list, new_ind_entry, indication_subscription_inst_next);
} while(0);
return ret;
}
static void free_all_indication_subscriptions(void)
{
indication_subscription_inst *current_entry, *p_temp_entry;
TAILQ_FOREACH_SAFE(current_entry,
&indication_subscription_list,
indication_subscription_inst_next,
p_temp_entry)
{
TAILQ_REMOVE(&indication_subscription_list, current_entry, indication_subscription_inst_next);
bcmos_free(current_entry);
}
return;
}
void bal_api_indications_finish(void)
{
free_all_indication_subscriptions();
bcmos_msg_unregister(BCMBAL_MGMT_API_IND_MSG,
0,
BCMOS_MODULE_ID_WORKER_API_IND);
bcmos_msg_queue_destroy(&bal_api_ind_backend_queue);
bcmos_module_destroy(BCMOS_MODULE_ID_WORKER_API_IND);
bcmos_task_destroy(&api_ind_rx_thread);
bcmos_task_destroy(&api_ind_worker_thread);
return;
}