Line data Source code
1 : typedef struct dblib_buffer_row {
2 : /** pointer to result information */
3 : TDSRESULTINFO *resinfo;
4 : /** row data, NULL for resinfo->current_row */
5 : unsigned char *row_data;
6 : /** row number */
7 : DBINT row;
8 : /** save old sizes */
9 : TDS_INT *sizes;
10 : } DBLIB_BUFFER_ROW;
11 :
12 : static void buffer_struct_print(const DBPROC_ROWBUF *buf);
13 : static RETCODE buffer_save_row(DBPROCESS *dbproc);
14 : static DBLIB_BUFFER_ROW* buffer_row_address(const DBPROC_ROWBUF * buf, int idx);
15 :
16 : #if ENABLE_EXTRA_CHECKS
17 24254704 : static void buffer_check_row_empty(DBLIB_BUFFER_ROW *row)
18 : {
19 24254704 : assert(row->resinfo == NULL);
20 24254704 : assert(row->row_data == NULL);
21 24254704 : assert(row->sizes == NULL);
22 24254704 : assert(row->row == 0);
23 24254704 : }
24 :
25 1382088 : static void buffer_check(const DBPROC_ROWBUF *buf)
26 : {
27 : int i;
28 :
29 : /* no buffering */
30 1382088 : if (buf->capacity == 0 || buf->capacity == 1) {
31 1362656 : assert(buf->head == 0);
32 1362656 : assert(buf->tail == 0 || buf->tail == 1);
33 1362656 : assert(buf->capacity == 1 || buf->rows == NULL);
34 : return;
35 : }
36 :
37 19432 : assert(buf->capacity > 0);
38 19432 : assert(buf->head >= 0);
39 19432 : assert(buf->tail >= 0);
40 19432 : assert(buf->head < buf->capacity);
41 19432 : assert(buf->tail <= buf->capacity);
42 :
43 : /* check empty */
44 19432 : if (buf->tail == buf->capacity) {
45 448 : assert(buf->head == 0);
46 497200 : for (i = 0; buf->rows && i < buf->capacity; ++i)
47 497200 : buffer_check_row_empty(&buf->rows[i]);
48 : return;
49 : }
50 :
51 18984 : if (buf->rows == NULL)
52 : return;
53 :
54 : /* check filled part */
55 : i = buf->tail;
56 : do {
57 260576 : assert(i >= 0 && i < buf->capacity);
58 260576 : assert(buf->rows[i].resinfo != NULL);
59 260576 : assert(buf->rows[i].row > 0);
60 260576 : assert(buf->rows[i].row <= buf->received);
61 260576 : ++i;
62 260576 : if (i == buf->capacity)
63 6872 : i = 0;
64 260576 : } while (i != buf->head);
65 :
66 : /* check empty part */
67 18776 : if (buf->head != buf->tail) {
68 : i = buf->head;
69 : do {
70 23757504 : assert(i >= 0 && i < buf->capacity);
71 23757504 : buffer_check_row_empty(&buf->rows[i]);
72 23757504 : ++i;
73 23757504 : if (i == buf->capacity)
74 11904 : i = 0;
75 23757504 : } while (i != buf->tail);
76 : }
77 : }
78 : #define BUFFER_CHECK(buf) buffer_check(buf)
79 : #else
80 : #define BUFFER_CHECK(buf) do {} while(0)
81 : #endif
/**
 * A few words on the buffering design.
 *
 * DBPROC_ROWBUF::rows is a block of row buffers,
 * managed as a ring, indexed by head, tail, and current:
 *
 *	head	-- where new elements are inserted.
 *	tail	-- oldest element.
 *	current	-- active row (read by dbgetrow/dbnextrow)
 *
 * capacity is the number of rows that the ring can hold.
 *
 * Each element carries its row number (DBLIB_BUFFER_ROW::row):
 * the result_set row number, determined by counting the rows
 * as they're received from the server.  Applications communicate
 * to db-lib in row numbers, not buffer indices.
 *
 * Semantics:
 *	head == 0 && tail == capacity is the initial condition.
 *	head == tail means the buffer is full, except when capacity is 1.
 *	head < tail (outside the initial condition) means the buffer has wrapped around.
 *
 * Whether or not buffering is active is governed by
 * dbproc->dbopts[DBBUFFER].optactive.
 */
107 :
108 : /**
109 : * number of rows in the buffer
110 : */
111 : static int
112 : buffer_count(const DBPROC_ROWBUF *buf)
113 : {
114 341936 : BUFFER_CHECK(buf);
115 341936 : return (buf->head > buf->tail) ?
116 680904 : buf->head - buf->tail : /* |...TddddH....| */
117 338968 : buf->capacity - (buf->tail - buf->head); /* |ddddH....Tddd| */
118 : }
119 :
120 : /**
121 : * Can the buffer be written to?
122 : */
123 : static int
124 341680 : buffer_is_full(const DBPROC_ROWBUF *buf)
125 : {
126 341680 : BUFFER_CHECK(buf);
127 683360 : return buf->capacity == buffer_count(buf) && buf->capacity > 1;
128 : }
129 :
130 : #ifndef NDEBUG
131 : static int
132 166073 : buffer_index_valid(const DBPROC_ROWBUF *buf, int idx)
133 : {
134 166073 : BUFFER_CHECK(buf);
135 166073 : if (buf->tail <= buf->head)
136 165393 : if (buf->head <= idx && idx <= buf->tail)
137 : return 1;
138 :
139 2232 : if (0 <= idx && idx <= buf->head)
140 : return 1;
141 :
142 96 : if (buf->tail <= idx && idx < buf->capacity)
143 : return 1;
144 : #if 0
145 : tdsdump_log(TDS_DBG_INFO1, "buffer_index_valid: idx = %d\n", idx);
146 : buffer_struct_print(buf);
147 : #endif
148 0 : return 0;
149 : }
150 : #endif
151 :
152 : static void
153 93572 : buffer_free_row(DBLIB_BUFFER_ROW *row)
154 : {
155 93572 : if (row->sizes)
156 11724 : TDS_ZERO_FREE(row->sizes);
157 93572 : if (row->row_data) {
158 2176 : tds_free_row(row->resinfo, row->row_data);
159 2176 : row->row_data = NULL;
160 : }
161 93572 : tds_free_results(row->resinfo);
162 93572 : row->resinfo = NULL;
163 93572 : row->row = 0;
164 93572 : }
165 :
166 : /*
167 : * Buffer is freed at slightly odd points, whenever
168 : * capacity changes:
169 : *
170 : * 1. When setting capacity, to release prior buffer.
171 : * 2. By dbresults. When called the second time, it has to
172 : * release prior storage because the new resultset will have
173 : * a different width.
174 : * 3. By dbclose(), else open/close/open would leak.
175 : */
176 : static void
177 11348 : buffer_free(DBPROC_ROWBUF *buf)
178 : {
179 11348 : BUFFER_CHECK(buf);
180 11348 : if (buf->rows != NULL) {
181 : int i;
182 92612 : for (i = 0; i < buf->capacity; ++i)
183 92612 : buffer_free_row(&buf->rows[i]);
184 9916 : TDS_ZERO_FREE(buf->rows);
185 : }
186 11348 : BUFFER_CHECK(buf);
187 11348 : }
188 :
189 : /*
190 : * When no rows are currently buffered (and the buffer is allocated)
191 : * set the indices to their initial positions.
192 : */
193 : static void
194 : buffer_reset(DBPROC_ROWBUF *buf)
195 : {
196 9940 : buf->head = 0;
197 9940 : buf->current = buf->tail = buf->capacity;
198 9940 : BUFFER_CHECK(buf);
199 : }
200 :
201 : static int
202 : buffer_idx_increment(const DBPROC_ROWBUF *buf, int idx)
203 : {
204 333306 : if (++idx >= buf->capacity) {
205 327930 : idx = 0;
206 : }
207 : return idx;
208 : }
209 :
210 : /**
211 : * Given an index, return the row storage, including
212 : * the DBINT row number prefix.
213 : */
214 : static DBLIB_BUFFER_ROW*
215 332378 : buffer_row_address(const DBPROC_ROWBUF * buf, int idx)
216 : {
217 332378 : BUFFER_CHECK(buf);
218 332378 : if (idx < 0 || idx >= buf->capacity) {
219 0 : tdsdump_log(TDS_DBG_WARN, "idx is %d:\n", idx);
220 0 : buffer_struct_print(buf);
221 0 : return NULL;
222 : }
223 :
224 332378 : return &(buf->rows[idx]);
225 : }
226 :
227 : /**
228 : * Convert an index to a row number.
229 : */
230 : static DBINT
231 : buffer_idx2row(const DBPROC_ROWBUF *buf, int idx)
232 : {
233 272 : BUFFER_CHECK(buf);
234 272 : return buffer_row_address(buf, idx)->row;
235 : }
236 :
237 : /**
238 : * Convert a row number to an index.
239 : */
240 : static int
241 48 : buffer_row2idx(const DBPROC_ROWBUF *buf, int row_number)
242 : {
243 48 : int i = buf->tail;
244 : #ifndef NDEBUG
245 48 : int ii = 0;
246 : #endif
247 :
248 48 : BUFFER_CHECK(buf);
249 48 : if (i == buf->capacity) {
250 0 : assert (buf->head == 0);
251 : return -1; /* no rows buffered */
252 : }
253 :
254 : /*
255 : * March through the buffers from tail to head, stop if we find our row.
256 : * A full queue is indicated by tail == head (which means we can't write).
257 : */
258 : do {
259 272 : if (buffer_idx2row(buf, i) == row_number)
260 : return i;
261 :
262 240 : assert(ii++ < buf->capacity); /* prevent infinite loop */
263 :
264 480 : i = buffer_idx_increment(buf, i);
265 240 : } while (i != buf->head);
266 :
267 : return -1;
268 : }
269 :
270 : /**
271 : * Deleting a row from the buffer doesn't affect memory allocation.
272 : * It just makes the space available for a different row.
273 : */
274 : static void
275 128 : buffer_delete_rows(DBPROC_ROWBUF * buf, int count)
276 : {
277 : int i;
278 :
279 128 : BUFFER_CHECK(buf);
280 240 : if (count < 0 || count > buffer_count(buf)) {
281 : count = buffer_count(buf);
282 : }
283 :
284 1088 : for (i=0; i < count; i++) {
285 960 : if (buf->tail < buf->capacity)
286 960 : buffer_free_row(&buf->rows[buf->tail]);
287 1920 : buf->tail = buffer_idx_increment(buf, buf->tail);
288 : /*
289 : * If deleting rows from the buffer catches the tail to the head,
290 : * return to the initial position. Otherwise, it will look full.
291 : */
292 960 : if (buf->tail == buf->head) {
293 : buffer_reset(buf);
294 : break;
295 : }
296 : }
297 : #if 0
298 : buffer_struct_print(buf);
299 : #endif
300 128 : BUFFER_CHECK(buf);
301 128 : }
302 :
303 : /**
304 : * Transfer data from buffer/tds back to client
305 : */
306 : static void
307 166073 : buffer_transfer_bound_data(DBPROC_ROWBUF *buf, TDS_INT res_type, TDS_INT compute_id, DBPROCESS * dbproc, int idx)
308 : {
309 : int i;
310 : BYTE *src;
311 : const DBLIB_BUFFER_ROW *row;
312 :
313 166073 : tdsdump_log(TDS_DBG_FUNC, "buffer_transfer_bound_data(%p %d %d %p %d)\n", buf, res_type, compute_id, dbproc, idx);
314 166073 : BUFFER_CHECK(buf);
315 166073 : assert(buffer_index_valid(buf, idx));
316 :
317 166073 : row = buffer_row_address(buf, idx);
318 166073 : assert(row->resinfo);
319 :
320 331913 : for (i = 0; i < row->resinfo->num_cols; i++) {
321 : TDS_SERVER_TYPE srctype;
322 : DBINT srclen;
323 331913 : TDSCOLUMN *curcol = row->resinfo->columns[i];
324 :
325 331913 : if (row->sizes)
326 331913 : curcol->column_cur_size = row->sizes[i];
327 :
328 331913 : srclen = curcol->column_cur_size;
329 :
330 331913 : if (curcol->column_nullbind) {
331 184 : if (srclen < 0) {
332 46 : *(DBINT *)(curcol->column_nullbind) = -1;
333 : } else {
334 138 : *(DBINT *)(curcol->column_nullbind) = 0;
335 : }
336 : }
337 331913 : if (!curcol->column_varaddr)
338 3025 : continue;
339 :
340 328888 : if (srclen <= 0) {
341 276 : if (srclen == 0 || !curcol->column_nullbind)
342 230 : dbgetnull(dbproc, curcol->column_bindtype, curcol->column_bindlen,
343 : (BYTE *) curcol->column_varaddr);
344 276 : continue;
345 : }
346 :
347 328612 : srctype = tds_get_conversion_type(curcol->column_type, curcol->column_size);
348 :
349 328612 : if (row->row_data)
350 48 : src = &row->row_data[curcol->column_data - row->resinfo->current_row];
351 : else
352 328564 : src = curcol->column_data;
353 328612 : if (is_blob_col(curcol))
354 124 : src = (BYTE *) ((TDSBLOB *) src)->textvalue;
355 :
356 657224 : copy_data_to_host_var(dbproc, srctype, src, srclen,
357 328612 : (BYTE *) curcol->column_varaddr, curcol->column_bindlen,
358 328612 : curcol->column_bindtype, (DBINT*) curcol->column_nullbind);
359 : }
360 :
361 : /*
362 : * This function always bumps current. Usually, it's called
363 : * by dbnextrow(), so bumping current is a pretty obvious choice.
364 : * It can also be called by dbgetrow(), but that function also
365 : * causes the bump. If you call dbgetrow() for row N, a subsequent
366 : * call to dbnextrow() yields N+1.
367 : */
368 332146 : buf->current = buffer_idx_increment(buf, buf->current);
369 :
370 166073 : } /* end buffer_transfer_bound_data() */
371 :
372 : static void
373 0 : buffer_struct_print(const DBPROC_ROWBUF *buf)
374 : {
375 0 : assert(buf);
376 :
377 0 : tdsdump_log(TDS_DBG_INFO1, "%d rows in buffer\n", buffer_count(buf));
378 :
379 0 : tdsdump_log(TDS_DBG_INFO1, "head = %d\n", buf->head);
380 0 : tdsdump_log(TDS_DBG_INFO1, "tail = %d\n", buf->tail);
381 0 : tdsdump_log(TDS_DBG_INFO1, "current = %d\n", buf->current);
382 0 : tdsdump_log(TDS_DBG_INFO1, "capacity = %d\n", buf->capacity);
383 0 : tdsdump_log(TDS_DBG_INFO1, "head row number = %d\n", buf->received);
384 0 : }
385 :
386 : /* * * Functions called only by public db-lib API take DBPROCESS* * */
387 :
388 : /**
389 : * Return the current row buffer index.
390 : * We strive to validate it first. It must be:
391 : * between zero and capacity (obviously), and
392 : * between the head and the tail, logically.
393 : *
394 : * If the head has wrapped the tail, it shouldn't be in no man's land.
395 : * IOW, if capacity is 9, head is 3 and tail is 7, good rows are 7-8 and 0-2.
396 : * (Row 3 is about-to-be-inserted, and 4-6 are not in use.) Here's a diagram:
397 : * d d d ! ! ! ! d d
398 : * 0 1 2 3 4 5 6 7 8
399 : * ^ ^
400 : * Head Tail
401 : *
402 : * The special case is capacity == 1, meaning there's no buffering, and head == tail === 0.
403 : */
404 : static int
405 175655 : buffer_current_index(const DBPROCESS *dbproc)
406 : {
407 175655 : const DBPROC_ROWBUF *buf = &dbproc->row_buf;
408 : #if 0
409 : buffer_struct_print(buf);
410 : #endif
411 175655 : if (buf->capacity <= 1) /* no buffering */
412 : return -1;
413 2344 : if (buf->current == buf->head || buf->current == buf->capacity)
414 : return -1;
415 :
416 8 : assert(buf->current >= 0);
417 8 : assert(buf->current < buf->capacity);
418 :
419 8 : if( buf->tail < buf->head) {
420 0 : assert(buf->tail < buf->current);
421 0 : assert(buf->current < buf->head);
422 : } else {
423 8 : if (buf->current > buf->head)
424 8 : assert(buf->current > buf->tail);
425 : }
426 : return buf->current;
427 : }
428 :
429 : /*
430 : * Normally called by dbsetopt() to prepare for buffering
431 : * Called with nrows == 0 by dbopen to safely set buf->rows to NULL.
432 : */
433 : static void
434 736 : buffer_set_capacity(DBPROCESS *dbproc, int nrows)
435 : {
436 736 : DBPROC_ROWBUF *buf = &dbproc->row_buf;
437 :
438 736 : buffer_free(buf);
439 :
440 736 : memset(buf, 0, sizeof(DBPROC_ROWBUF));
441 :
442 736 : if (0 == nrows) {
443 688 : buf->capacity = 1;
444 688 : BUFFER_CHECK(buf);
445 688 : return;
446 : }
447 :
448 48 : assert(0 < nrows);
449 :
450 48 : buf->capacity = nrows;
451 48 : BUFFER_CHECK(buf);
452 : }
453 :
454 : /*
455 : * Called only by dbresults(); capacity must be >= 1.
456 : * Sybase's documents say dbresults() cannot return FAIL if the prior calls worked,
457 : * which is a little strange, because (for FreeTDS, at least), dbresults
458 : * is when we learn about the result set's width. Without that information, we
459 : * can't allocate memory for the buffer. But if we *fail* to allocate memory,
460 : * we're not to communicate it back to the caller?
461 : */
462 : static void
463 9940 : buffer_alloc(DBPROCESS *dbproc)
464 : {
465 9940 : DBPROC_ROWBUF *buf = &dbproc->row_buf;
466 :
467 : /* Call this function only after setting capacity. */
468 :
469 9940 : assert(buf);
470 9940 : assert(buf->capacity > 0);
471 9940 : assert(buf->rows == NULL);
472 :
473 9940 : buf->rows = tds_new0(DBLIB_BUFFER_ROW, buf->capacity);
474 :
475 9940 : assert(buf->rows);
476 :
477 9940 : buffer_reset(buf);
478 :
479 9940 : buf->received = 0;
480 9940 : }
481 :
482 : /**
483 : * Called by dbnextrow
484 : * Returns a row buffer index, or -1 to indicate the buffer is full.
485 : */
486 : static int
487 166033 : buffer_add_row(DBPROCESS *dbproc, TDSRESULTINFO *resinfo)
488 : {
489 166033 : DBPROC_ROWBUF *buf = &dbproc->row_buf;
490 : DBLIB_BUFFER_ROW *row;
491 : int i;
492 :
493 166033 : assert(buf->capacity >= 0);
494 :
495 166033 : if (buffer_is_full(buf))
496 : return -1;
497 :
498 166033 : row = buffer_row_address(buf, buf->head);
499 :
500 : /* bump the row number, write it, and move the data to head */
501 166033 : if (row->resinfo) {
502 154285 : tds_free_row(row->resinfo, row->row_data);
503 154285 : tds_free_results(row->resinfo);
504 : }
505 166033 : row->row = ++buf->received;
506 166033 : ++resinfo->ref_count;
507 166033 : row->resinfo = resinfo;
508 166033 : row->row_data = NULL;
509 166033 : if (row->sizes)
510 154285 : free(row->sizes);
511 166033 : row->sizes = tds_new0(TDS_INT, resinfo->num_cols);
512 497866 : for (i = 0; i < resinfo->num_cols; ++i)
513 331833 : row->sizes[i] = resinfo->columns[i]->column_cur_size;
514 :
515 : /* initial condition is head == 0 and tail == capacity */
516 166033 : if (buf->tail == buf->capacity) {
517 : /* bumping this tail will set it to zero */
518 9612 : assert(buf->head == 0);
519 9612 : buf->tail = 0;
520 : }
521 :
522 : /* update current, bump the head */
523 166033 : buf->current = buf->head;
524 332066 : buf->head = buffer_idx_increment(buf, buf->head);
525 :
526 166033 : return buf->current;
527 : }
528 :
529 : /**
530 : * Save current row into row buffer
531 : */
532 : static RETCODE
533 175647 : buffer_save_row(DBPROCESS *dbproc)
534 : {
535 175647 : DBPROC_ROWBUF *buf = &dbproc->row_buf;
536 : DBLIB_BUFFER_ROW *row;
537 175647 : int idx = buf->head - 1;
538 :
539 175647 : if (buf->capacity <= 1)
540 : return SUCCEED;
541 :
542 2240 : if (idx < 0)
543 152 : idx = buf->capacity - 1;
544 2240 : if (idx >= 0 && idx < buf->capacity) {
545 2240 : row = &buf->rows[idx];
546 :
547 2240 : if (row->resinfo && !row->row_data) {
548 2176 : row->row_data = row->resinfo->current_row;
549 2176 : tds_alloc_row(row->resinfo);
550 : }
551 : }
552 :
553 : return SUCCEED;
554 : }
555 :
|