Line data Source code
1 : /* TDSPool - Connection pooling for TDS based databases
2 : * Copyright (C) 2001, 2002, 2003, 2004, 2005 Brian Bruns
3 : *
4 : * This program is free software; you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation; either version 2 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program; if not, write to the Free Software
16 : * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 : *
18 : */
19 :
20 : #include <config.h>
21 :
22 : #include <stdarg.h>
23 : #include <stdio.h>
24 : #include <assert.h>
25 :
26 : #if HAVE_STDLIB_H
27 : #include <stdlib.h>
28 : #endif /* HAVE_STDLIB_H */
29 :
30 : #if HAVE_STRING_H
31 : #include <string.h>
32 : #endif /* HAVE_STRING_H */
33 :
34 : #if HAVE_UNISTD_H
35 : #include <unistd.h>
36 : #endif /* HAVE_UNISTD_H */
37 :
38 : #if HAVE_SYS_PARAM_H
39 : #include <sys/param.h>
40 : #endif /* HAVE_SYS_PARAM_H */
41 :
42 : #if HAVE_SYS_SOCKET_H
43 : #include <sys/socket.h>
44 : #endif /* HAVE_SYS_SOCKET_H */
45 :
46 : #if HAVE_NETINET_IN_H
47 : #include <netinet/in.h>
48 : #endif /* HAVE_NETINET_IN_H */
49 :
50 : #if HAVE_ARPA_INET_H
51 : #include <arpa/inet.h>
52 : #endif /* HAVE_ARPA_INET_H */
53 :
54 : #include "pool.h"
55 : #include <freetds/utils/string.h>
56 :
57 : #ifndef MAXHOSTNAMELEN
58 : #define MAXHOSTNAMELEN 256
59 : #endif /* MAXHOSTNAMELEN */
60 :
61 : static void
62 25 : pool_mbr_free_socket(TDSSOCKET *tds)
63 : {
64 25 : if (tds) {
65 25 : TDSCONTEXT *ctx = (TDSCONTEXT *) tds->conn->tds_ctx;
66 :
67 25 : tds_free_socket(tds);
68 25 : tds_free_context(ctx);
69 : }
70 25 : }
71 :
72 : /*
73 : * pool_mbr_login open a single pool login, to be call at init time or
74 : * to reconnect.
75 : */
76 : static TDSSOCKET *
77 25 : pool_mbr_login(const TDS_POOL * pool, int tds_version)
78 : {
79 : TDSCONTEXT *context;
80 : TDSLOGIN *login;
81 : TDSSOCKET *tds;
82 : TDSLOGIN *connection;
83 : char hostname[MAXHOSTNAMELEN];
84 :
85 25 : login = tds_alloc_login(true);
86 25 : if (!login) {
87 0 : fprintf(stderr, "out of memory");
88 0 : return NULL;
89 : }
90 25 : if (gethostname(hostname, MAXHOSTNAMELEN) < 0)
91 0 : strlcpy(hostname, "tdspool", MAXHOSTNAMELEN);
92 25 : if (!tds_set_passwd(login, pool->server_password)
93 25 : || !tds_set_user(login, pool->server_user)
94 25 : || !tds_set_app(login, "tdspool")
95 25 : || !tds_set_host(login, hostname)
96 25 : || !tds_set_library(login, "TDS-Library")
97 25 : || !tds_set_server(login, pool->server)
98 25 : || !tds_set_client_charset(login, "iso_1")
99 25 : || !tds_set_language(login, "us_english")) {
100 0 : tds_free_login(login);
101 0 : return NULL;
102 : }
103 25 : if (tds_version > 0)
104 23 : login->tds_version = tds_version;
105 25 : if (pool->database && strlen(pool->database)) {
106 25 : if (!tds_dstr_copy(&login->database, pool->database)) {
107 0 : tds_free_login(login);
108 0 : return NULL;
109 : }
110 : }
111 25 : context = tds_alloc_context(NULL);
112 25 : if (!context) {
113 0 : fprintf(stderr, "Context cannot be null\n");
114 0 : return NULL;
115 : }
116 25 : tds = tds_alloc_socket(context, 512);
117 25 : if (!tds) {
118 0 : fprintf(stderr, "tds cannot be null\n");
119 0 : return NULL;
120 : }
121 25 : connection = tds_read_config_info(tds, login, context->locale);
122 25 : tds_free_login(login);
123 25 : if (!connection || TDS_FAILED(tds_connect_and_login(tds, connection))) {
124 0 : pool_mbr_free_socket(tds);
125 0 : tds_free_login(connection);
126 : /* what to do? */
127 0 : fprintf(stderr, "Could not open connection to server %s\n", pool->server);
128 0 : return NULL;
129 : }
130 25 : tds_free_login(connection);
131 :
132 25 : if (pool->database && strlen(pool->database)) {
133 25 : if (strcasecmp(tds->conn->env.database, pool->database) != 0) {
134 0 : fprintf(stderr, "changing database failed\n");
135 0 : return NULL;
136 : }
137 : }
138 :
139 : return tds;
140 : }
141 :
142 : void
143 718 : pool_assign_member(TDS_POOL *pool, TDS_POOL_MEMBER * pmbr, TDS_POOL_USER *puser)
144 : {
145 718 : pool_mbr_check(pool);
146 718 : assert(pmbr->current_user == NULL);
147 : if (pmbr->current_user) {
148 : pmbr->current_user->assigned_member = NULL;
149 : } else {
150 718 : dlist_member_remove(&pool->idle_members, pmbr);
151 718 : dlist_member_append(&pool->active_members, pmbr);
152 : }
153 718 : pmbr->current_user = puser;
154 718 : puser->assigned_member = pmbr;
155 718 : pool_mbr_check(pool);
156 718 : }
157 :
158 : void
159 718 : pool_deassign_member(TDS_POOL *pool, TDS_POOL_MEMBER * pmbr)
160 : {
161 718 : if (pmbr->current_user) {
162 718 : pmbr->current_user->assigned_member = NULL;
163 718 : pmbr->current_user = NULL;
164 718 : dlist_member_remove(&pool->active_members, pmbr);
165 718 : dlist_member_append(&pool->idle_members, pmbr);
166 : }
167 718 : pmbr->sock.poll_send = false;
168 718 : }
169 :
170 : /*
171 : * if a dead connection on the client side left this member in a questionable
172 : * state, let's bring in a correct one
173 : * We are not sure what the client did so we must try to clean as much as
174 : * possible.
175 : * Use pool_free_member if the state is really broken.
176 : */
177 : void
178 716 : pool_reset_member(TDS_POOL * pool, TDS_POOL_MEMBER * pmbr)
179 : {
180 : // FIXME not wait for server !!! asyncronous
181 716 : TDSSOCKET *tds = pmbr->sock.tds;
182 : TDS_POOL_USER *puser;
183 :
184 716 : puser = pmbr->current_user;
185 716 : if (puser) {
186 0 : pool_deassign_member(pool, pmbr);
187 0 : pool_free_user(pool, puser);
188 : }
189 :
190 : /* cancel whatever pending */
191 716 : tds_init_write_buf(tds);
192 716 : if (tds_set_state(tds, TDS_WRITING) != TDS_WRITING)
193 : goto failure;
194 716 : tds->out_flag = TDS_CANCEL;
195 716 : if (TDS_FAILED(tds_flush_packet(tds)))
196 : goto failure;
197 716 : tds_set_state(tds, TDS_PENDING);
198 716 : tds->in_cancel = 2;
199 :
200 716 : if (TDS_FAILED(tds_process_cancel(tds)))
201 : goto failure;
202 :
203 716 : if (IS_TDS71_PLUS(tds->conn)) {
204 : /* this 0x9 final reset the state from mssql 2000 */
205 716 : if (tds_set_state(tds, TDS_WRITING) != TDS_WRITING)
206 : goto failure;
207 716 : tds_start_query(tds, TDS_QUERY);
208 716 : tds_put_string(tds, "WHILE @@TRANCOUNT > 0 ROLLBACK SET TRANSACTION ISOLATION LEVEL READ COMMITTED", -1);
209 716 : tds_write_packet(tds, 0x9);
210 716 : tds_set_state(tds, TDS_PENDING);
211 :
212 716 : if (TDS_FAILED(tds_process_simple_query(tds)))
213 : goto failure;
214 : }
215 : return;
216 :
217 1 : failure:
218 1 : pool_free_member(pool, pmbr);
219 : }
220 :
221 : void
222 25 : pool_free_member(TDS_POOL * pool, TDS_POOL_MEMBER * pmbr)
223 : {
224 : TDSSOCKET *tds;
225 : TDS_POOL_USER *puser;
226 :
227 25 : tds = pmbr->sock.tds;
228 25 : if (tds) {
229 25 : if (!IS_TDSDEAD(tds))
230 25 : tds_close_socket(tds);
231 25 : pool_mbr_free_socket(tds);
232 25 : pmbr->sock.tds = NULL;
233 : }
234 :
235 : /*
236 : * if he is allocated disconnect the client
237 : * otherwise we end up with broken client.
238 : */
239 25 : puser = pmbr->current_user;
240 25 : if (puser) {
241 2 : pool_deassign_member(pool, pmbr);
242 2 : pool_free_user(pool, puser);
243 : }
244 :
245 50 : if (dlist_member_in_list(&pool->active_members, pmbr)) {
246 25 : pool->num_active_members--;
247 25 : dlist_member_remove(&pool->active_members, pmbr);
248 : }
249 25 : free(pmbr);
250 25 : pool_mbr_check(pool);
251 25 : }
252 :
253 : void
254 2 : pool_mbr_init(TDS_POOL * pool)
255 : {
256 : TDS_POOL_MEMBER *pmbr;
257 :
258 : /* allocate room for pool members */
259 :
260 2 : pool->num_active_members = 0;
261 4 : dlist_member_init(&pool->active_members);
262 4 : dlist_member_init(&pool->idle_members);
263 2 : pool_mbr_check(pool);
264 :
265 : /* open connections for each member */
266 6 : while (pool->num_active_members < pool->min_open_conn) {
267 2 : pmbr = tds_new0(TDS_POOL_MEMBER, 1);
268 2 : if (!pmbr) {
269 0 : fprintf(stderr, "Out of memory\n");
270 0 : exit(1);
271 : }
272 2 : pmbr->sock.poll_recv = true;
273 :
274 2 : pmbr->sock.tds = pool_mbr_login(pool, 0);
275 2 : if (!pmbr->sock.tds) {
276 0 : fprintf(stderr, "Could not open initial connection\n");
277 0 : exit(1);
278 : }
279 2 : pmbr->last_used_tm = time(NULL);
280 2 : pool->num_active_members++;
281 2 : dlist_member_append(&pool->idle_members, pmbr);
282 2 : if (!IS_TDS71_PLUS(pmbr->sock.tds->conn)) {
283 0 : fprintf(stderr, "Current pool implementation does not support protocol versions former than 7.1\n");
284 0 : exit(1);
285 : }
286 2 : pool->member_logins++;
287 : }
288 2 : pool_mbr_check(pool);
289 2 : }
290 :
291 : void
292 2 : pool_mbr_destroy(TDS_POOL * pool)
293 : {
294 4 : while (dlist_member_first(&pool->active_members))
295 0 : pool_free_member(pool, dlist_member_first(&pool->active_members));
296 46 : while (dlist_member_first(&pool->idle_members))
297 22 : pool_free_member(pool, dlist_member_first(&pool->idle_members));
298 :
299 2 : assert(pool->num_active_members == 0);
300 2 : pool->num_active_members = 0;
301 2 : }
302 :
303 : static bool
304 18416 : pool_process_data(TDS_POOL *pool, TDS_POOL_MEMBER *pmbr)
305 : {
306 18416 : TDSSOCKET *tds = pmbr->sock.tds;
307 18416 : TDS_POOL_USER *puser = NULL;
308 :
309 : for (;;) {
310 35372 : if (pool_packet_read(tds))
311 : break;
312 :
313 : /* disconnected */
314 16958 : if (tds->in_len == 0) {
315 2 : tdsdump_log(TDS_DBG_INFO1, "Uh oh! member disconnected\n");
316 : /* mark as dead */
317 2 : pool_free_member(pool, pmbr);
318 2 : return false;
319 : }
320 :
321 16956 : tdsdump_dump_buf(TDS_DBG_NETWORK, "Got packet from server:", tds->in_buf, tds->in_len);
322 16956 : puser = pmbr->current_user;
323 16956 : if (!puser)
324 : break;
325 :
326 16956 : tdsdump_log(TDS_DBG_INFO1, "writing it sock %d\n", (int) tds_get_s(puser->sock.tds));
327 16956 : if (!pool_write_data(&pmbr->sock, &puser->sock)) {
328 0 : tdsdump_log(TDS_DBG_ERROR, "member received error while writing\n");
329 0 : pool_free_user(pool, puser);
330 0 : return false;
331 : }
332 16956 : if (tds->in_pos < tds->in_len)
333 : /* partial write, schedule a future write */
334 : break;
335 : }
336 18414 : if (puser && !puser->sock.poll_send)
337 16808 : tds_socket_flush(tds_get_s(puser->sock.tds));
338 : return true;
339 : }
340 :
341 : /*
342 : * pool_process_members
343 : * check the fd_set for members returning data to the client, lookup the
344 : * client holding this member and forward the results.
345 : * @return Timeout you should call this function again or -1 for infinite
346 : */
347 : int
348 36421 : pool_process_members(TDS_POOL * pool, struct pollfd *fds, unsigned num_fds)
349 : {
350 : TDS_POOL_MEMBER *pmbr, *next;
351 : time_t age;
352 : time_t time_now;
353 36421 : int min_expire_left = -1;
354 : short revents;
355 :
356 36421 : pool_mbr_check(pool);
357 146629 : for (next = dlist_member_first(&pool->active_members); (pmbr = next) != NULL; ) {
358 73787 : bool processed = false;
359 :
360 147574 : next = dlist_member_next(&pool->active_members, pmbr);
361 :
362 73787 : assert(pmbr->current_user);
363 73787 : if (pmbr->doing_async || pmbr->sock.poll_index > num_fds)
364 1369 : continue;
365 :
366 72418 : revents = fds[pmbr->sock.poll_index].revents;
367 72418 : assert(pmbr->sock.tds);
368 :
369 72418 : time_now = time(NULL);
370 72418 : if (pmbr->sock.poll_recv && (revents & (POLLIN|POLLHUP)) != 0) {
371 18416 : if (!pool_process_data(pool, pmbr))
372 2 : continue;
373 : processed = true;
374 : }
375 72416 : if (pmbr->sock.poll_send && (revents & POLLOUT) != 0) {
376 0 : if (!pool_write_data(&pmbr->current_user->sock, &pmbr->sock)) {
377 0 : pool_free_member(pool, pmbr);
378 0 : continue;
379 : }
380 : processed = true;
381 : }
382 72416 : if (processed)
383 18414 : pmbr->last_used_tm = time_now;
384 : }
385 :
386 36421 : if (pool->num_active_members <= pool->min_open_conn)
387 : return min_expire_left;
388 :
389 : /* close old connections */
390 27393 : time_now = time(NULL);
391 266279 : for (next = dlist_member_first(&pool->idle_members); (pmbr = next) != NULL; ) {
392 :
393 422986 : next = dlist_member_next(&pool->idle_members, pmbr);
394 :
395 211493 : assert(pmbr->sock.tds);
396 211493 : assert(!pmbr->current_user);
397 :
398 211493 : age = time_now - pmbr->last_used_tm;
399 211493 : if (age >= pool->max_member_age) {
400 0 : tdsdump_log(TDS_DBG_INFO1, "member is %ld seconds old...closing\n", (long int) age);
401 0 : pool_free_member(pool, pmbr);
402 : } else {
403 211493 : int left = (int) (pool->max_member_age - age);
404 211493 : if (min_expire_left < 0 || left < min_expire_left)
405 22743 : min_expire_left = left;
406 : }
407 : }
408 : return min_expire_left;
409 : }
410 :
411 : static bool
412 : compatible_versions(const TDSSOCKET *tds, const TDS_POOL_USER *user)
413 : {
414 695 : if (tds->conn->tds_version != user->login->tds_version)
415 : return false;
416 : return true;
417 : }
418 :
419 : typedef struct {
420 : TDS_POOL_EVENT common;
421 : TDS_POOL *pool;
422 : TDS_POOL_MEMBER *pmbr;
423 : int tds_version;
424 : } CONNECT_EVENT;
425 :
426 : static void connect_execute_ok(TDS_POOL_EVENT *base_event);
427 : static void connect_execute_ko(TDS_POOL_EVENT *base_event);
428 :
429 23 : static TDS_THREAD_PROC_DECLARE(connect_proc, arg)
430 : {
431 23 : CONNECT_EVENT *ev = (CONNECT_EVENT *) arg;
432 23 : TDS_POOL_MEMBER *pmbr = ev->pmbr;
433 23 : TDS_POOL *pool = ev->pool;
434 :
435 : for (;;) {
436 23 : pmbr->sock.tds = pool_mbr_login(pool, ev->tds_version);
437 23 : if (!pmbr->sock.tds) {
438 0 : tdsdump_log(TDS_DBG_ERROR, "Error opening a new connection to server\n");
439 : break;
440 : }
441 23 : if (!IS_TDS71_PLUS(pmbr->sock.tds->conn)) {
442 0 : tdsdump_log(TDS_DBG_ERROR, "Protocol server version not supported\n");
443 : break;
444 : }
445 :
446 : /* if already attached to a user we can send login directly */
447 23 : if (pmbr->current_user)
448 23 : if (!pool_user_send_login_ack(pool, pmbr->current_user))
449 : break;
450 :
451 23 : pool_event_add(pool, &ev->common, connect_execute_ok);
452 23 : return TDS_THREAD_RESULT(0);
453 : }
454 :
455 : /* failure */
456 0 : pool_event_add(pool, &ev->common, connect_execute_ko);
457 0 : return TDS_THREAD_RESULT(0);
458 : }
459 :
460 : static void
461 0 : connect_execute_ko(TDS_POOL_EVENT *base_event)
462 : {
463 0 : CONNECT_EVENT *ev = (CONNECT_EVENT *) base_event;
464 :
465 0 : pool_free_member(ev->pool, ev->pmbr);
466 0 : }
467 :
468 : static void
469 23 : connect_execute_ok(TDS_POOL_EVENT *base_event)
470 : {
471 23 : CONNECT_EVENT *ev = (CONNECT_EVENT *) base_event;
472 23 : TDS_POOL_MEMBER *pmbr = ev->pmbr;
473 23 : TDS_POOL_USER *puser = pmbr->current_user;
474 :
475 23 : ev->pool->member_logins++;
476 23 : pmbr->doing_async = false;
477 :
478 23 : pmbr->last_used_tm = time(NULL);
479 :
480 23 : if (puser) {
481 23 : pmbr->sock.poll_recv = true;
482 23 : puser->sock.poll_recv = true;
483 :
484 23 : puser->user_state = TDS_SRV_QUERY;
485 : }
486 23 : }
487 :
488 : /*
489 : * pool_assign_idle_member
490 : * assign a member to the user specified
491 : */
492 : TDS_POOL_MEMBER *
493 718 : pool_assign_idle_member(TDS_POOL * pool, TDS_POOL_USER *puser)
494 : {
495 : TDS_POOL_MEMBER *pmbr;
496 : CONNECT_EVENT *ev;
497 :
498 718 : puser->sock.poll_recv = false;
499 718 : puser->sock.poll_send = false;
500 :
501 718 : pool_mbr_check(pool);
502 718 : DLIST_FOREACH(dlist_member, &pool->idle_members, pmbr) {
503 695 : assert(pmbr->current_user == NULL);
504 695 : assert(!pmbr->doing_async);
505 :
506 695 : assert(pmbr->sock.tds);
507 :
508 1390 : if (!compatible_versions(pmbr->sock.tds, puser))
509 0 : continue;
510 :
511 695 : pool_assign_member(pool, pmbr, puser);
512 :
513 : /*
514 : * make sure member wasn't idle more that the timeout
515 : * otherwise it'll send the query and close leaving a
516 : * hung client
517 : */
518 695 : pmbr->last_used_tm = time(NULL);
519 695 : pmbr->sock.poll_recv = false;
520 695 : pmbr->sock.poll_send = false;
521 :
522 695 : pool_user_finish_login(pool, puser);
523 695 : return pmbr;
524 : }
525 :
526 : /* if we can open a new connection open it */
527 23 : if (pool->num_active_members >= pool->max_open_conn) {
528 0 : fprintf(stderr, "No idle members left, increase \"max pool conn\"\n");
529 0 : return NULL;
530 : }
531 :
532 23 : pmbr = tds_new0(TDS_POOL_MEMBER, 1);
533 23 : if (!pmbr) {
534 0 : fprintf(stderr, "Out of memory\n");
535 0 : return NULL;
536 : }
537 :
538 23 : tdsdump_log(TDS_DBG_INFO1, "No open connections left, opening new member\n");
539 :
540 23 : ev = tds_new0(CONNECT_EVENT, 1);
541 23 : if (!ev) {
542 0 : free(pmbr);
543 0 : fprintf(stderr, "Out of memory\n");
544 0 : return NULL;
545 : }
546 23 : ev->pmbr = pmbr;
547 23 : ev->pool = pool;
548 23 : ev->tds_version = puser->login->tds_version;
549 :
550 23 : if (tds_thread_create_detached(connect_proc, ev) != 0) {
551 0 : free(pmbr);
552 0 : free(ev);
553 0 : fprintf(stderr, "error creating thread\n");
554 0 : return NULL;
555 : }
556 23 : pmbr->doing_async = true;
557 :
558 23 : pool_mbr_check(pool);
559 23 : pool->num_active_members++;
560 23 : dlist_member_append(&pool->idle_members, pmbr);
561 23 : pool_mbr_check(pool);
562 :
563 23 : pool_assign_member(pool, pmbr, puser);
564 23 : puser->sock.poll_send = false;
565 23 : puser->sock.poll_recv = false;
566 :
567 23 : return pmbr;
568 : }
569 :
570 : #if ENABLE_EXTRA_CHECKS
571 38650 : void pool_mbr_check(TDS_POOL *pool)
572 : {
573 : TDS_POOL_MEMBER *pmbr;
574 38650 : unsigned total = 0;
575 :
576 227542 : DLIST_FOREACH(dlist_member, &pool->active_members, pmbr) {
577 75121 : assert(pmbr->doing_async || pmbr->sock.tds);
578 75121 : assert(pmbr->current_user);
579 75121 : ++total;
580 : }
581 525370 : DLIST_FOREACH(dlist_member, &pool->idle_members, pmbr) {
582 224035 : assert(pmbr->doing_async || pmbr->sock.tds);
583 224035 : assert(!pmbr->current_user);
584 224035 : ++total;
585 : }
586 38650 : assert(pool->num_active_members == total);
587 38650 : }
588 : #endif
|