Line data Source code
1 : /* TDSPool - Connection pooling for TDS based databases
2 : * Copyright (C) 2001, 2002, 2003, 2004, 2005 Brian Bruns
3 : *
4 : * This program is free software; you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation; either version 2 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program; if not, write to the Free Software
16 : * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 : *
18 : */
19 :
20 : #include <config.h>
21 :
22 : #include <stdarg.h>
23 : #include <stdio.h>
24 : #include <errno.h>
25 : #include <assert.h>
26 :
27 : #if HAVE_STDLIB_H
28 : #include <stdlib.h>
29 : #endif /* HAVE_STDLIB_H */
30 :
31 : #if HAVE_STRING_H
32 : #include <string.h>
33 : #endif /* HAVE_STRING_H */
34 :
35 : #if HAVE_UNISTD_H
36 : #include <unistd.h>
37 : #endif /* HAVE_UNISTD_H */
38 :
39 : #if HAVE_SYS_TYPES_H
40 : #include <sys/types.h>
41 : #endif /* HAVE_SYS_TYPES_H */
42 :
43 : #if HAVE_SYS_SOCKET_H
44 : #include <sys/socket.h>
45 : #endif /* HAVE_SYS_SOCKET_H */
46 :
47 : #include "pool.h"
48 : #include <freetds/server.h>
49 : #include <freetds/utils/string.h>
50 :
51 : static TDS_POOL_USER *pool_user_find_new(TDS_POOL * pool);
52 : static bool pool_user_login(TDS_POOL * pool, TDS_POOL_USER * puser);
53 : static bool pool_user_read(TDS_POOL * pool, TDS_POOL_USER * puser);
54 : static void login_execute(TDS_POOL_EVENT *base_event);
55 : static void end_login_execute(TDS_POOL_EVENT *base_event);
56 :
57 : void
58 2 : pool_user_init(TDS_POOL * pool)
59 : {
60 4 : dlist_user_init(&pool->users);
61 4 : dlist_user_init(&pool->waiters);
62 2 : pool->ctx = tds_alloc_context(NULL);
63 2 : }
64 :
65 : void
66 2 : pool_user_destroy(TDS_POOL * pool)
67 : {
68 4 : while (dlist_user_first(&pool->users))
69 0 : pool_free_user(pool, dlist_user_first(&pool->users));
70 2 : while (dlist_user_first(&pool->waiters))
71 0 : pool_free_user(pool, dlist_user_first(&pool->waiters));
72 :
73 2 : tds_free_context(pool->ctx);
74 2 : pool->ctx = NULL;
75 2 : }
76 :
77 : static TDS_POOL_USER *
78 720 : pool_user_find_new(TDS_POOL * pool)
79 : {
80 : TDS_POOL_USER *puser;
81 :
82 : /* did we exhaust the number of concurrent users? */
83 720 : if (pool->num_users >= MAX_POOL_USERS) {
84 0 : fprintf(stderr, "Max concurrent users exceeded, increase in pool.h\n");
85 0 : return NULL;
86 : }
87 :
88 720 : puser = tds_new0(TDS_POOL_USER, 1);
89 720 : if (!puser) {
90 0 : fprintf(stderr, "Out of memory\n");
91 0 : return NULL;
92 : }
93 :
94 720 : dlist_user_append(&pool->users, puser);
95 720 : pool->num_users++;
96 :
97 720 : return puser;
98 : }
99 :
100 : typedef struct {
101 : TDS_POOL_EVENT common;
102 : TDS_POOL *pool;
103 : TDS_POOL_USER *puser;
104 : bool success;
105 : } LOGIN_EVENT;
106 :
107 720 : static TDS_THREAD_PROC_DECLARE(login_proc, arg)
108 : {
109 720 : LOGIN_EVENT *ev = (LOGIN_EVENT *) arg;
110 :
111 720 : ev->success = pool_user_login(ev->pool, ev->puser);
112 :
113 720 : pool_event_add(ev->pool, &ev->common, login_execute);
114 720 : return TDS_THREAD_RESULT(0);
115 : }
116 :
117 : static void
118 720 : login_execute(TDS_POOL_EVENT *base_event)
119 : {
120 720 : LOGIN_EVENT *ev = (LOGIN_EVENT *) base_event;
121 720 : TDS_POOL_USER *puser = ev->puser;
122 720 : TDS_POOL *pool = ev->pool;
123 :
124 720 : if (!ev->success) {
125 : /* login failed...free socket */
126 2 : pool_free_user(pool, puser);
127 2 : return;
128 : }
129 :
130 718 : puser->sock.poll_recv = true;
131 :
132 : /* try to assign a member, connection can have transactions
133 : * and so on so deassign only when disconnected */
134 718 : pool_user_query(pool, puser);
135 :
136 718 : tdsdump_log(TDS_DBG_INFO1, "user state %d\n", puser->user_state);
137 :
138 718 : assert(puser->login || puser->user_state == TDS_SRV_QUERY);
139 : }
140 :
141 :
142 : /*
143 : * pool_user_create
144 : * accepts a client connection and adds it to the users list and returns it
145 : */
146 : TDS_POOL_USER *
147 720 : pool_user_create(TDS_POOL * pool, TDS_SYS_SOCKET s)
148 : {
149 : TDS_POOL_USER *puser;
150 : TDS_SYS_SOCKET fd;
151 : TDSSOCKET *tds;
152 : LOGIN_EVENT *ev;
153 :
154 720 : tdsdump_log(TDS_DBG_NETWORK, "accepting connection\n");
155 720 : if (TDS_IS_SOCKET_INVALID(fd = tds_accept(s, NULL, NULL))) {
156 0 : char *errstr = sock_strerror(errno);
157 0 : tdsdump_log(TDS_DBG_ERROR, "error calling assert :%s\n", errstr);
158 : sock_strerror_free(errstr);
159 : return NULL;
160 : }
161 :
162 720 : tds_socket_set_nodelay(fd);
163 :
164 720 : if (tds_socket_set_nonblocking(fd) != 0) {
165 0 : CLOSESOCKET(fd);
166 0 : return NULL;
167 : }
168 :
169 720 : puser = pool_user_find_new(pool);
170 720 : if (!puser) {
171 0 : CLOSESOCKET(fd);
172 0 : return NULL;
173 : }
174 :
175 720 : tds = tds_alloc_socket(pool->ctx, BLOCKSIZ);
176 720 : if (!tds) {
177 0 : CLOSESOCKET(fd);
178 0 : return NULL;
179 : }
180 720 : ev = tds_new0(LOGIN_EVENT, 1);
181 720 : if (!ev || TDS_FAILED(tds_iconv_open(tds->conn, "UTF-8", 0))) {
182 0 : free(ev);
183 0 : tds_free_socket(tds);
184 0 : CLOSESOCKET(fd);
185 0 : return NULL;
186 : }
187 720 : tds_set_s(tds, fd);
188 720 : tds->state = TDS_IDLE;
189 720 : tds->out_flag = TDS_LOGIN;
190 :
191 720 : puser->sock.tds = tds;
192 720 : puser->user_state = TDS_SRV_QUERY;
193 720 : puser->sock.poll_recv = false;
194 720 : puser->sock.poll_send = false;
195 :
196 : /* launch login asyncronously */
197 720 : ev->puser = puser;
198 720 : ev->pool = pool;
199 :
200 720 : if (tds_thread_create_detached(login_proc, ev) != 0) {
201 0 : pool_free_user(pool, puser);
202 0 : fprintf(stderr, "error creating thread\n");
203 0 : return NULL;
204 : }
205 :
206 : return puser;
207 : }
208 :
209 : /*
210 : * pool_free_user
211 : * close out a disconnected user.
212 : */
213 : void
214 720 : pool_free_user(TDS_POOL *pool, TDS_POOL_USER * puser)
215 : {
216 720 : TDS_POOL_MEMBER *pmbr = puser->assigned_member;
217 720 : if (pmbr) {
218 716 : assert(pmbr->current_user == puser);
219 716 : pool_deassign_member(pool, pmbr);
220 716 : pool_reset_member(pool, pmbr);
221 : }
222 :
223 720 : tds_free_socket(puser->sock.tds);
224 720 : tds_free_login(puser->login);
225 :
226 : /* make sure to decrement the waiters list if he is waiting */
227 720 : if (puser->user_state == TDS_SRV_WAIT)
228 0 : dlist_user_remove(&pool->waiters, puser);
229 : else
230 720 : dlist_user_remove(&pool->users, puser);
231 720 : pool->num_users--;
232 720 : free(puser);
233 720 : }
234 :
235 : /*
236 : * pool_process_users
237 : * check the fd_set for user input, allocate a pool member to it, and forward
238 : * the query to that member.
239 : */
240 : void
241 36421 : pool_process_users(TDS_POOL * pool, struct pollfd *fds, unsigned num_fds)
242 : {
243 : TDS_POOL_USER *puser, *next;
244 : short revents;
245 :
246 148065 : for (next = dlist_user_first(&pool->users); (puser = next) != NULL; ) {
247 :
248 150446 : next = dlist_user_next(&pool->users, puser);
249 :
250 75223 : if (!puser->sock.tds || puser->sock.poll_index >= num_fds)
251 0 : continue; /* dead connection */
252 :
253 75223 : revents = fds[puser->sock.poll_index].revents;
254 :
255 75223 : if (puser->sock.poll_recv && (revents & (POLLIN|POLLHUP)) != 0) {
256 16129 : assert(puser->user_state == TDS_SRV_QUERY);
257 16129 : if (!pool_user_read(pool, puser))
258 716 : continue;
259 : }
260 74507 : if (puser->sock.poll_send && (revents & POLLOUT) != 0) {
261 0 : if (!pool_write_data(&puser->assigned_member->sock, &puser->sock))
262 0 : pool_free_member(pool, puser->assigned_member);
263 : }
264 : } /* for */
265 36421 : }
266 :
267 : /*
268 : * pool_user_login
269 : * Reads clients login packet and forges a login acknowledgement sequence
270 : */
271 : static bool
272 720 : pool_user_login(TDS_POOL * pool, TDS_POOL_USER * puser)
273 : {
274 : TDSSOCKET *tds;
275 : TDSLOGIN *login;
276 :
277 720 : tds = puser->sock.tds;
278 1440 : while (tds->in_len <= tds->in_pos)
279 720 : if (tds_read_packet(tds) < 0)
280 : return false;
281 :
282 720 : tdsdump_log(TDS_DBG_NETWORK, "got packet type %d\n", tds->in_flag);
283 720 : if (tds->in_flag == TDS71_PRELOGIN) {
284 720 : if (!tds->conn->tds_version)
285 720 : tds->conn->tds_version = 0x701;
286 720 : tds->out_flag = TDS_REPLY;
287 : // TODO proper one !!
288 : // TODO detect TDS version here ??
289 720 : tds_put_n(tds, "\x00\x00\x1a\x00\x06" /* version */
290 : "\x01\x00\x20\x00\x01" /* encryption */
291 : "\x02\x00\x21\x00\x01" /* instance ?? */
292 : "\x03\x00\x22\x00\x00" /* process id ?? */
293 : "\x04\x00\x22\x00\x01" /* MARS */
294 : "\xff"
295 : "\x0a\x00\x06\x40\x00\x00"
296 : "\x02"
297 : "\x00"
298 : ""
299 : "\x00", 0x23);
300 720 : tds_flush_packet(tds);
301 :
302 : /* read another packet */
303 720 : tds->in_pos = tds->in_len;
304 1440 : while (tds->in_len <= tds->in_pos)
305 720 : if (tds_read_packet(tds) < 0)
306 : return false;
307 : }
308 :
309 720 : puser->login = login = tds_alloc_login(true);
310 720 : if (!login) {
311 0 : tdsdump_log(TDS_DBG_ERROR, "tds_alloc_login() failed.\n");
312 : return false;
313 : }
314 720 : if (tds->in_flag == TDS_LOGIN) {
315 0 : if (!tds->conn->tds_version)
316 0 : tds->conn->tds_version = 0x500;
317 0 : tds_read_login(tds, login);
318 720 : } else if (tds->in_flag == TDS7_LOGIN) {
319 720 : if (!tds->conn->tds_version)
320 0 : tds->conn->tds_version = 0x700;
321 720 : if (!tds7_read_login(tds, login))
322 : return false;
323 : } else {
324 : return false;
325 : }
326 :
327 : /* check we support version required */
328 : // TODO function to check it
329 720 : if (!IS_TDS71_PLUS(login))
330 : return false;
331 :
332 720 : tds->in_len = tds->in_pos = 0;
333 :
334 720 : dump_login(login);
335 1440 : if (strcmp(tds_dstr_cstr(&login->user_name), pool->user) != 0
336 1440 : || strcmp(tds_dstr_cstr(&login->password), pool->password) != 0)
337 : /* TODO send nack before exiting */
338 : return false;
339 :
340 : return true;
341 : }
342 :
343 : bool
344 718 : pool_user_send_login_ack(TDS_POOL * pool, TDS_POOL_USER * puser)
345 : {
346 : char msg[256];
347 : char block[32];
348 718 : TDSSOCKET *tds = puser->sock.tds, *mtds = puser->assigned_member->sock.tds;
349 718 : TDSLOGIN *login = puser->login;
350 : const char *database;
351 718 : const char *server = mtds->conn->server ? mtds->conn->server : "JDBC";
352 : bool dbname_mismatch, odbc_mismatch;
353 :
354 718 : pool->user_logins++;
355 :
356 : /* copy a bit of information, resize socket with block */
357 718 : tds->conn->tds_version = mtds->conn->tds_version;
358 718 : tds->conn->product_version = mtds->conn->product_version;
359 718 : memcpy(tds->conn->collation, mtds->conn->collation, sizeof(tds->conn->collation));
360 718 : tds->conn->tds71rev1 = mtds->conn->tds71rev1;
361 718 : free(tds->conn->product_name);
362 718 : tds->conn->product_name = strdup(mtds->conn->product_name);
363 718 : tds_realloc_socket(tds, mtds->conn->env.block_size);
364 718 : tds->conn->env.block_size = mtds->conn->env.block_size;
365 718 : tds->conn->client_spid = mtds->conn->spid;
366 :
367 : /* if database is different use USE statement */
368 718 : database = pool->database;
369 1436 : dbname_mismatch = !tds_dstr_isempty(&login->database)
370 974 : && strcasecmp(tds_dstr_cstr(&login->database), database) != 0;
371 718 : odbc_mismatch = (login->option_flag2 & TDS_ODBC_ON) == 0;
372 718 : if (dbname_mismatch || odbc_mismatch) {
373 : char *str;
374 352 : int len = 128 + tds_quote_id(mtds, NULL, tds_dstr_cstr(&login->database),-1);
375 : TDSRET ret;
376 :
377 176 : if ((str = tds_new(char, len)) == NULL)
378 : return false;
379 :
380 176 : str[0] = 0;
381 : /* swicth to dblib options */
382 176 : if (odbc_mismatch)
383 174 : strcat(str, "SET ANSI_DEFAULTS OFF\nSET CONCAT_NULL_YIELDS_NULL OFF\n");
384 176 : if (dbname_mismatch) {
385 2 : strcat(str, "USE ");
386 4 : tds_quote_id(mtds, strchr(str, 0), tds_dstr_cstr(&login->database), -1);
387 : }
388 176 : ret = tds_submit_query(mtds, str);
389 176 : free(str);
390 176 : if (TDS_FAILED(ret) || TDS_FAILED(tds_process_simple_query(mtds)))
391 : return false;
392 176 : if (dbname_mismatch)
393 4 : database = tds_dstr_cstr(&login->database);
394 : else
395 174 : database = mtds->conn->env.database;
396 : }
397 :
398 : // 7.0
399 : // env database
400 : // database change message (with server name correct)
401 : // env language
402 : // language change message
403 : // env 0x3 charset ("iso_1")
404 : // env 0x5 lcid ("1033")
405 : // env 0x6 ("196609" ?? 0x30001)
406 : // loginack
407 : // env 0x4 packet size
408 : // done
409 : //
410 : // 7.1/7.2/7.3
411 : // env database
412 : // database change message (with server name correct)
413 : // env 0x7 collation
414 : // env language
415 : // language change message
416 : // loginack
417 : // env 0x4 packet size
418 : // done
419 718 : tds->out_flag = TDS_REPLY;
420 718 : tds_env_change(tds, TDS_ENV_DATABASE, "master", database);
421 718 : sprintf(msg, "Changed database context to '%s'.", database);
422 718 : tds_send_msg(tds, 5701, 2, 0, msg, server, NULL, 1);
423 718 : if (!login->suppress_language) {
424 718 : tds_env_change(tds, TDS_ENV_LANG, NULL, "us_english");
425 718 : tds_send_msg(tds, 5703, 1, 0, "Changed language setting to 'us_english'.", server, NULL, 1);
426 : }
427 :
428 718 : if (IS_TDS71_PLUS(tds->conn)) {
429 718 : tds_put_byte(tds, TDS_ENVCHANGE_TOKEN);
430 718 : tds_put_smallint(tds, 8);
431 718 : tds_put_byte(tds, TDS_ENV_SQLCOLLATION);
432 718 : tds_put_byte(tds, 5);
433 718 : tds_put_n(tds, tds->conn->collation, 5);
434 718 : tds_put_byte(tds, 0);
435 : }
436 :
437 718 : tds_send_login_ack(tds, mtds->conn->product_name);
438 718 : sprintf(block, "%d", tds->conn->env.block_size);
439 718 : tds_env_change(tds, TDS_ENV_PACKSIZE, block, block);
440 : /* tds_send_capabilities_token(tds); */
441 718 : tds_send_done_token(tds, TDS_DONE_FINAL, 0);
442 :
443 : /* send it! */
444 718 : tds_flush_packet(tds);
445 :
446 718 : tds_free_login(login);
447 718 : puser->login = NULL;
448 718 : return true;
449 : }
450 :
451 : /*
452 : * pool_user_read
453 : * checks the packet type of data coming from the client and allocates a
454 : * pool member if necessary.
455 : */
456 : static bool
457 16129 : pool_user_read(TDS_POOL * pool, TDS_POOL_USER * puser)
458 : {
459 16129 : TDSSOCKET *tds = puser->sock.tds;
460 16129 : TDS_POOL_MEMBER *pmbr = NULL;
461 :
462 : for (;;) {
463 : TDS_UCHAR in_flag;
464 :
465 31972 : if (pool_packet_read(tds))
466 : break;
467 16559 : if (tds->in_len == 0) {
468 716 : tdsdump_log(TDS_DBG_INFO1, "user disconnected\n");
469 716 : pool_free_user(pool, puser);
470 716 : return false;
471 : }
472 :
473 15843 : tdsdump_dump_buf(TDS_DBG_NETWORK, "Got packet from client:", tds->in_buf, tds->in_len);
474 :
475 15843 : in_flag = tds->in_buf[0];
476 : switch (in_flag) {
477 15843 : case TDS_QUERY:
478 : case TDS_NORMAL:
479 : case TDS_RPC:
480 : case TDS_BULK:
481 : case TDS_CANCEL:
482 : case TDS7_TRANS:
483 15843 : if (!pool_write_data(&puser->sock, &puser->assigned_member->sock)) {
484 0 : pool_reset_member(pool, puser->assigned_member);
485 0 : return false;
486 : }
487 15843 : pmbr = puser->assigned_member;
488 : break;
489 :
490 0 : default:
491 0 : tdsdump_log(TDS_DBG_ERROR, "Unrecognized packet type, closing user\n");
492 0 : pool_free_user(pool, puser);
493 0 : return false;
494 : }
495 15843 : if (tds->in_pos < tds->in_len)
496 : /* partial write, schedule a future write */
497 : break;
498 : }
499 15413 : if (pmbr && !pmbr->sock.poll_send)
500 15411 : tds_socket_flush(tds_get_s(pmbr->sock.tds));
501 : return true;
502 : }
503 :
504 : void
505 718 : pool_user_query(TDS_POOL * pool, TDS_POOL_USER * puser)
506 : {
507 : TDS_POOL_MEMBER *pmbr;
508 :
509 718 : tdsdump_log(TDS_DBG_FUNC, "pool_user_query\n");
510 :
511 718 : assert(puser->assigned_member == NULL);
512 718 : assert(puser->login);
513 :
514 718 : puser->user_state = TDS_SRV_QUERY;
515 718 : pmbr = pool_assign_idle_member(pool, puser);
516 718 : if (!pmbr) {
517 : /*
518 : * put into wait state
519 : * check when member is deallocated
520 : */
521 0 : tdsdump_log(TDS_DBG_INFO1, "Not enough free members...placing user in WAIT\n");
522 0 : puser->user_state = TDS_SRV_WAIT;
523 0 : puser->sock.poll_recv = false;
524 0 : puser->sock.poll_send = false;
525 0 : dlist_user_remove(&pool->users, puser);
526 0 : dlist_user_append(&pool->waiters, puser);
527 : }
528 718 : }
529 :
530 : typedef struct {
531 : TDS_POOL_EVENT common;
532 : TDS_POOL *pool;
533 : TDS_POOL_USER *puser;
534 : bool success;
535 : } END_LOGIN_EVENT;
536 :
537 695 : static TDS_THREAD_PROC_DECLARE(end_login_proc, arg)
538 : {
539 695 : END_LOGIN_EVENT *ev = (END_LOGIN_EVENT *) arg;
540 695 : TDS_POOL *pool = ev->pool;
541 :
542 695 : ev->success = pool_user_send_login_ack(pool, ev->puser);
543 :
544 695 : pool_event_add(pool, &ev->common, end_login_execute);
545 695 : return TDS_THREAD_RESULT(0);
546 : }
547 :
548 : static void
549 695 : end_login_execute(TDS_POOL_EVENT *base_event)
550 : {
551 695 : END_LOGIN_EVENT *ev = (END_LOGIN_EVENT *) base_event;
552 695 : TDS_POOL *pool = ev->pool;
553 695 : TDS_POOL_USER *puser = ev->puser;
554 695 : TDS_POOL_MEMBER *pmbr = puser->assigned_member;
555 :
556 695 : if (!ev->success) {
557 0 : pool_free_member(pool, pmbr);
558 0 : return;
559 : }
560 :
561 695 : puser->sock.poll_recv = true;
562 695 : puser->sock.poll_send = false;
563 695 : pmbr->sock.poll_recv = true;
564 695 : pmbr->sock.poll_send = false;
565 : }
566 :
567 : /**
568 : * Handle async login
569 : */
570 : void
571 695 : pool_user_finish_login(TDS_POOL * pool, TDS_POOL_USER * puser)
572 : {
573 695 : END_LOGIN_EVENT *ev = tds_new0(END_LOGIN_EVENT, 1);
574 695 : if (!ev) {
575 0 : pool_free_member(pool, puser->assigned_member);
576 0 : return;
577 : }
578 :
579 695 : ev->pool = pool;
580 695 : ev->puser = puser;
581 :
582 695 : if (tds_thread_create_detached(end_login_proc, ev) != 0) {
583 0 : pool_free_member(pool, puser->assigned_member);
584 0 : free(ev);
585 0 : fprintf(stderr, "error creating thread\n");
586 : }
587 : }
|