Line data Source code
1 : /* TDSPool - Connection pooling for TDS based databases
2 : * Copyright (C) 2001, 2002, 2003, 2004, 2005 Brian Bruns
3 : *
4 : * This program is free software; you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation; either version 2 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program; if not, write to the Free Software
16 : * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 : *
18 : */
19 :
20 : #include <config.h>
21 :
22 : #include <stdarg.h>
23 : #include <stdio.h>
24 : #include <errno.h>
25 : #include <assert.h>
26 :
27 : #if HAVE_STDLIB_H
28 : #include <stdlib.h>
29 : #endif /* HAVE_STDLIB_H */
30 :
31 : #if HAVE_STRING_H
32 : #include <string.h>
33 : #endif /* HAVE_STRING_H */
34 :
35 : #if HAVE_UNISTD_H
36 : #include <unistd.h>
37 : #endif /* HAVE_UNISTD_H */
38 :
39 : #if HAVE_SYS_TYPES_H
40 : #include <sys/types.h>
41 : #endif /* HAVE_SYS_TYPES_H */
42 :
43 : #if HAVE_SYS_SOCKET_H
44 : #include <sys/socket.h>
45 : #endif /* HAVE_SYS_SOCKET_H */
46 :
47 : #include "pool.h"
48 : #include <freetds/server.h>
49 : #include <freetds/utils/string.h>
50 :
51 : static TDS_POOL_USER *pool_user_find_new(TDS_POOL * pool);
52 : static bool pool_user_login(TDS_POOL * pool, TDS_POOL_USER * puser);
53 : static bool pool_user_read(TDS_POOL * pool, TDS_POOL_USER * puser);
54 : static void login_execute(TDS_POOL_EVENT *base_event);
55 : static void end_login_execute(TDS_POOL_EVENT *base_event);
56 :
57 : void
58 2 : pool_user_init(TDS_POOL * pool)
59 : {
60 4 : dlist_user_init(&pool->users);
61 4 : dlist_user_init(&pool->waiters);
62 2 : pool->ctx = tds_alloc_context(NULL);
63 2 : }
64 :
65 : void
66 2 : pool_user_destroy(TDS_POOL * pool)
67 : {
68 4 : while (dlist_user_first(&pool->users))
69 0 : pool_free_user(pool, dlist_user_first(&pool->users));
70 2 : while (dlist_user_first(&pool->waiters))
71 0 : pool_free_user(pool, dlist_user_first(&pool->waiters));
72 :
73 2 : tds_free_context(pool->ctx);
74 2 : pool->ctx = NULL;
75 2 : }
76 :
77 : static TDS_POOL_USER *
78 714 : pool_user_find_new(TDS_POOL * pool)
79 : {
80 : TDS_POOL_USER *puser;
81 :
82 : /* did we exhaust the number of concurrent users? */
83 714 : if (pool->num_users >= MAX_POOL_USERS) {
84 0 : fprintf(stderr, "Max concurrent users exceeded, increase in pool.h\n");
85 0 : return NULL;
86 : }
87 :
88 714 : puser = tds_new0(TDS_POOL_USER, 1);
89 714 : if (!puser) {
90 0 : fprintf(stderr, "Out of memory\n");
91 0 : return NULL;
92 : }
93 :
94 714 : dlist_user_append(&pool->users, puser);
95 714 : pool->num_users++;
96 :
97 714 : return puser;
98 : }
99 :
100 : typedef struct {
101 : TDS_POOL_EVENT common;
102 : TDS_POOL *pool;
103 : TDS_POOL_USER *puser;
104 : bool success;
105 : } LOGIN_EVENT;
106 :
107 714 : static TDS_THREAD_PROC_DECLARE(login_proc, arg)
108 : {
109 714 : LOGIN_EVENT *ev = (LOGIN_EVENT *) arg;
110 :
111 714 : ev->success = pool_user_login(ev->pool, ev->puser);
112 :
113 714 : pool_event_add(ev->pool, &ev->common, login_execute);
114 714 : return TDS_THREAD_RESULT(0);
115 : }
116 :
117 : static void
118 714 : login_execute(TDS_POOL_EVENT *base_event)
119 : {
120 714 : LOGIN_EVENT *ev = (LOGIN_EVENT *) base_event;
121 714 : TDS_POOL_USER *puser = ev->puser;
122 714 : TDS_POOL *pool = ev->pool;
123 :
124 714 : if (!ev->success) {
125 : /* login failed...free socket */
126 2 : pool_free_user(pool, puser);
127 2 : return;
128 : }
129 :
130 712 : puser->sock.poll_recv = true;
131 :
132 : /* try to assign a member, connection can have transactions
133 : * and so on so deassign only when disconnected */
134 712 : pool_user_query(pool, puser);
135 :
136 712 : tdsdump_log(TDS_DBG_INFO1, "user state %d\n", puser->user_state);
137 :
138 712 : assert(puser->login || puser->user_state == TDS_SRV_QUERY);
139 : }
140 :
141 :
142 : /*
143 : * pool_user_create
144 : * accepts a client connection and adds it to the users list and returns it
145 : */
146 : TDS_POOL_USER *
147 714 : pool_user_create(TDS_POOL * pool, TDS_SYS_SOCKET s)
148 : {
149 : TDS_POOL_USER *puser;
150 : TDS_SYS_SOCKET fd;
151 : TDSSOCKET *tds;
152 : LOGIN_EVENT *ev;
153 :
154 714 : tdsdump_log(TDS_DBG_NETWORK, "accepting connection\n");
155 714 : if (TDS_IS_SOCKET_INVALID(fd = tds_accept(s, NULL, NULL))) {
156 0 : char *errstr = sock_strerror(errno);
157 0 : tdsdump_log(TDS_DBG_ERROR, "error calling assert :%s\n", errstr);
158 : sock_strerror_free(errstr);
159 : return NULL;
160 : }
161 :
162 714 : if (tds_socket_set_nonblocking(fd) != 0) {
163 0 : CLOSESOCKET(fd);
164 0 : return NULL;
165 : }
166 :
167 714 : puser = pool_user_find_new(pool);
168 714 : if (!puser) {
169 0 : CLOSESOCKET(fd);
170 0 : return NULL;
171 : }
172 :
173 714 : tds = tds_alloc_socket(pool->ctx, BLOCKSIZ);
174 714 : if (!tds) {
175 0 : CLOSESOCKET(fd);
176 0 : return NULL;
177 : }
178 714 : ev = tds_new0(LOGIN_EVENT, 1);
179 714 : if (!ev || TDS_FAILED(tds_iconv_open(tds->conn, "UTF-8", 0))) {
180 0 : free(ev);
181 0 : tds_free_socket(tds);
182 0 : CLOSESOCKET(fd);
183 0 : return NULL;
184 : }
185 714 : tds_set_s(tds, fd);
186 714 : tds->state = TDS_IDLE;
187 714 : tds->out_flag = TDS_LOGIN;
188 :
189 714 : puser->sock.tds = tds;
190 714 : puser->user_state = TDS_SRV_QUERY;
191 714 : puser->sock.poll_recv = false;
192 714 : puser->sock.poll_send = false;
193 :
194 : /* launch login asyncronously */
195 714 : ev->puser = puser;
196 714 : ev->pool = pool;
197 :
198 714 : if (tds_thread_create_detached(login_proc, ev) != 0) {
199 0 : pool_free_user(pool, puser);
200 0 : fprintf(stderr, "error creating thread\n");
201 0 : return NULL;
202 : }
203 :
204 : return puser;
205 : }
206 :
207 : /*
208 : * pool_free_user
209 : * close out a disconnected user.
210 : */
211 : void
212 714 : pool_free_user(TDS_POOL *pool, TDS_POOL_USER * puser)
213 : {
214 714 : TDS_POOL_MEMBER *pmbr = puser->assigned_member;
215 714 : if (pmbr) {
216 710 : assert(pmbr->current_user == puser);
217 710 : pool_deassign_member(pool, pmbr);
218 710 : pool_reset_member(pool, pmbr);
219 : }
220 :
221 714 : tds_free_socket(puser->sock.tds);
222 714 : tds_free_login(puser->login);
223 :
224 : /* make sure to decrement the waiters list if he is waiting */
225 714 : if (puser->user_state == TDS_SRV_WAIT)
226 0 : dlist_user_remove(&pool->waiters, puser);
227 : else
228 714 : dlist_user_remove(&pool->users, puser);
229 714 : pool->num_users--;
230 714 : free(puser);
231 714 : }
232 :
233 : /*
234 : * pool_process_users
235 : * check the fd_set for user input, allocate a pool member to it, and forward
236 : * the query to that member.
237 : */
238 : void
239 35988 : pool_process_users(TDS_POOL * pool, struct pollfd *fds, unsigned num_fds)
240 : {
241 : TDS_POOL_USER *puser, *next;
242 : short revents;
243 :
244 147073 : for (next = dlist_user_first(&pool->users); (puser = next) != NULL; ) {
245 :
246 150194 : next = dlist_user_next(&pool->users, puser);
247 :
248 75097 : if (!puser->sock.tds || puser->sock.poll_index >= num_fds)
249 0 : continue; /* dead connection */
250 :
251 75097 : revents = fds[puser->sock.poll_index].revents;
252 :
253 75097 : if (puser->sock.poll_recv && (revents & POLLIN) != 0) {
254 16067 : assert(puser->user_state == TDS_SRV_QUERY);
255 16067 : if (!pool_user_read(pool, puser))
256 710 : continue;
257 : }
258 74387 : if (puser->sock.poll_send && (revents & POLLOUT) != 0) {
259 0 : if (!pool_write_data(&puser->assigned_member->sock, &puser->sock))
260 0 : pool_free_member(pool, puser->assigned_member);
261 : }
262 : } /* for */
263 35988 : }
264 :
265 : /*
266 : * pool_user_login
267 : * Reads clients login packet and forges a login acknowledgement sequence
268 : */
269 : static bool
270 714 : pool_user_login(TDS_POOL * pool, TDS_POOL_USER * puser)
271 : {
272 : TDSSOCKET *tds;
273 : TDSLOGIN *login;
274 :
275 714 : tds = puser->sock.tds;
276 1428 : while (tds->in_len <= tds->in_pos)
277 714 : if (tds_read_packet(tds) < 0)
278 : return false;
279 :
280 714 : tdsdump_log(TDS_DBG_NETWORK, "got packet type %d\n", tds->in_flag);
281 714 : if (tds->in_flag == TDS71_PRELOGIN) {
282 714 : if (!tds->conn->tds_version)
283 714 : tds->conn->tds_version = 0x701;
284 714 : tds->out_flag = TDS_REPLY;
285 : // TODO proper one !!
286 : // TODO detect TDS version here ??
287 714 : tds_put_n(tds, "\x00\x00\x1a\x00\x06" /* version */
288 : "\x01\x00\x20\x00\x01" /* encryption */
289 : "\x02\x00\x21\x00\x01" /* instance ?? */
290 : "\x03\x00\x22\x00\x00" /* process id ?? */
291 : "\x04\x00\x22\x00\x01" /* MARS */
292 : "\xff"
293 : "\x0a\x00\x06\x40\x00\x00"
294 : "\x02"
295 : "\x00"
296 : ""
297 : "\x00", 0x23);
298 714 : tds_flush_packet(tds);
299 :
300 : /* read another packet */
301 714 : tds->in_pos = tds->in_len;
302 1428 : while (tds->in_len <= tds->in_pos)
303 714 : if (tds_read_packet(tds) < 0)
304 : return false;
305 : }
306 :
307 714 : puser->login = login = tds_alloc_login(1);
308 714 : if (tds->in_flag == TDS_LOGIN) {
309 0 : if (!tds->conn->tds_version)
310 0 : tds->conn->tds_version = 0x500;
311 0 : tds_read_login(tds, login);
312 714 : } else if (tds->in_flag == TDS7_LOGIN) {
313 714 : if (!tds->conn->tds_version)
314 0 : tds->conn->tds_version = 0x700;
315 714 : if (!tds7_read_login(tds, login))
316 : return false;
317 : } else {
318 : return false;
319 : }
320 :
321 : /* check we support version required */
322 : // TODO function to check it
323 714 : if (!IS_TDS71_PLUS(login))
324 : return false;
325 :
326 714 : tds->in_len = tds->in_pos = 0;
327 :
328 714 : dump_login(login);
329 1428 : if (strcmp(tds_dstr_cstr(&login->user_name), pool->user) != 0
330 1428 : || strcmp(tds_dstr_cstr(&login->password), pool->password) != 0)
331 : /* TODO send nack before exiting */
332 : return false;
333 :
334 : return true;
335 : }
336 :
337 : bool
338 712 : pool_user_send_login_ack(TDS_POOL * pool, TDS_POOL_USER * puser)
339 : {
340 : char msg[256];
341 : char block[32];
342 712 : TDSSOCKET *tds = puser->sock.tds, *mtds = puser->assigned_member->sock.tds;
343 712 : TDSLOGIN *login = puser->login;
344 : const char *database;
345 712 : const char *server = mtds->conn->server ? mtds->conn->server : "JDBC";
346 : bool dbname_mismatch, odbc_mismatch;
347 :
348 712 : pool->user_logins++;
349 :
350 : /* copy a bit of information, resize socket with block */
351 712 : tds->conn->tds_version = mtds->conn->tds_version;
352 712 : tds->conn->product_version = mtds->conn->product_version;
353 712 : memcpy(tds->conn->collation, mtds->conn->collation, sizeof(tds->conn->collation));
354 712 : tds->conn->tds71rev1 = mtds->conn->tds71rev1;
355 712 : free(tds->conn->product_name);
356 712 : tds->conn->product_name = strdup(mtds->conn->product_name);
357 712 : tds_realloc_socket(tds, mtds->conn->env.block_size);
358 712 : tds->conn->env.block_size = mtds->conn->env.block_size;
359 712 : tds->conn->client_spid = mtds->conn->spid;
360 :
361 : /* if database is different use USE statement */
362 712 : database = pool->database;
363 1424 : dbname_mismatch = !tds_dstr_isempty(&login->database)
364 968 : && strcasecmp(tds_dstr_cstr(&login->database), database) != 0;
365 712 : odbc_mismatch = (login->option_flag2 & TDS_ODBC_ON) == 0;
366 712 : if (dbname_mismatch || odbc_mismatch) {
367 : char *str;
368 348 : int len = 128 + tds_quote_id(mtds, NULL, tds_dstr_cstr(&login->database),-1);
369 : TDSRET ret;
370 :
371 174 : if ((str = tds_new(char, len)) == NULL)
372 : return false;
373 :
374 174 : str[0] = 0;
375 : /* swicth to dblib options */
376 174 : if (odbc_mismatch)
377 172 : strcat(str, "SET ANSI_DEFAULTS OFF\nSET CONCAT_NULL_YIELDS_NULL OFF\n");
378 174 : if (dbname_mismatch) {
379 2 : strcat(str, "USE ");
380 4 : tds_quote_id(mtds, strchr(str, 0), tds_dstr_cstr(&login->database), -1);
381 : }
382 174 : ret = tds_submit_query(mtds, str);
383 174 : free(str);
384 174 : if (TDS_FAILED(ret) || TDS_FAILED(tds_process_simple_query(mtds)))
385 : return false;
386 174 : if (dbname_mismatch)
387 4 : database = tds_dstr_cstr(&login->database);
388 : else
389 172 : database = mtds->conn->env.database;
390 : }
391 :
392 : // 7.0
393 : // env database
394 : // database change message (with server name correct)
395 : // env language
396 : // language change message
397 : // env 0x3 charset ("iso_1")
398 : // env 0x5 lcid ("1033")
399 : // env 0x6 ("196609" ?? 0x30001)
400 : // loginack
401 : // env 0x4 packet size
402 : // done
403 : //
404 : // 7.1/7.2/7.3
405 : // env database
406 : // database change message (with server name correct)
407 : // env 0x7 collation
408 : // env language
409 : // language change message
410 : // loginack
411 : // env 0x4 packet size
412 : // done
413 712 : tds->out_flag = TDS_REPLY;
414 712 : tds_env_change(tds, TDS_ENV_DATABASE, "master", database);
415 712 : sprintf(msg, "Changed database context to '%s'.", database);
416 712 : tds_send_msg(tds, 5701, 2, 0, msg, server, NULL, 1);
417 712 : if (!login->suppress_language) {
418 712 : tds_env_change(tds, TDS_ENV_LANG, NULL, "us_english");
419 712 : tds_send_msg(tds, 5703, 1, 0, "Changed language setting to 'us_english'.", server, NULL, 1);
420 : }
421 :
422 712 : if (IS_TDS71_PLUS(tds->conn)) {
423 712 : tds_put_byte(tds, TDS_ENVCHANGE_TOKEN);
424 712 : tds_put_smallint(tds, 8);
425 712 : tds_put_byte(tds, TDS_ENV_SQLCOLLATION);
426 712 : tds_put_byte(tds, 5);
427 712 : tds_put_n(tds, tds->conn->collation, 5);
428 712 : tds_put_byte(tds, 0);
429 : }
430 :
431 712 : tds_send_login_ack(tds, mtds->conn->product_name);
432 712 : sprintf(block, "%d", tds->conn->env.block_size);
433 712 : tds_env_change(tds, TDS_ENV_PACKSIZE, block, block);
434 : /* tds_send_capabilities_token(tds); */
435 712 : tds_send_done_token(tds, 0, 0);
436 :
437 : /* send it! */
438 712 : tds_flush_packet(tds);
439 :
440 712 : tds_free_login(login);
441 712 : puser->login = NULL;
442 712 : return true;
443 : }
444 :
445 : /*
446 : * pool_user_read
447 : * checks the packet type of data coming from the client and allocates a
448 : * pool member if necessary.
449 : */
450 : static bool
451 16067 : pool_user_read(TDS_POOL * pool, TDS_POOL_USER * puser)
452 : {
453 16067 : TDSSOCKET *tds = puser->sock.tds;
454 16067 : TDS_POOL_MEMBER *pmbr = NULL;
455 :
456 : for (;;) {
457 : TDS_UCHAR in_flag;
458 :
459 31853 : if (pool_packet_read(tds))
460 : break;
461 16497 : if (tds->in_len == 0) {
462 710 : tdsdump_log(TDS_DBG_INFO1, "user disconnected\n");
463 710 : pool_free_user(pool, puser);
464 710 : return false;
465 : }
466 :
467 15787 : tdsdump_dump_buf(TDS_DBG_NETWORK, "Got packet from client:", tds->in_buf, tds->in_len);
468 :
469 15787 : in_flag = tds->in_buf[0];
470 : switch (in_flag) {
471 15787 : case TDS_QUERY:
472 : case TDS_NORMAL:
473 : case TDS_RPC:
474 : case TDS_BULK:
475 : case TDS_CANCEL:
476 : case TDS7_TRANS:
477 15787 : if (!pool_write_data(&puser->sock, &puser->assigned_member->sock)) {
478 0 : pool_reset_member(pool, puser->assigned_member);
479 0 : return false;
480 : }
481 15787 : pmbr = puser->assigned_member;
482 : break;
483 :
484 0 : default:
485 0 : tdsdump_log(TDS_DBG_ERROR, "Unrecognized packet type, closing user\n");
486 0 : pool_free_user(pool, puser);
487 0 : return false;
488 : }
489 15787 : if (tds->in_pos < tds->in_len)
490 : /* partial write, schedule a future write */
491 : break;
492 : }
493 15357 : if (pmbr && !pmbr->sock.poll_send)
494 15354 : tds_socket_flush(tds_get_s(pmbr->sock.tds));
495 : return true;
496 : }
497 :
498 : void
499 712 : pool_user_query(TDS_POOL * pool, TDS_POOL_USER * puser)
500 : {
501 : TDS_POOL_MEMBER *pmbr;
502 :
503 712 : tdsdump_log(TDS_DBG_FUNC, "pool_user_query\n");
504 :
505 712 : assert(puser->assigned_member == NULL);
506 712 : assert(puser->login);
507 :
508 712 : puser->user_state = TDS_SRV_QUERY;
509 712 : pmbr = pool_assign_idle_member(pool, puser);
510 712 : if (!pmbr) {
511 : /*
512 : * put into wait state
513 : * check when member is deallocated
514 : */
515 0 : tdsdump_log(TDS_DBG_INFO1, "Not enough free members...placing user in WAIT\n");
516 0 : puser->user_state = TDS_SRV_WAIT;
517 0 : puser->sock.poll_recv = false;
518 0 : puser->sock.poll_send = false;
519 0 : dlist_user_remove(&pool->users, puser);
520 0 : dlist_user_append(&pool->waiters, puser);
521 : }
522 712 : }
523 :
524 : typedef struct {
525 : TDS_POOL_EVENT common;
526 : TDS_POOL *pool;
527 : TDS_POOL_USER *puser;
528 : bool success;
529 : } END_LOGIN_EVENT;
530 :
531 690 : static TDS_THREAD_PROC_DECLARE(end_login_proc, arg)
532 : {
533 690 : END_LOGIN_EVENT *ev = (END_LOGIN_EVENT *) arg;
534 690 : TDS_POOL *pool = ev->pool;
535 :
536 690 : ev->success = pool_user_send_login_ack(pool, ev->puser);
537 :
538 690 : pool_event_add(pool, &ev->common, end_login_execute);
539 690 : return TDS_THREAD_RESULT(0);
540 : }
541 :
542 : static void
543 690 : end_login_execute(TDS_POOL_EVENT *base_event)
544 : {
545 690 : END_LOGIN_EVENT *ev = (END_LOGIN_EVENT *) base_event;
546 690 : TDS_POOL *pool = ev->pool;
547 690 : TDS_POOL_USER *puser = ev->puser;
548 690 : TDS_POOL_MEMBER *pmbr = puser->assigned_member;
549 :
550 690 : if (!ev->success) {
551 0 : pool_free_member(pool, pmbr);
552 0 : return;
553 : }
554 :
555 690 : puser->sock.poll_recv = true;
556 690 : puser->sock.poll_send = false;
557 690 : pmbr->sock.poll_recv = true;
558 690 : pmbr->sock.poll_send = false;
559 : }
560 :
561 : /**
562 : * Handle async login
563 : */
564 : void
565 690 : pool_user_finish_login(TDS_POOL * pool, TDS_POOL_USER * puser)
566 : {
567 690 : END_LOGIN_EVENT *ev = tds_new0(END_LOGIN_EVENT, 1);
568 690 : if (!ev) {
569 0 : pool_free_member(pool, puser->assigned_member);
570 0 : return;
571 : }
572 :
573 690 : ev->pool = pool;
574 690 : ev->puser = puser;
575 :
576 690 : if (tds_thread_create_detached(end_login_proc, ev) != 0) {
577 0 : pool_free_member(pool, puser->assigned_member);
578 0 : free(ev);
579 0 : fprintf(stderr, "error creating thread\n");
580 : }
581 : }
|