Line data Source code
1 : /* TDSPool - Connection pooling for TDS based databases
2 : * Copyright (C) 2001, 2002, 2003, 2004, 2005 Brian Bruns
3 : *
4 : * This program is free software; you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation; either version 2 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program; if not, write to the Free Software
16 : * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 : *
18 : */
19 :
20 : #include <config.h>
21 :
22 : #include <stdarg.h>
23 : #include <stdio.h>
24 : #include <assert.h>
25 :
26 : #if HAVE_STDLIB_H
27 : #include <stdlib.h>
28 : #endif /* HAVE_STDLIB_H */
29 :
30 : #if HAVE_STRING_H
31 : #include <string.h>
32 : #endif /* HAVE_STRING_H */
33 :
34 : #if HAVE_UNISTD_H
35 : #include <unistd.h>
36 : #endif /* HAVE_UNISTD_H */
37 :
38 : #if HAVE_SYS_PARAM_H
39 : #include <sys/param.h>
40 : #endif /* HAVE_SYS_PARAM_H */
41 :
42 : #if HAVE_SYS_SOCKET_H
43 : #include <sys/socket.h>
44 : #endif /* HAVE_SYS_SOCKET_H */
45 :
46 : #if HAVE_NETINET_IN_H
47 : #include <netinet/in.h>
48 : #endif /* HAVE_NETINET_IN_H */
49 :
50 : #if HAVE_ARPA_INET_H
51 : #include <arpa/inet.h>
52 : #endif /* HAVE_ARPA_INET_H */
53 :
54 : #include "pool.h"
55 : #include <freetds/utils/string.h>
56 :
57 : #ifndef MAXHOSTNAMELEN
58 : #define MAXHOSTNAMELEN 256
59 : #endif /* MAXHOSTNAMELEN */
60 :
61 : static void
62 24 : pool_mbr_free_socket(TDSSOCKET *tds)
63 : {
64 24 : if (tds) {
65 24 : TDSCONTEXT *ctx = (TDSCONTEXT *) tds->conn->tds_ctx;
66 :
67 24 : tds_free_socket(tds);
68 24 : tds_free_context(ctx);
69 : }
70 24 : }
71 :
72 : /*
73 : * pool_mbr_login open a single pool login, to be call at init time or
74 : * to reconnect.
75 : */
76 : static TDSSOCKET *
77 24 : pool_mbr_login(const TDS_POOL * pool, int tds_version)
78 : {
79 : TDSCONTEXT *context;
80 : TDSLOGIN *login;
81 : TDSSOCKET *tds;
82 : TDSLOGIN *connection;
83 : char hostname[MAXHOSTNAMELEN];
84 :
85 24 : login = tds_alloc_login(1);
86 24 : if (!login) {
87 0 : fprintf(stderr, "out of memory");
88 0 : return NULL;
89 : }
90 24 : if (gethostname(hostname, MAXHOSTNAMELEN) < 0)
91 0 : strlcpy(hostname, "tdspool", MAXHOSTNAMELEN);
92 24 : if (!tds_set_passwd(login, pool->server_password)
93 24 : || !tds_set_user(login, pool->server_user)
94 24 : || !tds_set_app(login, "tdspool")
95 24 : || !tds_set_host(login, hostname)
96 24 : || !tds_set_library(login, "TDS-Library")
97 24 : || !tds_set_server(login, pool->server)
98 24 : || !tds_set_client_charset(login, "iso_1")
99 24 : || !tds_set_language(login, "us_english")) {
100 0 : tds_free_login(login);
101 0 : return NULL;
102 : }
103 24 : if (tds_version > 0)
104 22 : login->tds_version = tds_version;
105 24 : if (pool->database && strlen(pool->database)) {
106 24 : if (!tds_dstr_copy(&login->database, pool->database)) {
107 0 : tds_free_login(login);
108 0 : return NULL;
109 : }
110 : }
111 24 : context = tds_alloc_context(NULL);
112 24 : if (!context) {
113 0 : fprintf(stderr, "Context cannot be null\n");
114 0 : return NULL;
115 : }
116 24 : tds = tds_alloc_socket(context, 512);
117 24 : if (!tds) {
118 0 : fprintf(stderr, "tds cannot be null\n");
119 0 : return NULL;
120 : }
121 24 : connection = tds_read_config_info(tds, login, context->locale);
122 24 : tds_free_login(login);
123 24 : if (!connection || TDS_FAILED(tds_connect_and_login(tds, connection))) {
124 0 : pool_mbr_free_socket(tds);
125 0 : tds_free_login(connection);
126 : /* what to do? */
127 0 : fprintf(stderr, "Could not open connection to server %s\n", pool->server);
128 0 : return NULL;
129 : }
130 24 : tds_free_login(connection);
131 :
132 24 : if (pool->database && strlen(pool->database)) {
133 24 : if (strcasecmp(tds->conn->env.database, pool->database) != 0) {
134 0 : fprintf(stderr, "changing database failed\n");
135 0 : return NULL;
136 : }
137 : }
138 :
139 : return tds;
140 : }
141 :
142 : void
143 712 : pool_assign_member(TDS_POOL *pool, TDS_POOL_MEMBER * pmbr, TDS_POOL_USER *puser)
144 : {
145 712 : assert(pmbr->current_user == NULL);
146 : if (pmbr->current_user) {
147 : pmbr->current_user->assigned_member = NULL;
148 : } else {
149 712 : dlist_member_remove(&pool->idle_members, pmbr);
150 712 : dlist_member_append(&pool->active_members, pmbr);
151 : }
152 712 : pmbr->current_user = puser;
153 712 : puser->assigned_member = pmbr;
154 712 : }
155 :
156 : void
157 712 : pool_deassign_member(TDS_POOL *pool, TDS_POOL_MEMBER * pmbr)
158 : {
159 712 : if (pmbr->current_user) {
160 712 : pmbr->current_user->assigned_member = NULL;
161 712 : pmbr->current_user = NULL;
162 712 : dlist_member_remove(&pool->active_members, pmbr);
163 712 : dlist_member_append(&pool->idle_members, pmbr);
164 : }
165 712 : pmbr->sock.poll_send = false;
166 712 : }
167 :
168 : /*
169 : * if a dead connection on the client side left this member in a questionable
170 : * state, let's bring in a correct one
171 : * We are not sure what the client did so we must try to clean as much as
172 : * possible.
173 : * Use pool_free_member if the state is really broken.
174 : */
175 : void
176 710 : pool_reset_member(TDS_POOL * pool, TDS_POOL_MEMBER * pmbr)
177 : {
178 : // FIXME not wait for server !!! asyncronous
179 710 : TDSSOCKET *tds = pmbr->sock.tds;
180 : TDS_POOL_USER *puser;
181 :
182 710 : puser = pmbr->current_user;
183 710 : if (puser) {
184 0 : pool_deassign_member(pool, pmbr);
185 0 : pool_free_user(pool, puser);
186 : }
187 :
188 : /* cancel whatever pending */
189 710 : tds_init_write_buf(tds);
190 710 : if (tds_set_state(tds, TDS_WRITING) != TDS_WRITING)
191 : goto failure;
192 710 : tds->out_flag = TDS_CANCEL;
193 710 : if (TDS_FAILED(tds_flush_packet(tds)))
194 : goto failure;
195 710 : tds_set_state(tds, TDS_PENDING);
196 710 : tds->in_cancel = 2;
197 :
198 710 : if (TDS_FAILED(tds_process_cancel(tds)))
199 : goto failure;
200 :
201 710 : if (IS_TDS71_PLUS(tds->conn)) {
202 : /* this 0x9 final reset the state from mssql 2000 */
203 710 : if (tds_set_state(tds, TDS_WRITING) != TDS_WRITING)
204 : goto failure;
205 710 : tds_start_query(tds, TDS_QUERY);
206 710 : tds_put_string(tds, "WHILE @@TRANCOUNT > 0 ROLLBACK SET TRANSACTION ISOLATION LEVEL READ COMMITTED", -1);
207 710 : tds_write_packet(tds, 0x9);
208 710 : tds_set_state(tds, TDS_PENDING);
209 :
210 710 : if (TDS_FAILED(tds_process_simple_query(tds)))
211 : goto failure;
212 : }
213 : return;
214 :
215 0 : failure:
216 0 : pool_free_member(pool, pmbr);
217 : }
218 :
219 : void
220 24 : pool_free_member(TDS_POOL * pool, TDS_POOL_MEMBER * pmbr)
221 : {
222 : TDSSOCKET *tds;
223 : TDS_POOL_USER *puser;
224 :
225 24 : tds = pmbr->sock.tds;
226 24 : if (tds) {
227 24 : if (!IS_TDSDEAD(tds))
228 24 : tds_close_socket(tds);
229 24 : pool_mbr_free_socket(tds);
230 24 : pmbr->sock.tds = NULL;
231 : }
232 :
233 : /*
234 : * if he is allocated disconnect the client
235 : * otherwise we end up with broken client.
236 : */
237 24 : puser = pmbr->current_user;
238 24 : if (puser) {
239 2 : pool_deassign_member(pool, pmbr);
240 2 : pool_free_user(pool, puser);
241 : }
242 :
243 48 : if (dlist_member_in_list(&pool->active_members, pmbr)) {
244 24 : pool->num_active_members--;
245 24 : dlist_member_remove(&pool->active_members, pmbr);
246 : }
247 24 : free(pmbr);
248 24 : }
249 :
250 : void
251 2 : pool_mbr_init(TDS_POOL * pool)
252 : {
253 : TDS_POOL_MEMBER *pmbr;
254 :
255 : /* allocate room for pool members */
256 :
257 2 : pool->num_active_members = 0;
258 4 : dlist_member_init(&pool->active_members);
259 2 : dlist_member_init(&pool->idle_members);
260 :
261 : /* open connections for each member */
262 4 : while (pool->num_active_members < pool->min_open_conn) {
263 2 : pmbr = tds_new0(TDS_POOL_MEMBER, 1);
264 2 : if (!pmbr) {
265 0 : fprintf(stderr, "Out of memory\n");
266 0 : exit(1);
267 : }
268 2 : pmbr->sock.poll_recv = true;
269 :
270 2 : pmbr->sock.tds = pool_mbr_login(pool, 0);
271 2 : if (!pmbr->sock.tds) {
272 0 : fprintf(stderr, "Could not open initial connection\n");
273 0 : exit(1);
274 : }
275 2 : pmbr->last_used_tm = time(NULL);
276 2 : pool->num_active_members++;
277 2 : dlist_member_append(&pool->idle_members, pmbr);
278 2 : if (!IS_TDS71_PLUS(pmbr->sock.tds->conn)) {
279 0 : fprintf(stderr, "Current pool implementation does not support protocol versions former than 7.1\n");
280 0 : exit(1);
281 : }
282 2 : pool->member_logins++;
283 : }
284 2 : }
285 :
286 : void
287 2 : pool_mbr_destroy(TDS_POOL * pool)
288 : {
289 4 : while (dlist_member_first(&pool->active_members))
290 0 : pool_free_member(pool, dlist_member_first(&pool->active_members));
291 46 : while (dlist_member_first(&pool->idle_members))
292 22 : pool_free_member(pool, dlist_member_first(&pool->idle_members));
293 :
294 2 : assert(pool->num_active_members == 0);
295 2 : pool->num_active_members = 0;
296 2 : }
297 :
298 : static bool
299 18039 : pool_process_data(TDS_POOL *pool, TDS_POOL_MEMBER *pmbr)
300 : {
301 18039 : TDSSOCKET *tds = pmbr->sock.tds;
302 18039 : TDS_POOL_USER *puser = NULL;
303 :
304 : for (;;) {
305 34780 : if (pool_packet_read(tds))
306 : break;
307 :
308 : /* disconnected */
309 16743 : if (tds->in_len == 0) {
310 2 : tdsdump_log(TDS_DBG_INFO1, "Uh oh! member disconnected\n");
311 : /* mark as dead */
312 2 : pool_free_member(pool, pmbr);
313 2 : return false;
314 : }
315 :
316 16741 : tdsdump_dump_buf(TDS_DBG_NETWORK, "Got packet from server:", tds->in_buf, tds->in_len);
317 16741 : puser = pmbr->current_user;
318 16741 : if (!puser)
319 : break;
320 :
321 16741 : tdsdump_log(TDS_DBG_INFO1, "writing it sock %d\n", tds_get_s(puser->sock.tds));
322 16741 : if (!pool_write_data(&pmbr->sock, &puser->sock)) {
323 0 : tdsdump_log(TDS_DBG_ERROR, "member received error while writing\n");
324 0 : pool_free_user(pool, puser);
325 0 : return false;
326 : }
327 16741 : if (tds->in_pos < tds->in_len)
328 : /* partial write, schedule a future write */
329 : break;
330 : }
331 18037 : if (puser && !puser->sock.poll_send)
332 16507 : tds_socket_flush(tds_get_s(puser->sock.tds));
333 : return true;
334 : }
335 :
336 : /*
337 : * pool_process_members
338 : * check the fd_set for members returning data to the client, lookup the
339 : * client holding this member and forward the results.
340 : * @return Timeout you should call this function again or -1 for infinite
341 : */
342 : int
343 35988 : pool_process_members(TDS_POOL * pool, struct pollfd *fds, unsigned num_fds)
344 : {
345 : TDS_POOL_MEMBER *pmbr, *next;
346 : TDSSOCKET *tds;
347 : time_t age;
348 : time_t time_now;
349 35988 : int min_expire_left = -1;
350 : short revents;
351 :
352 145649 : for (next = dlist_member_first(&pool->active_members); (pmbr = next) != NULL; ) {
353 73673 : bool processed = false;
354 :
355 147346 : next = dlist_member_next(&pool->active_members, pmbr);
356 :
357 73673 : if (pmbr->doing_async || pmbr->sock.poll_index > num_fds)
358 1360 : continue;
359 :
360 72313 : revents = fds[pmbr->sock.poll_index].revents;
361 72313 : tds = pmbr->sock.tds;
362 72313 : assert(tds);
363 :
364 72313 : time_now = time(NULL);
365 72313 : if (pmbr->sock.poll_recv && (revents & POLLIN) != 0) {
366 18039 : if (!pool_process_data(pool, pmbr))
367 2 : continue;
368 : processed = true;
369 : }
370 72311 : if (pmbr->sock.poll_send && (revents & POLLOUT) != 0) {
371 1 : if (!pool_write_data(&pmbr->current_user->sock, &pmbr->sock)) {
372 0 : pool_free_member(pool, pmbr);
373 0 : continue;
374 : }
375 : processed = true;
376 : }
377 72310 : if (processed)
378 18038 : pmbr->last_used_tm = time_now;
379 : }
380 :
381 35988 : if (pool->num_active_members <= pool->min_open_conn)
382 : return min_expire_left;
383 :
384 : /* close old connections */
385 27078 : time_now = time(NULL);
386 263637 : for (next = dlist_member_first(&pool->idle_members); (pmbr = next) != NULL; ) {
387 :
388 418962 : next = dlist_member_next(&pool->idle_members, pmbr);
389 :
390 209481 : assert(pmbr->sock.tds);
391 209481 : assert(!pmbr->current_user);
392 :
393 209481 : age = time_now - pmbr->last_used_tm;
394 209481 : if (age >= pool->max_member_age) {
395 0 : tdsdump_log(TDS_DBG_INFO1, "member is %ld seconds old...closing\n", (long int) age);
396 0 : pool_free_member(pool, pmbr);
397 : } else {
398 209481 : int left = (int) (pool->max_member_age - age);
399 209481 : if (min_expire_left < 0 || left < min_expire_left)
400 22375 : min_expire_left = left;
401 : }
402 : }
403 : return min_expire_left;
404 : }
405 :
406 : static bool
407 : compatible_versions(const TDSSOCKET *tds, const TDS_POOL_USER *user)
408 : {
409 690 : if (tds->conn->tds_version != user->login->tds_version)
410 : return false;
411 : return true;
412 : }
413 :
414 : typedef struct {
415 : TDS_POOL_EVENT common;
416 : TDS_POOL *pool;
417 : TDS_POOL_MEMBER *pmbr;
418 : int tds_version;
419 : } CONNECT_EVENT;
420 :
421 : static void connect_execute_ok(TDS_POOL_EVENT *base_event);
422 : static void connect_execute_ko(TDS_POOL_EVENT *base_event);
423 :
424 22 : static TDS_THREAD_PROC_DECLARE(connect_proc, arg)
425 : {
426 22 : CONNECT_EVENT *ev = (CONNECT_EVENT *) arg;
427 22 : TDS_POOL_MEMBER *pmbr = ev->pmbr;
428 22 : TDS_POOL *pool = ev->pool;
429 :
430 : for (;;) {
431 22 : pmbr->sock.tds = pool_mbr_login(pool, ev->tds_version);
432 22 : if (!pmbr->sock.tds) {
433 0 : tdsdump_log(TDS_DBG_ERROR, "Error opening a new connection to server\n");
434 : break;
435 : }
436 22 : if (!IS_TDS71_PLUS(pmbr->sock.tds->conn)) {
437 0 : tdsdump_log(TDS_DBG_ERROR, "Protocol server version not supported\n");
438 : break;
439 : }
440 :
441 : /* if already attached to a user we can send login directly */
442 22 : if (pmbr->current_user)
443 22 : if (!pool_user_send_login_ack(pool, pmbr->current_user))
444 : break;
445 :
446 22 : pool_event_add(pool, &ev->common, connect_execute_ok);
447 22 : return TDS_THREAD_RESULT(0);
448 : }
449 :
450 : /* failure */
451 0 : pool_event_add(pool, &ev->common, connect_execute_ko);
452 0 : return TDS_THREAD_RESULT(0);
453 : }
454 :
455 : static void
456 0 : connect_execute_ko(TDS_POOL_EVENT *base_event)
457 : {
458 0 : CONNECT_EVENT *ev = (CONNECT_EVENT *) base_event;
459 :
460 0 : pool_free_member(ev->pool, ev->pmbr);
461 0 : }
462 :
463 : static void
464 22 : connect_execute_ok(TDS_POOL_EVENT *base_event)
465 : {
466 22 : CONNECT_EVENT *ev = (CONNECT_EVENT *) base_event;
467 22 : TDS_POOL_MEMBER *pmbr = ev->pmbr;
468 22 : TDS_POOL_USER *puser = pmbr->current_user;
469 :
470 22 : ev->pool->member_logins++;
471 22 : pmbr->doing_async = false;
472 :
473 22 : pmbr->last_used_tm = time(NULL);
474 :
475 22 : if (puser) {
476 22 : pmbr->sock.poll_recv = true;
477 22 : puser->sock.poll_recv = true;
478 :
479 22 : puser->user_state = TDS_SRV_QUERY;
480 : }
481 22 : }
482 :
483 : /*
484 : * pool_assign_idle_member
485 : * assign a member to the user specified
486 : */
487 : TDS_POOL_MEMBER *
488 712 : pool_assign_idle_member(TDS_POOL * pool, TDS_POOL_USER *puser)
489 : {
490 : TDS_POOL_MEMBER *pmbr;
491 : CONNECT_EVENT *ev;
492 :
493 712 : puser->sock.poll_recv = false;
494 712 : puser->sock.poll_send = false;
495 :
496 712 : DLIST_FOREACH(dlist_member, &pool->idle_members, pmbr) {
497 690 : assert(pmbr->current_user == NULL);
498 690 : assert(!pmbr->doing_async);
499 :
500 690 : assert(pmbr->sock.tds);
501 :
502 1380 : if (!compatible_versions(pmbr->sock.tds, puser))
503 0 : continue;
504 :
505 690 : pool_assign_member(pool, pmbr, puser);
506 :
507 : /*
508 : * make sure member wasn't idle more that the timeout
509 : * otherwise it'll send the query and close leaving a
510 : * hung client
511 : */
512 690 : pmbr->last_used_tm = time(NULL);
513 690 : pmbr->sock.poll_recv = false;
514 690 : pmbr->sock.poll_send = false;
515 :
516 690 : pool_user_finish_login(pool, puser);
517 690 : return pmbr;
518 : }
519 :
520 : /* if we can open a new connection open it */
521 22 : if (pool->num_active_members >= pool->max_open_conn) {
522 0 : fprintf(stderr, "No idle members left, increase \"max pool conn\"\n");
523 0 : return NULL;
524 : }
525 :
526 22 : pmbr = tds_new0(TDS_POOL_MEMBER, 1);
527 22 : if (!pmbr) {
528 0 : fprintf(stderr, "Out of memory\n");
529 0 : return NULL;
530 : }
531 :
532 22 : tdsdump_log(TDS_DBG_INFO1, "No open connections left, opening new member\n");
533 :
534 22 : ev = tds_new0(CONNECT_EVENT, 1);
535 22 : if (!ev) {
536 0 : free(pmbr);
537 0 : fprintf(stderr, "Out of memory\n");
538 0 : return NULL;
539 : }
540 22 : ev->pmbr = pmbr;
541 22 : ev->pool = pool;
542 22 : ev->tds_version = puser->login->tds_version;
543 :
544 22 : if (tds_thread_create_detached(connect_proc, ev) != 0) {
545 0 : free(pmbr);
546 0 : free(ev);
547 0 : fprintf(stderr, "error creating thread\n");
548 0 : return NULL;
549 : }
550 22 : pmbr->doing_async = true;
551 :
552 22 : pool->num_active_members++;
553 22 : dlist_member_append(&pool->idle_members, pmbr);
554 :
555 22 : pool_assign_member(pool, pmbr, puser);
556 22 : puser->sock.poll_send = false;
557 22 : puser->sock.poll_recv = false;
558 :
559 22 : return pmbr;
560 : }
|