/*
 * Copyright (c) 2003-2018, Great Software Laboratory Pvt. Ltd.
 * Copyright (c) 2017 Intel Corporation
 * Copyright (c) 2019, Infosys Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <sys/types.h>

#include "thread_pool.h"
#include "err_codes.h"
#include "options.h"
#include "ipc_api.h"
#include "message_queues.h"
#include "s11.h"
#include "s11_config.h"
#include "msgType.h"
#include "../gtpV2Codec/gtpV2StackWrappers.h"
36/**Global and externs **/
37extern s11_config g_s11_cfg;
38
39/*S11 CP communication parameters*/
40int g_s11_fd;
41struct sockaddr_in g_s11_cp_addr;
42socklen_t g_s11_serv_size;
43struct sockaddr_in g_client_addr;
44socklen_t g_client_addr_size;
45int ipc_reader_tipc_s11;
46
47/*Connections to send response(CS/MB) to mme-app*/
48int g_resp_fd;
49
50pthread_t tipcReaderS11_t;
51
52pthread_mutex_t s11_net_lock = PTHREAD_MUTEX_INITIALIZER;
53
54struct thread_pool *g_tpool;
55struct thread_pool *g_tpool_tipc_reader_s11;
56/**End: global and externs**/
57
58extern char processName[255];
59extern int pid;
60
61
62void
63handle_mmeapp_message_s11(void * data)
64{
65 char *msg = ((char *) data) + (sizeof(uint32_t)*2);
66
67 msg_type_t* msg_type = (msg_type_t*)(msg);
68
69 switch(*msg_type)
70 {
71 case create_session_request:
72 create_session_handler(msg);
73 break;
74 case modify_bearer_request:
75 modify_bearer_handler(msg);
76 break;
77 case delete_session_request:
78 delete_session_handler(msg);
79 break;
80 case release_bearer_request:
81 release_bearer_handler(msg);
82 break;
83 case ddn_acknowledgement:
84 ddn_ack_handler(msg);
85 break;
86 default:
87 break;
88 }
89 free(data);
90}
91
92void * tipc_msg_handler_s11()
93{
94 int bytesRead = 0;
95 while (1)
96 {
97 unsigned char buffer[255] = {0};
98 if ((bytesRead = read_tipc_msg(ipc_reader_tipc_s11, buffer, 255)) > 0)
99 {
100 unsigned char *tmpBuf = (unsigned char *) malloc(sizeof(char) * bytesRead);
101 memcpy(tmpBuf, buffer, bytesRead);
102 log_msg(LOG_INFO, "S11 message received from mme-app");
103 insert_job(g_tpool_tipc_reader_s11, handle_mmeapp_message_s11, tmpBuf);
104 }
105 }
106}
/* GTPv2 stack handle; created in main() via createGtpV2Stack(). */
struct GtpV2Stack *gtpStack_gp = NULL;

/* Message buffers defined elsewhere; main() allocates each with createMsgBuffer(4096). */
extern struct MsgBuffer *csReqMsgBuf_p;
extern struct MsgBuffer *mbReqMsgBuf_p;
extern struct MsgBuffer *dsReqMsgBuf_p;
extern struct MsgBuffer *rbReqMsgBuf_p;
extern struct MsgBuffer *ddnAckMsgBuf_p;

114int
115init_s11_workers()
116{
117 if ((ipc_reader_tipc_s11 = create_tipc_socket()) <= 0)
118 {
119 log_msg(LOG_ERROR, "Failed to create IPC Reader tipc socket \n");
120 return -E_FAIL;
121 }
122 if ( bind_tipc_socket(ipc_reader_tipc_s11, s11AppInstanceNum_c) != 1)
123 {
124 log_msg(LOG_ERROR, "Failed to bind IPC Reader tipc socket \n");
125 return -E_FAIL;
126 }
127
128 /* Initialize thread pool for mme-app messages */
129 g_tpool_tipc_reader_s11 = thread_pool_new(3);
130
131 if (g_tpool_tipc_reader_s11 == NULL) {
132 log_msg(LOG_ERROR, "Error in creating thread pool. \n");
133 return -E_FAIL_INIT;
134 }
135
136 log_msg(LOG_INFO, "S11 Listener thead pool initalized.\n");
137
138 // thread to read incoming ipc messages from tipc socket
139 pthread_attr_t attr;
140 pthread_attr_init(&attr);
141 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
142 pthread_create(&tipcReaderS11_t, &attr, &tipc_msg_handler_s11, NULL);
143 pthread_attr_destroy(&attr);
144
145 return 0;
146}
147
148/*Initialize sctp socket connection for eNB*/
149int
150init_gtpv2()
151{
152 /*Create UDP socket*/
153 g_s11_fd = socket(PF_INET, SOCK_DGRAM, 0);
154
155 g_client_addr.sin_family = AF_INET;
156 //g_client_addr.sin_addr.s_addr = htonl(g_s11_cfg.local_egtp_ip);
157 g_client_addr.sin_addr.s_addr = htonl(g_s11_cfg.local_egtp_ip);
158 fprintf(stderr, "....................local egtp %d\n", g_s11_cfg.local_egtp_ip);
159 //g_client_addr.sin_port = htons(0); /* TODO: Read value from config */
160 g_client_addr.sin_port = htons(g_s11_cfg.egtp_def_port);
161
162 bind(g_s11_fd, (struct sockaddr *)&g_client_addr, sizeof(g_client_addr));
163 g_client_addr_size = sizeof(g_client_addr);
164
165 /*Configure settings in address struct*/
166 g_s11_cp_addr.sin_family = AF_INET;
167 //g_s11_cp_addr.sin_port = htons(g_s11_cfg.egtp_def_port);
168 fprintf(stderr, ".................... egtp def port %d\n", g_s11_cfg.egtp_def_port);
169 g_s11_cp_addr.sin_port = htons(g_s11_cfg.egtp_def_port);
170 //g_s11_cp_addr.sin_addr.s_addr = htonl(g_s11_cfg.sgw_ip);
171 fprintf(stderr, "....................sgw ip %d\n", g_s11_cfg.sgw_ip);
172 g_s11_cp_addr.sin_addr.s_addr = htonl(g_s11_cfg.sgw_ip);
173 memset(g_s11_cp_addr.sin_zero, '\0', sizeof(g_s11_cp_addr.sin_zero));
174
175 g_s11_serv_size = sizeof(g_s11_cp_addr);
176
177 return SUCCESS;
178}
179
180/**
181 Opening pipe connection from S11 app to MME(Single queue pipe)
182*/
183int
184init_s11_ipc()
185{
186 log_msg(LOG_INFO, "Connecting to mme-app S11 CS response queue\n");
187 if ((g_resp_fd = create_tipc_socket()) <= 0)
188 return -E_FAIL;
189
190 log_msg(LOG_INFO, "S11 - mme-app IPC: Connected.\n");
191
192 return 0;
193}
194
195/**
196 Read incoming S11 messages and pass to threadpool
197 for processing.
198*/
199void
200s11_reader()
201{
202 unsigned char buffer[S11_GTPV2C_BUF_LEN];
203 int len;
204
205 while(1) {
206 //len = recvfrom(g_s11_fd, buffer, S11_GTPV2C_BUF_LEN, 0,
207 // &g_client_addr, &g_client_addr_size);
208 len = recvfrom(g_s11_fd, buffer, S11_GTPV2C_BUF_LEN, 0,
209 (struct sockaddr*)&g_s11_cp_addr, &g_s11_serv_size);
210
211 if(len > 0) {
212 MsgBuffer* tmp_buf_p = createMsgBuffer(len);
213 MsgBuffer_writeBytes(tmp_buf_p, buffer, len, true);
214 MsgBuffer_rewind(tmp_buf_p);
215 log_msg(LOG_INFO, "S11 Received msg len : %d \n",len);
216 insert_job(g_tpool, handle_s11_message, tmp_buf_p);
217 }
218
219 }
220}
221
222int
223main(int argc, char **argv)
224{
225 memcpy (processName, argv[0], strlen(argv[0]));
226 pid = getpid();
227
228 init_parser("conf/s11.json");
229 parse_s11_conf();
230
231 // init stack
232 gtpStack_gp = createGtpV2Stack();
233 if (gtpStack_gp == NULL)
234 {
235 log_msg(LOG_ERROR, "Error in initializing ipc.\n");
236 return -1;
237 }
238
239 csReqMsgBuf_p = createMsgBuffer(4096);
240 mbReqMsgBuf_p = createMsgBuffer(4096);
241 dsReqMsgBuf_p = createMsgBuffer(4096);
242 rbReqMsgBuf_p = createMsgBuffer(4096);
243 ddnAckMsgBuf_p = createMsgBuffer(4096);
244
245 if (csReqMsgBuf_p == NULL || mbReqMsgBuf_p == NULL || dsReqMsgBuf_p == NULL || rbReqMsgBuf_p == NULL || ddnAckMsgBuf_p == NULL)
246 {
247 log_msg(LOG_ERROR, "Error in initializing msg buffers required by gtp codec.\n");
248 return -1;
249 }
250
251 /*Init writer sockets*/
252 if (init_s11_ipc() != 0) {
253 log_msg(LOG_ERROR, "Error in initializing ipc.\n");
254 return -1;
255 }
256
257 init_s11_workers();
258
259 /* Initialize thread pool for S11 messages from CP*/
260 g_tpool = thread_pool_new(S11_THREADPOOL_SIZE);
261
262 if (g_tpool == NULL) {
263 log_msg(LOG_ERROR, "Error in creating thread pool. \n");
264 return -1;
265 }
266 log_msg(LOG_INFO, "S11 listener threadpool initialized.\n");
267
268 if (init_gtpv2() != 0)
269 return -1;
270
271 s11_reader();
272
273 return 0;
274}