Line data Source code
1 : /* TDSPool - Connection pooling for TDS based databases
2 : * Copyright (C) 2001 Brian Bruns
3 : *
4 : * This program is free software; you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation; either version 2 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program; if not, write to the Free Software
16 : * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 : *
18 : */
19 :
20 : /*
21 : * Note on terminology: a pool member is a connection to the database,
22 : * a pool user is a client connection that is temporarily assigned to a
23 : * pool member.
24 : */
25 :
26 : #include <config.h>
27 :
28 : #include <stdarg.h>
29 : #include <stdio.h>
30 : #include <signal.h>
31 : #include <errno.h>
32 : #include <fcntl.h>
33 :
34 : #if HAVE_STDLIB_H
35 : #include <stdlib.h>
36 : #endif /* HAVE_STDLIB_H */
37 :
38 : #if HAVE_STRING_H
39 : #include <string.h>
40 : #endif /* HAVE_STRING_H */
41 :
42 : #if HAVE_UNISTD_H
43 : #include <unistd.h>
44 : #endif /* HAVE_UNISTD_H */
45 :
46 : #if HAVE_SYS_SOCKET_H
47 : #include <sys/socket.h>
48 : #endif /* HAVE_SYS_SOCKET_H */
49 :
50 : #if HAVE_POLL_H
51 : #include <poll.h>
52 : #endif /* HAVE_POLL_H */
53 :
54 : #if HAVE_NETINET_IN_H
55 : #include <netinet/in.h>
56 : #endif /* HAVE_NETINET_IN_H */
57 :
58 : #if HAVE_ARPA_INET_H
59 : #include <arpa/inet.h>
60 : #endif /* HAVE_ARPA_INET_H */
61 :
62 : #ifdef _WIN32
63 : #include <io.h>
64 : #endif
65 :
66 : #include "pool.h"
67 :
/*
 * Shutdown request flag: written by sigterm_handler and polled by the main
 * loop.  An object shared with a signal handler has to be
 * volatile sig_atomic_t, otherwise the access is not guaranteed to be
 * safe or even visible to the interrupted code.
 */
static volatile sig_atomic_t got_sigterm = 0;
/* log file given on the command line (-l), NULL when none was requested */
static const char *logfile_name = NULL;
71 :
72 : static void sigterm_handler(int sig);
73 : static void pool_schedule_waiters(TDS_POOL * pool);
74 : static TDS_POOL *pool_init(const char *name, const char *config_path);
75 : static void pool_socket_init(TDS_POOL * pool);
76 : static void pool_main_loop(TDS_POOL * pool);
77 : static bool pool_open_logfile(TDS_POOL * pool);
78 :
79 : static void
80 2 : sigterm_handler(int sig)
81 : {
82 2 : got_sigterm = true;
83 2 : }
84 :
85 : #ifndef _WIN32
/*
 * Set by sighup_handler, cleared by the main loop once it has reopened the
 * log file.  Declared volatile sig_atomic_t because it is shared between a
 * signal handler and normal code.
 */
static volatile sig_atomic_t got_sighup = 0;

/*
 * SIGHUP handler: just remember that the signal arrived, the real work is
 * done outside of signal context.
 */
static void
sighup_handler(int sig)
{
	(void) sig;		/* the signal number itself is not needed */
	got_sighup = true;
}
93 : #endif
94 :
/*
 * Make sure a mandatory pool setting is present.
 *
 * pool_name   name of the pool, only used in the error message
 * cond        true when the setting was supplied
 * field_name  human readable name of the setting
 *
 * Prints a diagnostic on stderr and terminates the program when cond is
 * false; returns normally otherwise.
 */
static void
check_field(const char *pool_name, bool cond, const char *field_name)
{
	if (cond)
		return;

	fprintf(stderr, "No %s specified for pool ``%s''.\n", field_name, pool_name);
	exit(EXIT_FAILURE);
}
103 :
104 : /*
105 : * pool_init creates a named pool and opens connections to the database
106 : */
107 : static TDS_POOL *
108 2 : pool_init(const char *name, const char *config_path)
109 : {
110 : TDS_POOL *pool;
111 2 : char *err = NULL;
112 :
113 : /* initialize the pool */
114 :
115 2 : pool = tds_new0(TDS_POOL, 1);
116 2 : if (!pool) {
117 0 : fprintf(stderr, "Could not allocate memory for pool\n");
118 0 : exit(EXIT_FAILURE);
119 : }
120 2 : pool->password = strdup("");
121 :
122 2 : pool->event_fd = INVALID_SOCKET;
123 4 : if (tds_mutex_init(&pool->events_mtx)) {
124 0 : fprintf(stderr, "Error initializing pool mutex\n");
125 0 : exit(EXIT_FAILURE);
126 : }
127 :
128 : /* FIXME -- read this from the conf file */
129 2 : if (!pool_read_conf_files(config_path, name, pool, &err)) {
130 0 : fprintf(stderr, "Configuration for pool ``%s'' not found.\n", name);
131 0 : exit(EXIT_FAILURE);
132 : }
133 :
134 2 : if (err) {
135 0 : fprintf(stderr, "%s\n", err);
136 0 : exit(EXIT_FAILURE);
137 : }
138 2 : check_field(name, pool->user != NULL, "user");
139 2 : check_field(name, pool->server != NULL, "server");
140 2 : check_field(name, pool->port != 0, "port");
141 :
142 2 : if (!pool->server_user)
143 2 : pool->server_user = strdup(pool->user);
144 2 : if (!pool->server_password)
145 2 : pool->server_password = strdup(pool->password);
146 :
147 2 : if (pool->max_open_conn < pool->min_open_conn) {
148 0 : fprintf(stderr, "Max connections less than minimum\n");
149 0 : exit(EXIT_FAILURE);
150 : }
151 :
152 2 : pool->name = strdup(name);
153 :
154 2 : pool_open_logfile(pool);
155 :
156 2 : pool_mbr_init(pool);
157 2 : pool_user_init(pool);
158 :
159 2 : pool_socket_init(pool);
160 :
161 2 : return pool;
162 : }
163 :
164 : static void
165 2 : pool_destroy(TDS_POOL *pool)
166 : {
167 2 : pool_mbr_destroy(pool);
168 2 : pool_user_destroy(pool);
169 :
170 2 : CLOSESOCKET(pool->wakeup_fd);
171 2 : CLOSESOCKET(pool->listen_fd);
172 2 : CLOSESOCKET(pool->event_fd);
173 2 : tds_mutex_free(&pool->events_mtx);
174 :
175 2 : free(pool->user);
176 2 : free(pool->password);
177 2 : free(pool->server);
178 2 : free(pool->database);
179 2 : free(pool->name);
180 2 : free(pool->server_user);
181 2 : free(pool->server_password);
182 2 : free(pool);
183 2 : }
184 :
185 : static void
186 0 : pool_schedule_waiters(TDS_POOL * pool)
187 : {
188 : TDS_POOL_USER *puser;
189 :
190 : /* first see if there are free members to do the request */
191 0 : if (!dlist_member_first(&pool->idle_members))
192 : return;
193 :
194 0 : while ((puser = dlist_user_first(&pool->waiters)) != NULL) {
195 0 : if (puser->user_state == TDS_SRV_WAIT) {
196 : /* place back in query state */
197 0 : puser->user_state = TDS_SRV_QUERY;
198 0 : dlist_user_remove(&pool->waiters, puser);
199 0 : dlist_user_append(&pool->users, puser);
200 : /* now try again */
201 0 : pool_user_query(pool, puser);
202 0 : return;
203 : }
204 : }
205 : }
206 :
/*
 * Growable array of descriptors handed to poll().
 * The first num_fds entries of fds are in use, alloc_fds is the number of
 * entries the array currently has room for.
 */
typedef struct select_info
{
	struct pollfd *fds;	/* array passed to poll() */
	uint32_t num_fds;	/* entries filled in so far */
	uint32_t alloc_fds;	/* entries allocated */
} SELECT_INFO;
212 :
213 : static void
214 148056 : pool_select_add_socket(SELECT_INFO *sel, TDS_POOL_SOCKET *sock)
215 : {
216 : short events;
217 : struct pollfd *fd;
218 :
219 : /* skip dead connections */
220 148056 : if (IS_TDSDEAD(sock->tds))
221 : return;
222 148034 : if (!sock->poll_recv && !sock->poll_send)
223 : return;
224 :
225 145917 : events = 0;
226 145917 : if (sock->poll_recv)
227 145917 : events |= POLLIN;
228 145917 : if (sock->poll_send)
229 1 : events |= POLLOUT;
230 145917 : if (sel->num_fds >= sel->alloc_fds) {
231 4 : sel->alloc_fds *= 2;
232 4 : if (!TDS_RESIZE(sel->fds, sel->alloc_fds)) {
233 0 : fprintf(stderr, "Out of memory allocating fds\n");
234 0 : exit(EXIT_FAILURE);
235 : }
236 : }
237 145917 : sock->poll_index = sel->num_fds;
238 145917 : fd = &sel->fds[sel->num_fds++];
239 145917 : fd->fd = tds_get_s(sock->tds);
240 145917 : fd->events = events;
241 145917 : fd->revents = 0;
242 : }
243 :
244 : static void
245 1426 : pool_process_events(TDS_POOL *pool)
246 : {
247 : TDS_POOL_EVENT *events, *next;
248 :
249 : /* detach events from pool */
250 1426 : tds_mutex_lock(&pool->events_mtx);
251 1426 : events = pool->events;
252 1426 : pool->events = NULL;
253 1426 : tds_mutex_unlock(&pool->events_mtx);
254 :
255 : /* process them */
256 4278 : while (events) {
257 1426 : next = events->next;
258 1426 : events->next = NULL;
259 :
260 1426 : events->execute(events);
261 1426 : free(events);
262 1426 : events = next;
263 : }
264 1426 : }
265 :
266 : static bool
267 2 : pool_open_logfile(TDS_POOL *pool)
268 : {
269 : int fd;
270 :
271 2 : tds_g_append_mode = 0;
272 2 : tdsdump_open(getenv("TDSDUMP"));
273 :
274 2 : if (!logfile_name)
275 : return true;
276 0 : fd = open(logfile_name, O_WRONLY|O_CREAT|O_APPEND, 0644);
277 0 : if (fd < 0)
278 : return false;
279 :
280 0 : fflush(stdout);
281 0 : fflush(stderr);
282 0 : while (dup2(fd, fileno(stdout)) < 0 && errno == EINTR)
283 0 : continue;
284 0 : while (dup2(fd, fileno(stderr)) < 0 && errno == EINTR)
285 0 : continue;
286 0 : close(fd);
287 0 : fflush(stdout);
288 0 : fflush(stderr);
289 :
290 : return true;
291 : }
292 :
293 : static void
294 2 : pool_socket_init(TDS_POOL * pool)
295 : {
296 : struct sockaddr_in sin;
297 : TDS_SYS_SOCKET s, event_pair[2];
298 2 : int socktrue = 1;
299 :
300 : /* FIXME -- read the interfaces file and bind accordingly */
301 2 : sin.sin_addr.s_addr = INADDR_ANY;
302 2 : sin.sin_port = htons(pool->port);
303 2 : sin.sin_family = AF_INET;
304 :
305 2 : if (TDS_IS_SOCKET_INVALID(s = socket(AF_INET, SOCK_STREAM, 0))) {
306 0 : perror("socket");
307 0 : exit(1);
308 : }
309 2 : tds_socket_set_nonblocking(s);
310 : /* don't keep addr in use from s.craig@andronics.com */
311 2 : setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const void *) &socktrue, sizeof(socktrue));
312 :
313 2 : fprintf(stderr, "Listening on port %d\n", pool->port);
314 2 : if (bind(s, (struct sockaddr *) &sin, sizeof(sin)) < 0) {
315 0 : perror("bind");
316 0 : exit(1);
317 : }
318 2 : listen(s, 5);
319 2 : pool->listen_fd = s;
320 :
321 2 : if (socketpair(AF_UNIX, SOCK_STREAM, 0, event_pair) < 0) {
322 0 : perror("socketpair");
323 0 : exit(1);
324 : }
325 2 : tds_socket_set_nonblocking(event_pair[0]);
326 2 : tds_socket_set_nonblocking(event_pair[1]);
327 2 : pool->event_fd = event_pair[1];
328 2 : pool->wakeup_fd = event_pair[0];
329 2 : }
330 :
331 : /*
332 : * pool_main_loop
333 : * Accept new connections from clients, and handle all input from clients and
334 : * pool members.
335 : */
336 : static void
337 2 : pool_main_loop(TDS_POOL * pool)
338 : {
339 : TDS_POOL_MEMBER *pmbr;
340 : TDS_POOL_USER *puser;
341 : TDS_SYS_SOCKET s, wakeup;
342 2 : SELECT_INFO sel = { NULL, 0, 8 };
343 2 : int min_expire_left = -1;
344 : int rc;
345 :
346 2 : s = pool->listen_fd;
347 2 : wakeup = pool->wakeup_fd;
348 :
349 : /* add the listening socket to the read list */
350 2 : if (!TDS_RESIZE(sel.fds, sel.alloc_fds)) {
351 0 : fprintf(stderr, "Out of memory allocating fds\n");
352 0 : exit(EXIT_FAILURE);
353 : }
354 2 : sel.fds[0].fd = s;
355 2 : sel.fds[0].events = POLLIN;
356 2 : sel.fds[1].fd = wakeup;
357 2 : sel.fds[1].events = POLLIN;
358 :
359 35994 : while (!got_sigterm) {
360 :
361 35990 : sel.num_fds = 2;
362 35990 : sel.fds[0].revents = 0;
363 35990 : sel.fds[1].revents = 0;
364 :
365 : /* add the user sockets to the read list */
366 149444 : DLIST_FOREACH(dlist_user, &pool->users, puser)
367 74385 : pool_select_add_socket(&sel, &puser->sock);
368 :
369 : /* add the pool member sockets to the read list */
370 148688 : DLIST_FOREACH(dlist_member, &pool->active_members, pmbr)
371 73671 : pool_select_add_socket(&sel, &pmbr->sock);
372 :
373 35990 : if (min_expire_left > 0)
374 22369 : min_expire_left *= 1000;
375 :
376 35990 : rc = poll(sel.fds, sel.num_fds, min_expire_left);
377 35990 : if (TDS_UNLIKELY(rc < 0)) {
378 : char *errstr;
379 :
380 2 : if (sock_errno == TDSSOCK_EINTR)
381 2 : continue;
382 0 : errstr = sock_strerror(sock_errno);
383 0 : fprintf(stderr, "Error: poll returned %d, %s\n", sock_errno, errstr);
384 : sock_strerror_free(errstr);
385 0 : exit(EXIT_FAILURE);
386 : }
387 35988 : if (TDS_UNLIKELY(got_sigterm))
388 : break;
389 :
390 : #ifndef _WIN32
391 35988 : if (TDS_UNLIKELY(got_sighup)) {
392 0 : got_sighup = false;
393 0 : pool_open_logfile(pool);
394 : }
395 : #endif
396 :
397 : /* process events */
398 35988 : if ((sel.fds[1].revents & POLLIN) != 0) {
399 : char buf[32];
400 1426 : READSOCKET(wakeup, buf, sizeof(buf));
401 :
402 1426 : pool_process_events(pool);
403 : }
404 :
405 : /* process the sockets */
406 35988 : if ((sel.fds[0].revents & POLLIN) != 0) {
407 714 : pool_user_create(pool, s);
408 : }
409 35988 : pool_process_users(pool, sel.fds, sel.num_fds);
410 35988 : min_expire_left = pool_process_members(pool, sel.fds, sel.num_fds);
411 :
412 : /* back from members */
413 35988 : if (dlist_user_first(&pool->waiters))
414 0 : pool_schedule_waiters(pool);
415 : } /* while !got_sigterm */
416 2 : tdsdump_log(TDS_DBG_INFO2, "Shutdown Requested\n");
417 2 : }
418 :
/*
 * Write the command line synopsis to stderr.
 * progname is the name the program was invoked with.
 */
static void
print_usage(const char *progname)
{
	fprintf(stderr,
		"Usage:\t%s [-l <log file>] [-c <conf file>] [-d] <pool name>\n",
		progname);
}
424 :
425 : int
426 2 : main(int argc, char **argv)
427 : {
428 : int opt;
429 : #ifdef HAVE_FORK
430 2 : bool daemonize = false;
431 : # define DAEMON_OPT "d"
432 : #else
433 : # define DAEMON_OPT ""
434 : #endif
435 : TDS_POOL *pool;
436 2 : const char *config_path = NULL;
437 :
438 2 : signal(SIGTERM, sigterm_handler);
439 2 : signal(SIGINT, sigterm_handler);
440 : #ifndef _WIN32
441 2 : signal(SIGHUP, sighup_handler);
442 2 : signal(SIGPIPE, SIG_IGN);
443 : #endif
444 :
445 4 : while ((opt = getopt(argc, argv, "l:c:" DAEMON_OPT)) != -1) {
446 0 : switch (opt) {
447 0 : case 'l':
448 0 : logfile_name = optarg;
449 0 : break;
450 : #ifdef HAVE_FORK
451 : case 'd':
452 : daemonize = true;
453 : break;
454 : #endif
455 0 : case 'c':
456 0 : config_path = optarg;
457 0 : break;
458 0 : default:
459 0 : print_usage(argv[0]);
460 0 : return EXIT_FAILURE;
461 : }
462 : }
463 2 : if (optind >= argc) {
464 0 : print_usage(argv[0]);
465 0 : return EXIT_FAILURE;
466 : }
467 2 : pool = pool_init(argv[optind], config_path);
468 : #ifdef HAVE_FORK
469 2 : if (daemonize) {
470 0 : if (daemon(0, 0) < 0) {
471 0 : fprintf(stderr, "Failed to daemonize %s\n", argv[0]);
472 0 : return EXIT_FAILURE;
473 : }
474 : }
475 : #endif
476 2 : pool_main_loop(pool);
477 2 : printf("User logins %lu members logins %lu members at end %d\n", pool->user_logins, pool->member_logins, pool->num_active_members);
478 2 : pool_destroy(pool);
479 2 : printf("tdspool Shutdown\n");
480 2 : return EXIT_SUCCESS;
481 : }
|