Line data Source code
1 : /* TDSPool - Connection pooling for TDS based databases
2 : * Copyright (C) 2001, 2002, 2003, 2004, 2005 Brian Bruns
3 : *
4 : * This program is free software; you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation; either version 2 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program; if not, write to the Free Software
16 : * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 : *
18 : */
19 :
20 : #include <config.h>
21 :
22 : #include <stdarg.h>
23 : #include <stdio.h>
24 : #include <errno.h>
25 : #include <assert.h>
26 :
27 : #if HAVE_STDLIB_H
28 : #include <stdlib.h>
29 : #endif /* HAVE_STDLIB_H */
30 :
31 : #if HAVE_STRING_H
32 : #include <string.h>
33 : #endif /* HAVE_STRING_H */
34 :
35 : #if HAVE_UNISTD_H
36 : #include <unistd.h>
37 : #endif /* HAVE_UNISTD_H */
38 :
39 : #if HAVE_SYS_TYPES_H
40 : #include <sys/types.h>
41 : #endif /* HAVE_SYS_TYPES_H */
42 :
43 : #if HAVE_SYS_SOCKET_H
44 : #include <sys/socket.h>
45 : #endif /* HAVE_SYS_SOCKET_H */
46 :
47 : #include "pool.h"
48 : #include <freetds/server.h>
49 : #include <freetds/utils/string.h>
50 :
51 : static TDS_POOL_USER *pool_user_find_new(TDS_POOL * pool);
52 : static bool pool_user_login(TDS_POOL * pool, TDS_POOL_USER * puser);
53 : static bool pool_user_read(TDS_POOL * pool, TDS_POOL_USER * puser);
54 : static void login_execute(TDS_POOL_EVENT *base_event);
55 : static void end_login_execute(TDS_POOL_EVENT *base_event);
56 :
57 : void
58 2 : pool_user_init(TDS_POOL * pool)
59 : {
60 4 : dlist_user_init(&pool->users);
61 4 : dlist_user_init(&pool->waiters);
62 2 : pool->ctx = tds_alloc_context(NULL);
63 2 : }
64 :
65 : void
66 2 : pool_user_destroy(TDS_POOL * pool)
67 : {
68 4 : while (dlist_user_first(&pool->users))
69 0 : pool_free_user(pool, dlist_user_first(&pool->users));
70 2 : while (dlist_user_first(&pool->waiters))
71 0 : pool_free_user(pool, dlist_user_first(&pool->waiters));
72 :
73 2 : tds_free_context(pool->ctx);
74 2 : pool->ctx = NULL;
75 2 : }
76 :
77 : static TDS_POOL_USER *
78 722 : pool_user_find_new(TDS_POOL * pool)
79 : {
80 : TDS_POOL_USER *puser;
81 :
82 : /* did we exhaust the number of concurrent users? */
83 722 : if (pool->num_users >= MAX_POOL_USERS) {
84 0 : fprintf(stderr, "Max concurrent users exceeded, increase in pool.h\n");
85 0 : return NULL;
86 : }
87 :
88 722 : puser = tds_new0(TDS_POOL_USER, 1);
89 722 : if (!puser) {
90 0 : fprintf(stderr, "Out of memory\n");
91 0 : return NULL;
92 : }
93 :
94 722 : dlist_user_append(&pool->users, puser);
95 722 : pool->num_users++;
96 :
97 722 : return puser;
98 : }
99 :
100 : typedef struct {
101 : TDS_POOL_EVENT common;
102 : TDS_POOL *pool;
103 : TDS_POOL_USER *puser;
104 : bool success;
105 : } LOGIN_EVENT;
106 :
107 722 : static TDS_THREAD_PROC_DECLARE(login_proc, arg)
108 : {
109 722 : LOGIN_EVENT *ev = (LOGIN_EVENT *) arg;
110 :
111 722 : ev->success = pool_user_login(ev->pool, ev->puser);
112 :
113 722 : pool_event_add(ev->pool, &ev->common, login_execute);
114 722 : return TDS_THREAD_RESULT(0);
115 : }
116 :
117 : static void
118 722 : login_execute(TDS_POOL_EVENT *base_event)
119 : {
120 722 : LOGIN_EVENT *ev = (LOGIN_EVENT *) base_event;
121 722 : TDS_POOL_USER *puser = ev->puser;
122 722 : TDS_POOL *pool = ev->pool;
123 :
124 722 : if (!ev->success) {
125 : /* login failed...free socket */
126 2 : pool_free_user(pool, puser);
127 2 : return;
128 : }
129 :
130 720 : puser->sock.poll_recv = true;
131 :
132 : /* try to assign a member, connection can have transactions
133 : * and so on so deassign only when disconnected */
134 720 : pool_user_query(pool, puser);
135 :
136 720 : tdsdump_log(TDS_DBG_INFO1, "user state %d\n", puser->user_state);
137 :
138 720 : assert(puser->login || puser->user_state == TDS_SRV_QUERY);
139 : }
140 :
141 :
142 : /*
143 : * pool_user_create
144 : * accepts a client connection and adds it to the users list and returns it
145 : */
146 : TDS_POOL_USER *
147 722 : pool_user_create(TDS_POOL * pool, TDS_SYS_SOCKET s)
148 : {
149 : TDS_POOL_USER *puser;
150 : TDS_SYS_SOCKET fd;
151 : TDSSOCKET *tds;
152 : LOGIN_EVENT *ev;
153 :
154 722 : tdsdump_log(TDS_DBG_NETWORK, "accepting connection\n");
155 722 : if (TDS_IS_SOCKET_INVALID(fd = tds_accept(s, NULL, NULL))) {
156 0 : char *errstr = sock_strerror(errno);
157 0 : tdsdump_log(TDS_DBG_ERROR, "error calling assert :%s\n", errstr);
158 : sock_strerror_free(errstr);
159 : return NULL;
160 : }
161 :
162 722 : tds_socket_set_nodelay(fd);
163 :
164 722 : if (tds_socket_set_nonblocking(fd) != 0) {
165 0 : CLOSESOCKET(fd);
166 0 : return NULL;
167 : }
168 :
169 722 : puser = pool_user_find_new(pool);
170 722 : if (!puser) {
171 0 : CLOSESOCKET(fd);
172 0 : return NULL;
173 : }
174 :
175 722 : tds = tds_alloc_socket(pool->ctx, BLOCKSIZ);
176 722 : if (!tds) {
177 0 : CLOSESOCKET(fd);
178 0 : return NULL;
179 : }
180 722 : ev = tds_new0(LOGIN_EVENT, 1);
181 722 : if (!ev || TDS_FAILED(tds_iconv_open(tds->conn, "UTF-8", 0))) {
182 0 : free(ev);
183 0 : tds_free_socket(tds);
184 0 : CLOSESOCKET(fd);
185 0 : return NULL;
186 : }
187 722 : tds_set_s(tds, fd);
188 722 : tds->state = TDS_IDLE;
189 722 : tds->out_flag = TDS_LOGIN;
190 :
191 722 : puser->sock.tds = tds;
192 722 : puser->user_state = TDS_SRV_QUERY;
193 722 : puser->sock.poll_recv = false;
194 722 : puser->sock.poll_send = false;
195 :
196 : /* launch login asyncronously */
197 722 : ev->puser = puser;
198 722 : ev->pool = pool;
199 :
200 722 : if (tds_thread_create_detached(login_proc, ev) != 0) {
201 0 : pool_free_user(pool, puser);
202 0 : fprintf(stderr, "error creating thread\n");
203 0 : return NULL;
204 : }
205 :
206 : return puser;
207 : }
208 :
209 : /*
210 : * pool_free_user
211 : * close out a disconnected user.
212 : */
213 : void
214 722 : pool_free_user(TDS_POOL *pool, TDS_POOL_USER * puser)
215 : {
216 722 : TDS_POOL_MEMBER *pmbr = puser->assigned_member;
217 722 : if (pmbr) {
218 718 : assert(pmbr->current_user == puser);
219 718 : pool_deassign_member(pool, pmbr);
220 718 : pool_reset_member(pool, pmbr);
221 : }
222 :
223 722 : tds_free_socket(puser->sock.tds);
224 722 : tds_free_login(puser->login);
225 :
226 : /* make sure to decrement the waiters list if he is waiting */
227 722 : if (puser->user_state == TDS_SRV_WAIT)
228 0 : dlist_user_remove(&pool->waiters, puser);
229 : else
230 722 : dlist_user_remove(&pool->users, puser);
231 722 : pool->num_users--;
232 722 : free(puser);
233 722 : }
234 :
235 : /*
236 : * pool_process_users
237 : * check the fd_set for user input, allocate a pool member to it, and forward
238 : * the query to that member.
239 : */
240 : void
241 36636 : pool_process_users(TDS_POOL * pool, struct pollfd *fds, unsigned num_fds)
242 : {
243 : TDS_POOL_USER *puser, *next;
244 : short revents;
245 :
246 149434 : for (next = dlist_user_first(&pool->users); (puser = next) != NULL; ) {
247 :
248 152324 : next = dlist_user_next(&pool->users, puser);
249 :
250 76162 : if (!puser->sock.tds || puser->sock.poll_index >= num_fds)
251 0 : continue; /* dead connection */
252 :
253 76162 : revents = fds[puser->sock.poll_index].revents;
254 :
255 76162 : if (puser->sock.poll_recv && (revents & (POLLIN|POLLHUP)) != 0) {
256 16249 : assert(puser->user_state == TDS_SRV_QUERY);
257 16249 : if (!pool_user_read(pool, puser))
258 718 : continue;
259 : }
260 75444 : if (puser->sock.poll_send && (revents & POLLOUT) != 0) {
261 0 : if (!pool_write_data(&puser->assigned_member->sock, &puser->sock))
262 0 : pool_free_member(pool, puser->assigned_member);
263 : }
264 : } /* for */
265 36636 : }
266 :
267 : /*
268 : * pool_user_login
269 : * Reads clients login packet and forges a login acknowledgement sequence
270 : */
271 : static bool
272 722 : pool_user_login(TDS_POOL * pool, TDS_POOL_USER * puser)
273 : {
274 : TDSSOCKET *tds;
275 : TDSLOGIN *login;
276 :
277 722 : tds = puser->sock.tds;
278 1444 : while (tds->in_len <= tds->in_pos)
279 722 : if (tds_read_packet(tds) < 0)
280 : return false;
281 :
282 722 : tdsdump_log(TDS_DBG_NETWORK, "got packet type %d\n", tds->in_flag);
283 722 : if (tds->in_flag == TDS71_PRELOGIN) {
284 722 : if (!tds->conn->tds_version)
285 722 : tds->conn->tds_version = 0x701;
286 722 : tds->out_flag = TDS_REPLY;
287 : // TODO proper one !!
288 : // TODO detect TDS version here ??
289 722 : tds_put_n(tds, "\x00\x00\x1a\x00\x06" /* version */
290 : "\x01\x00\x20\x00\x01" /* encryption */
291 : "\x02\x00\x21\x00\x01" /* instance ?? */
292 : "\x03\x00\x22\x00\x00" /* process id ?? */
293 : "\x04\x00\x22\x00\x01" /* MARS */
294 : "\xff"
295 : "\x0a\x00\x06\x40\x00\x00"
296 : "\x02"
297 : "\x00"
298 : ""
299 : "\x00", 0x23);
300 722 : tds_flush_packet(tds);
301 :
302 : /* read another packet */
303 722 : tds->in_pos = tds->in_len;
304 1444 : while (tds->in_len <= tds->in_pos)
305 722 : if (tds_read_packet(tds) < 0)
306 : return false;
307 : }
308 :
309 722 : puser->login = login = tds_alloc_login(true);
310 722 : if (!login) {
311 0 : tdsdump_log(TDS_DBG_ERROR, "tds_alloc_login() failed.\n");
312 : return false;
313 : }
314 722 : if (tds->in_flag == TDS_LOGIN) {
315 0 : if (!tds->conn->tds_version)
316 0 : tds->conn->tds_version = 0x500;
317 0 : tds_read_login(tds, login);
318 722 : } else if (tds->in_flag == TDS7_LOGIN) {
319 722 : if (!tds->conn->tds_version)
320 0 : tds->conn->tds_version = 0x700;
321 722 : if (!tds7_read_login(tds, login))
322 : return false;
323 : } else {
324 : return false;
325 : }
326 :
327 : /* check we support version required */
328 : // TODO function to check it
329 722 : if (!IS_TDS71_PLUS(login))
330 : return false;
331 :
332 722 : tds->in_len = tds->in_pos = 0;
333 :
334 722 : dump_login(login);
335 1444 : if (strcmp(tds_dstr_cstr(&login->user_name), pool->user) != 0
336 1444 : || strcmp(tds_dstr_cstr(&login->password), pool->password) != 0)
337 : /* TODO send nack before exiting */
338 : return false;
339 :
340 : return true;
341 : }
342 :
343 : bool
344 720 : pool_user_send_login_ack(TDS_POOL * pool, TDS_POOL_USER * puser)
345 : {
346 : char msg[256];
347 : char block[32];
348 720 : TDSSOCKET *tds = puser->sock.tds, *mtds = puser->assigned_member->sock.tds;
349 720 : TDSLOGIN *login = puser->login;
350 : const char *database;
351 720 : const char *server = mtds->conn->server ? mtds->conn->server : "JDBC";
352 : bool dbname_mismatch, odbc_mismatch;
353 :
354 720 : pool->user_logins++;
355 :
356 : /* copy a bit of information, resize socket with block */
357 720 : tds->conn->tds_version = mtds->conn->tds_version;
358 720 : tds->conn->product_version = mtds->conn->product_version;
359 720 : memcpy(tds->conn->collation, mtds->conn->collation, sizeof(tds->conn->collation));
360 720 : tds->conn->tds71rev1 = mtds->conn->tds71rev1;
361 720 : free(tds->conn->product_name);
362 720 : tds->conn->product_name = strdup(mtds->conn->product_name);
363 720 : tds_realloc_socket(tds, mtds->conn->env.block_size);
364 720 : tds->conn->env.block_size = mtds->conn->env.block_size;
365 720 : tds->conn->client_spid = mtds->conn->spid;
366 :
367 : /* if database is different use USE statement */
368 720 : database = pool->database;
369 1440 : dbname_mismatch = !tds_dstr_isempty(&login->database)
370 978 : && strcasecmp(tds_dstr_cstr(&login->database), database) != 0;
371 720 : odbc_mismatch = (login->option_flag2 & TDS_ODBC_ON) == 0;
372 720 : if (dbname_mismatch || odbc_mismatch) {
373 : char *str;
374 352 : int len = 128 + tds_quote_id(mtds, NULL, tds_dstr_cstr(&login->database),-1);
375 : TDSRET ret;
376 :
377 176 : if ((str = tds_new(char, len)) == NULL)
378 : return false;
379 :
380 176 : str[0] = 0;
381 : /* swicth to dblib options */
382 176 : if (odbc_mismatch)
383 174 : strcat(str, "SET ANSI_DEFAULTS OFF\nSET CONCAT_NULL_YIELDS_NULL OFF\n");
384 176 : if (dbname_mismatch) {
385 2 : strcat(str, "USE ");
386 4 : tds_quote_id(mtds, strchr(str, 0), tds_dstr_cstr(&login->database), -1);
387 : }
388 176 : ret = tds_submit_query(mtds, str);
389 176 : free(str);
390 176 : if (TDS_FAILED(ret) || TDS_FAILED(tds_process_simple_query(mtds)))
391 : return false;
392 176 : if (dbname_mismatch)
393 4 : database = tds_dstr_cstr(&login->database);
394 : else
395 174 : database = mtds->conn->env.database;
396 : }
397 :
398 : // 7.0
399 : // env database
400 : // database change message (with server name correct)
401 : // env language
402 : // language change message
403 : // env 0x3 charset ("iso_1")
404 : // env 0x5 lcid ("1033")
405 : // env 0x6 ("196609" ?? 0x30001)
406 : // loginack
407 : // env 0x4 packet size
408 : // done
409 : //
410 : // 7.1/7.2/7.3
411 : // env database
412 : // database change message (with server name correct)
413 : // env 0x7 collation
414 : // env language
415 : // language change message
416 : // loginack
417 : // env 0x4 packet size
418 : // done
419 720 : tds->out_flag = TDS_REPLY;
420 720 : tds_env_change(tds, TDS_ENV_DATABASE, "master", database);
421 720 : sprintf(msg, "Changed database context to '%s'.", database);
422 720 : tds_send_msg(tds, 5701, 2, 0, msg, server, NULL, 1);
423 720 : if (!login->suppress_language) {
424 720 : tds_env_change(tds, TDS_ENV_LANG, NULL, "us_english");
425 720 : tds_send_msg(tds, 5703, 1, 0, "Changed language setting to 'us_english'.", server, NULL, 1);
426 : }
427 :
428 720 : if (IS_TDS71_PLUS(tds->conn)) {
429 720 : tds_put_byte(tds, TDS_ENVCHANGE_TOKEN);
430 720 : tds_put_smallint(tds, 8);
431 720 : tds_put_byte(tds, TDS_ENV_SQLCOLLATION);
432 720 : tds_put_byte(tds, 5);
433 720 : tds_put_n(tds, tds->conn->collation, 5);
434 720 : tds_put_byte(tds, 0);
435 : }
436 :
437 720 : tds_send_login_ack(tds, mtds->conn->product_name);
438 720 : sprintf(block, "%d", tds->conn->env.block_size);
439 720 : tds_env_change(tds, TDS_ENV_PACKSIZE, block, block);
440 : /* tds_send_capabilities_token(tds); */
441 720 : tds_send_done_token(tds, TDS_DONE_FINAL, 0);
442 :
443 : /* send it! */
444 720 : tds_flush_packet(tds);
445 :
446 720 : tds_free_login(login);
447 720 : puser->login = NULL;
448 720 : return true;
449 : }
450 :
451 : /*
452 : * pool_user_read
453 : * checks the packet type of data coming from the client and allocates a
454 : * pool member if necessary.
455 : */
456 : static bool
457 16249 : pool_user_read(TDS_POOL * pool, TDS_POOL_USER * puser)
458 : {
459 16249 : TDSSOCKET *tds = puser->sock.tds;
460 16249 : TDS_POOL_MEMBER *pmbr = NULL;
461 :
462 : for (;;) {
463 : TDS_UCHAR in_flag;
464 :
465 32209 : if (pool_packet_read(tds))
466 : break;
467 16679 : if (tds->in_len == 0) {
468 718 : tdsdump_log(TDS_DBG_INFO1, "user disconnected\n");
469 718 : pool_free_user(pool, puser);
470 718 : return false;
471 : }
472 :
473 15961 : tdsdump_dump_buf(TDS_DBG_NETWORK, "Got packet from client:", tds->in_buf, tds->in_len);
474 :
475 15961 : in_flag = tds->in_buf[0];
476 : switch (in_flag) {
477 15961 : case TDS_QUERY:
478 : case TDS_NORMAL:
479 : case TDS_RPC:
480 : case TDS_BULK:
481 : case TDS_CANCEL:
482 : case TDS7_TRANS:
483 15961 : if (!pool_write_data(&puser->sock, &puser->assigned_member->sock)) {
484 0 : pool_reset_member(pool, puser->assigned_member);
485 0 : return false;
486 : }
487 15961 : pmbr = puser->assigned_member;
488 : break;
489 :
490 0 : default:
491 0 : tdsdump_log(TDS_DBG_ERROR, "Unrecognized packet type, closing user\n");
492 0 : pool_free_user(pool, puser);
493 0 : return false;
494 : }
495 15961 : if (tds->in_pos < tds->in_len)
496 : /* partial write, schedule a future write */
497 : break;
498 : }
499 15531 : if (pmbr && !pmbr->sock.poll_send)
500 15528 : tds_socket_flush(tds_get_s(pmbr->sock.tds));
501 : return true;
502 : }
503 :
504 : void
505 720 : pool_user_query(TDS_POOL * pool, TDS_POOL_USER * puser)
506 : {
507 : TDS_POOL_MEMBER *pmbr;
508 :
509 720 : tdsdump_log(TDS_DBG_FUNC, "pool_user_query\n");
510 :
511 720 : assert(puser->assigned_member == NULL);
512 720 : assert(puser->login);
513 :
514 720 : puser->user_state = TDS_SRV_QUERY;
515 720 : pmbr = pool_assign_idle_member(pool, puser);
516 720 : if (!pmbr) {
517 : /*
518 : * put into wait state
519 : * check when member is deallocated
520 : */
521 0 : tdsdump_log(TDS_DBG_INFO1, "Not enough free members...placing user in WAIT\n");
522 0 : puser->user_state = TDS_SRV_WAIT;
523 0 : puser->sock.poll_recv = false;
524 0 : puser->sock.poll_send = false;
525 0 : dlist_user_remove(&pool->users, puser);
526 0 : dlist_user_append(&pool->waiters, puser);
527 : }
528 720 : }
529 :
530 : typedef struct {
531 : TDS_POOL_EVENT common;
532 : TDS_POOL *pool;
533 : TDS_POOL_USER *puser;
534 : bool success;
535 : } END_LOGIN_EVENT;
536 :
537 698 : static TDS_THREAD_PROC_DECLARE(end_login_proc, arg)
538 : {
539 698 : END_LOGIN_EVENT *ev = (END_LOGIN_EVENT *) arg;
540 698 : TDS_POOL *pool = ev->pool;
541 :
542 698 : ev->success = pool_user_send_login_ack(pool, ev->puser);
543 :
544 698 : pool_event_add(pool, &ev->common, end_login_execute);
545 698 : return TDS_THREAD_RESULT(0);
546 : }
547 :
548 : static void
549 698 : end_login_execute(TDS_POOL_EVENT *base_event)
550 : {
551 698 : END_LOGIN_EVENT *ev = (END_LOGIN_EVENT *) base_event;
552 698 : TDS_POOL *pool = ev->pool;
553 698 : TDS_POOL_USER *puser = ev->puser;
554 698 : TDS_POOL_MEMBER *pmbr = puser->assigned_member;
555 :
556 698 : if (!ev->success) {
557 0 : pool_free_member(pool, pmbr);
558 0 : return;
559 : }
560 :
561 698 : puser->sock.poll_recv = true;
562 698 : puser->sock.poll_send = false;
563 698 : pmbr->sock.poll_recv = true;
564 698 : pmbr->sock.poll_send = false;
565 : }
566 :
567 : /**
568 : * Handle async login
569 : */
570 : void
571 698 : pool_user_finish_login(TDS_POOL * pool, TDS_POOL_USER * puser)
572 : {
573 698 : END_LOGIN_EVENT *ev = tds_new0(END_LOGIN_EVENT, 1);
574 698 : if (!ev) {
575 0 : pool_free_member(pool, puser->assigned_member);
576 0 : return;
577 : }
578 :
579 698 : ev->pool = pool;
580 698 : ev->puser = puser;
581 :
582 698 : if (tds_thread_create_detached(end_login_proc, ev) != 0) {
583 0 : pool_free_member(pool, puser->assigned_member);
584 0 : free(ev);
585 0 : fprintf(stderr, "error creating thread\n");
586 : }
587 : }
|