Line data Source code
1 : /* TDSPool - Connection pooling for TDS based databases
2 : * Copyright (C) 2001 Brian Bruns
3 : *
4 : * This program is free software; you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation; either version 2 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program; if not, write to the Free Software
16 : * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 : *
18 : */
19 :
20 : /*
21 : * Note on terminology: a pool member is a connection to the database,
22 : * a pool user is a client connection that is temporarily assigned to a
23 : * pool member.
24 : */
25 :
26 : #include <config.h>
27 :
28 : #include <stdarg.h>
29 : #include <stdio.h>
30 : #include <signal.h>
31 : #include <errno.h>
32 : #include <fcntl.h>
33 :
34 : #if HAVE_STDLIB_H
35 : #include <stdlib.h>
36 : #endif /* HAVE_STDLIB_H */
37 :
38 : #if HAVE_STRING_H
39 : #include <string.h>
40 : #endif /* HAVE_STRING_H */
41 :
42 : #if HAVE_UNISTD_H
43 : #include <unistd.h>
44 : #endif /* HAVE_UNISTD_H */
45 :
46 : #if HAVE_SYS_SOCKET_H
47 : #include <sys/socket.h>
48 : #endif /* HAVE_SYS_SOCKET_H */
49 :
50 : #if HAVE_NETINET_IN_H
51 : #include <netinet/in.h>
52 : #endif /* HAVE_NETINET_IN_H */
53 :
54 : #if HAVE_ARPA_INET_H
55 : #include <arpa/inet.h>
56 : #endif /* HAVE_ARPA_INET_H */
57 :
58 : #ifdef _WIN32
59 : #include <io.h>
60 : #endif
61 :
62 : #include "pool.h"
63 :
64 : /* to be set by sig term */
65 : static bool got_sigterm = false;
66 : static const char *logfile_name = NULL;
67 :
68 : static void sigterm_handler(int sig);
69 : static void pool_schedule_waiters(TDS_POOL * pool);
70 : static void pool_socket_init(TDS_POOL * pool);
71 : static void pool_main_loop(TDS_POOL * pool);
72 : static bool pool_open_logfile(void);
73 :
74 : static void
75 2 : sigterm_handler(int sig TDS_UNUSED)
76 : {
77 2 : got_sigterm = true;
78 2 : }
79 :
80 : #ifndef _WIN32
81 : static bool got_sighup = false;
82 :
83 : static void
84 0 : sighup_handler(int sig TDS_UNUSED)
85 : {
86 0 : got_sighup = true;
87 0 : }
88 : #endif
89 :
/*
 * Verify that a mandatory configuration field is present.
 *
 * pool_name   name of the pool, used only in the error message
 * cond        true when the field has been supplied
 * field_name  human readable name of the field
 *
 * Returns normally when cond is true; otherwise reports the missing field
 * on stderr and terminates the process with EXIT_FAILURE.
 */
static void
check_field(const char *pool_name, bool cond, const char *field_name)
{
	if (cond)
		return;

	fprintf(stderr, "No %s specified for pool ``%s''.\n", field_name, pool_name);
	exit(EXIT_FAILURE);
}
98 :
99 : /*
100 : * pool_init creates a named pool and opens connections to the database
101 : */
102 : static TDS_POOL *
103 2 : pool_init(const char *name, const tds_dir_char *config_path)
104 : {
105 : TDS_POOL *pool;
106 2 : char *err = NULL;
107 :
108 : /* initialize the pool */
109 :
110 2 : pool = tds_new0(TDS_POOL, 1);
111 2 : if (!pool) {
112 0 : fprintf(stderr, "Could not allocate memory for pool\n");
113 0 : exit(EXIT_FAILURE);
114 : }
115 2 : pool->password = strdup("");
116 :
117 2 : pool->event_fd = INVALID_SOCKET;
118 4 : if (tds_mutex_init(&pool->events_mtx)) {
119 0 : fprintf(stderr, "Error initializing pool mutex\n");
120 0 : exit(EXIT_FAILURE);
121 : }
122 :
123 : /* FIXME -- read this from the conf file */
124 2 : if (!pool_read_conf_files(config_path, name, pool, &err)) {
125 0 : fprintf(stderr, "Configuration for pool ``%s'' not found.\n", name);
126 0 : exit(EXIT_FAILURE);
127 : }
128 :
129 2 : if (err) {
130 0 : fprintf(stderr, "%s\n", err);
131 0 : exit(EXIT_FAILURE);
132 : }
133 2 : check_field(name, pool->user != NULL, "user");
134 2 : check_field(name, pool->server != NULL, "server");
135 2 : check_field(name, pool->port != 0, "port");
136 :
137 2 : if (!pool->server_user)
138 2 : pool->server_user = strdup(pool->user);
139 2 : if (!pool->server_password)
140 2 : pool->server_password = strdup(pool->password);
141 :
142 2 : if (pool->max_open_conn < pool->min_open_conn) {
143 0 : fprintf(stderr, "Max connections less than minimum\n");
144 0 : exit(EXIT_FAILURE);
145 : }
146 :
147 2 : pool->name = strdup(name);
148 :
149 2 : pool_open_logfile();
150 :
151 2 : pool_mbr_init(pool);
152 2 : pool_user_init(pool);
153 :
154 2 : pool_socket_init(pool);
155 :
156 2 : return pool;
157 : }
158 :
159 : static void
160 2 : pool_destroy(TDS_POOL *pool)
161 : {
162 2 : pool_mbr_destroy(pool);
163 2 : pool_user_destroy(pool);
164 :
165 2 : CLOSESOCKET(pool->wakeup_fd);
166 2 : CLOSESOCKET(pool->listen_fd);
167 2 : CLOSESOCKET(pool->event_fd);
168 2 : tds_mutex_free(&pool->events_mtx);
169 :
170 2 : free(pool->user);
171 2 : free(pool->password);
172 2 : free(pool->server);
173 2 : free(pool->database);
174 2 : free(pool->name);
175 2 : free(pool->server_user);
176 2 : free(pool->server_password);
177 2 : free(pool);
178 2 : }
179 :
180 : static void
181 0 : pool_schedule_waiters(TDS_POOL * pool)
182 : {
183 : TDS_POOL_USER *puser;
184 :
185 : /* first see if there are free members to do the request */
186 0 : if (!dlist_member_first(&pool->idle_members))
187 : return;
188 :
189 0 : while ((puser = dlist_user_first(&pool->waiters)) != NULL) {
190 0 : if (puser->user_state == TDS_SRV_WAIT) {
191 : /* place back in query state */
192 0 : puser->user_state = TDS_SRV_QUERY;
193 0 : dlist_user_remove(&pool->waiters, puser);
194 0 : dlist_user_append(&pool->users, puser);
195 : /* now try again */
196 0 : pool_user_query(pool, puser);
197 0 : return;
198 : }
199 : }
200 : }
201 :
/*
 * Growable array of poll descriptors used by the main loop.
 * Field order matters: the main loop initializes it positionally.
 */
typedef struct select_info
{
	/* descriptor array handed to poll() */
	struct pollfd *fds;
	/* number of entries of fds currently in use */
	uint32_t num_fds;
	/* number of entries fds has room for */
	uint32_t alloc_fds;
} SELECT_INFO;
207 :
208 : static void
209 148290 : pool_select_add_socket(SELECT_INFO *sel, TDS_POOL_SOCKET *sock)
210 : {
211 : short events;
212 : struct pollfd *fd;
213 :
214 : /* skip dead connections */
215 148290 : if (IS_TDSDEAD(sock->tds))
216 : return;
217 148267 : if (!sock->poll_recv && !sock->poll_send)
218 : return;
219 :
220 146134 : events = 0;
221 146134 : if (sock->poll_recv)
222 146134 : events |= POLLIN;
223 146134 : if (sock->poll_send)
224 0 : events |= POLLOUT;
225 146134 : if (sel->num_fds >= sel->alloc_fds) {
226 4 : sel->alloc_fds *= 2;
227 4 : if (!TDS_RESIZE(sel->fds, sel->alloc_fds)) {
228 0 : fprintf(stderr, "Out of memory allocating fds\n");
229 0 : exit(EXIT_FAILURE);
230 : }
231 : }
232 146134 : sock->poll_index = sel->num_fds;
233 146134 : fd = &sel->fds[sel->num_fds++];
234 146134 : fd->fd = tds_get_s(sock->tds);
235 146134 : fd->events = events;
236 146134 : fd->revents = 0;
237 : }
238 :
239 : static void
240 1438 : pool_process_events(TDS_POOL *pool)
241 : {
242 : TDS_POOL_EVENT *events, *next;
243 :
244 : /* detach events from pool */
245 1438 : tds_mutex_lock(&pool->events_mtx);
246 1438 : events = pool->events;
247 1438 : pool->events = NULL;
248 1438 : tds_mutex_unlock(&pool->events_mtx);
249 :
250 : /* process them */
251 4314 : while (events) {
252 1438 : next = events->next;
253 1438 : events->next = NULL;
254 :
255 1438 : events->execute(events);
256 1438 : free(events);
257 1438 : events = next;
258 : }
259 1438 : }
260 :
261 : static bool
262 2 : pool_open_logfile(void)
263 : {
264 : int fd;
265 :
266 2 : tds_append_mode = 0;
267 2 : tdsdump_open(tds_dir_getenv(TDS_DIR("TDSDUMP")));
268 :
269 2 : if (!logfile_name)
270 : return true;
271 0 : fd = open(logfile_name, O_WRONLY|O_CREAT|O_APPEND, 0644);
272 0 : if (fd < 0)
273 : return false;
274 :
275 0 : fflush(stdout);
276 0 : fflush(stderr);
277 0 : while (dup2(fd, fileno(stdout)) < 0 && errno == EINTR)
278 0 : continue;
279 0 : while (dup2(fd, fileno(stderr)) < 0 && errno == EINTR)
280 0 : continue;
281 0 : close(fd);
282 0 : fflush(stdout);
283 0 : fflush(stderr);
284 :
285 0 : return true;
286 : }
287 :
288 : static void
289 2 : pool_socket_init(TDS_POOL * pool)
290 : {
291 : struct sockaddr_in sin;
292 : TDS_SYS_SOCKET s, event_pair[2];
293 2 : int socktrue = 1;
294 :
295 : /* FIXME -- read the interfaces file and bind accordingly */
296 2 : sin.sin_addr.s_addr = INADDR_ANY;
297 2 : sin.sin_port = htons(pool->port);
298 2 : sin.sin_family = AF_INET;
299 :
300 2 : if (TDS_IS_SOCKET_INVALID(s = socket(AF_INET, SOCK_STREAM, 0))) {
301 0 : perror("socket");
302 0 : exit(1);
303 : }
304 2 : tds_socket_set_nonblocking(s);
305 : /* don't keep addr in use from s.craig@andronics.com */
306 2 : setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const void *) &socktrue, sizeof(socktrue));
307 :
308 2 : fprintf(stderr, "Listening on port %d\n", pool->port);
309 2 : if (bind(s, (struct sockaddr *) &sin, sizeof(sin)) < 0) {
310 0 : perror("bind");
311 0 : exit(1);
312 : }
313 2 : listen(s, 5);
314 2 : pool->listen_fd = s;
315 :
316 2 : if (socketpair(AF_UNIX, SOCK_STREAM, 0, event_pair) < 0) {
317 0 : perror("socketpair");
318 0 : exit(1);
319 : }
320 2 : tds_socket_set_nonblocking(event_pair[0]);
321 2 : tds_socket_set_nonblocking(event_pair[1]);
322 2 : pool->event_fd = event_pair[1];
323 2 : pool->wakeup_fd = event_pair[0];
324 2 : }
325 :
326 : /*
327 : * pool_main_loop
328 : * Accept new connections from clients, and handle all input from clients and
329 : * pool members.
330 : */
331 : static void
332 2 : pool_main_loop(TDS_POOL * pool)
333 : {
334 : TDS_POOL_MEMBER *pmbr;
335 : TDS_POOL_USER *puser;
336 : TDS_SYS_SOCKET s, wakeup;
337 2 : SELECT_INFO sel = { NULL, 0, 8 };
338 2 : int min_expire_left = -1;
339 : int rc;
340 :
341 2 : s = pool->listen_fd;
342 2 : wakeup = pool->wakeup_fd;
343 :
344 : /* add the listening socket to the read list */
345 2 : if (!TDS_RESIZE(sel.fds, sel.alloc_fds)) {
346 0 : fprintf(stderr, "Out of memory allocating fds\n");
347 0 : exit(EXIT_FAILURE);
348 : }
349 2 : sel.fds[0].fd = s;
350 2 : sel.fds[0].events = POLLIN;
351 2 : sel.fds[1].fd = wakeup;
352 2 : sel.fds[1].events = POLLIN;
353 :
354 36427 : while (!got_sigterm) {
355 :
356 36423 : sel.num_fds = 2;
357 36423 : sel.fds[0].revents = 0;
358 36423 : sel.fds[1].revents = 0;
359 :
360 : /* add the user sockets to the read list */
361 149690 : DLIST_FOREACH(dlist_user, &pool->users, puser)
362 74505 : pool_select_add_socket(&sel, &puser->sock);
363 :
364 : /* add the pool member sockets to the read list */
365 148928 : DLIST_FOREACH(dlist_member, &pool->active_members, pmbr)
366 73785 : pool_select_add_socket(&sel, &pmbr->sock);
367 :
368 36423 : if (min_expire_left > 0)
369 22743 : min_expire_left *= 1000;
370 :
371 36423 : rc = poll(sel.fds, sel.num_fds, min_expire_left);
372 36423 : if (TDS_UNLIKELY(rc < 0)) {
373 : char *errstr;
374 :
375 2 : if (sock_errno == TDSSOCK_EINTR)
376 2 : continue;
377 0 : errstr = sock_strerror(sock_errno);
378 0 : fprintf(stderr, "Error: poll returned %d, %s\n", sock_errno, errstr);
379 : sock_strerror_free(errstr);
380 0 : exit(EXIT_FAILURE);
381 : }
382 36421 : if (TDS_UNLIKELY(got_sigterm))
383 : break;
384 :
385 : #ifndef _WIN32
386 36421 : if (TDS_UNLIKELY(got_sighup)) {
387 0 : got_sighup = false;
388 0 : pool_open_logfile();
389 : }
390 : #endif
391 :
392 : /* process events */
393 36421 : if ((sel.fds[1].revents & POLLIN) != 0) {
394 : char buf[32];
395 1438 : READSOCKET(wakeup, buf, sizeof(buf));
396 :
397 1438 : pool_process_events(pool);
398 : }
399 :
400 : /* process the sockets */
401 36421 : if ((sel.fds[0].revents & POLLIN) != 0) {
402 720 : pool_user_create(pool, s);
403 : }
404 36421 : pool_process_users(pool, sel.fds, sel.num_fds);
405 36421 : min_expire_left = pool_process_members(pool, sel.fds, sel.num_fds);
406 :
407 : /* back from members */
408 36421 : if (dlist_user_first(&pool->waiters))
409 0 : pool_schedule_waiters(pool);
410 : } /* while !got_sigterm */
411 2 : tdsdump_log(TDS_DBG_INFO2, "Shutdown Requested\n");
412 2 : }
413 :
/*
 * Write the command line synopsis to stderr.
 *
 * progname  name to show as the program, normally argv[0]
 */
static void
print_usage(const char *progname)
{
	fprintf(stderr, "Usage:\t%s [-l <log file>] [-c <conf file>] [-d] <pool name>\n", progname);
}
419 :
420 : int
421 2 : main(int argc, char **argv)
422 : {
423 : int opt;
424 : #ifdef HAVE_FORK
425 2 : bool daemonize = false;
426 : # define DAEMON_OPT "d"
427 : #else
428 : # define DAEMON_OPT ""
429 : #endif
430 : TDS_POOL *pool;
431 2 : tds_dir_char *config_path = NULL;
432 :
433 2 : signal(SIGTERM, sigterm_handler);
434 2 : signal(SIGINT, sigterm_handler);
435 : #ifndef _WIN32
436 2 : signal(SIGHUP, sighup_handler);
437 2 : signal(SIGPIPE, SIG_IGN);
438 : #endif
439 :
440 4 : while ((opt = getopt(argc, argv, "l:c:" DAEMON_OPT)) != -1) {
441 0 : switch (opt) {
442 0 : case 'l':
443 0 : logfile_name = optarg;
444 0 : break;
445 : #ifdef HAVE_FORK
446 : case 'd':
447 : daemonize = true;
448 : break;
449 : #endif
450 0 : case 'c':
451 0 : config_path = tds_dir_from_cstr(optarg);
452 0 : if (!config_path) {
453 0 : fprintf(stderr, "Out of memory\n");
454 0 : return EXIT_FAILURE;
455 : }
456 : break;
457 0 : default:
458 0 : print_usage(argv[0]);
459 0 : return EXIT_FAILURE;
460 : }
461 : }
462 2 : if (optind >= argc) {
463 0 : print_usage(argv[0]);
464 0 : return EXIT_FAILURE;
465 : }
466 2 : pool = pool_init(argv[optind], config_path);
467 2 : TDS_ZERO_FREE(config_path);
468 : #ifdef HAVE_FORK
469 2 : if (daemonize) {
470 0 : if (daemon(0, 0) < 0) {
471 0 : fprintf(stderr, "Failed to daemonize %s\n", argv[0]);
472 0 : return EXIT_FAILURE;
473 : }
474 : }
475 : #endif
476 2 : pool_main_loop(pool);
477 2 : printf("User logins %lu members logins %lu members at end %d\n", pool->user_logins, pool->member_logins, pool->num_active_members);
478 2 : pool_destroy(pool);
479 2 : printf("tdspool Shutdown\n");
480 2 : return EXIT_SUCCESS;
481 : }
|