Line data Source code
1 : /* FreeTDS - Library of routines accessing Sybase and Microsoft databases
2 : * Copyright (C) 2008-2010 Frediano Ziglio
3 : *
4 : * This library is free software; you can redistribute it and/or
5 : * modify it under the terms of the GNU Library General Public
6 : * License as published by the Free Software Foundation; either
7 : * version 2 of the License, or (at your option) any later version.
8 : *
9 : * This library is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 : * Library General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU Library General Public
15 : * License along with this library; if not, write to the
16 : * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 : * Boston, MA 02111-1307, USA.
18 : */
19 :
20 : /**
21 : * \file
22 : * \brief Handle bulk copy
23 : */
24 :
25 : #include <config.h>
26 :
27 : #if HAVE_STRING_H
28 : #include <string.h>
29 : #endif /* HAVE_STRING_H */
30 :
31 : #if HAVE_ERRNO_H
32 : #include <errno.h>
33 : #endif /* HAVE_ERRNO_H */
34 :
35 : #if HAVE_STDLIB_H
36 : #include <stdlib.h>
37 : #endif /* HAVE_STDLIB_H */
38 :
39 : #include <assert.h>
40 :
41 : #include <freetds/tds.h>
42 : #include <freetds/checks.h>
43 : #include <freetds/bytes.h>
44 : #include <freetds/iconv.h>
45 : #include <freetds/stream.h>
46 : #include <freetds/convert.h>
47 : #include <freetds/utils/string.h>
48 : #include <freetds/replacements.h>
49 : #include <freetds/enum_cap.h>
50 :
51 : /**
52 : * Holds the text buffer used while building the column clause of an "insert bulk" statement
53 : */
54 : typedef struct tds_pbcb
55 : {
56 : /** pointer to the NUL-terminated clause text */
57 : char *pb;
58 : /** size of the storage pointed to by pb, in bytes */
59 : unsigned int cb;
60 : /** true if pb came from malloc (and must be freed), false if the storage belongs to the caller */
61 : bool from_malloc;
62 : } TDSPBCB;
63 :
/* Forward declarations of file-local (static) helpers. */
64 : static TDSRET tds7_bcp_send_colmetadata(TDSSOCKET *tds, TDSBCPINFO *bcpinfo);
65 : static TDSRET tds_bcp_start_insert_stmt(TDSSOCKET *tds, TDSBCPINFO *bcpinfo);
66 : static int tds5_bcp_add_fixed_columns(TDSBCPINFO *bcpinfo, tds_bcp_get_col_data get_col_data, tds_bcp_null_error null_error,
67 : int offset, unsigned char * rowbuffer, int start);
68 : static int tds5_bcp_add_variable_columns(TDSBCPINFO *bcpinfo, tds_bcp_get_col_data get_col_data, tds_bcp_null_error null_error,
69 : int offset, TDS_UCHAR *rowbuffer, int start, int *pncols);
70 : static void tds_bcp_row_free(TDSRESULTINFO* result, unsigned char *row);
71 : static TDSRET tds5_process_insert_bulk_reply(TDSSOCKET * tds, TDSBCPINFO *bcpinfo);
72 : static TDSRET probe_sap_locking(TDSSOCKET *tds, TDSBCPINFO *bcpinfo);
73 :
74 : /**
75 : * Initialize BCP information.
76 : * Query structure of the table to server.
77 : * \tds
78 : * \param bcpinfo BCP information to initialize. Structure should be allocate
79 : * and table name and direction should be already set.
80 : */
81 : TDSRET
82 474 : tds_bcp_init(TDSSOCKET *tds, TDSBCPINFO *bcpinfo)
83 : {
84 : TDSRESULTINFO *resinfo;
85 474 : TDSRESULTINFO *bindinfo = NULL;
86 : TDSCOLUMN *curcol;
87 : TDS_INT result_type;
88 : int i;
89 : TDSRET rc;
90 : const char *fmt;
91 :
92 : /* FIXME don't leave state in processing state */
93 :
94 : /* Check table locking type. Do this first, because the code in bcp_init() which
95 : * calls us seems to depend on the information from the columns query still being
96 : * active in the context.
97 : */
98 474 : probe_sap_locking(tds, bcpinfo);
99 :
100 : /* TODO quote tablename if needed */
101 474 : if (bcpinfo->direction != TDS_BCP_QUERYOUT)
102 : fmt = "SET FMTONLY ON select * from %s SET FMTONLY OFF";
103 : else
104 10 : fmt = "SET FMTONLY ON %s SET FMTONLY OFF";
105 :
106 948 : if (TDS_FAILED(rc=tds_submit_queryf(tds, fmt, tds_dstr_cstr(&bcpinfo->tablename))))
107 : /* TODO return an error ?? */
108 : /* Attempt to use Bulk Copy with a non-existent Server table (might be why ...) */
109 : return rc;
110 :
111 : /* TODO possibly stop at ROWFMT and copy before going to idle */
112 : /* TODO check what happen if table is not present, cleanup on error */
113 2370 : while ((rc = tds_process_tokens(tds, &result_type, NULL, TDS_TOKEN_RESULTS))
114 : == TDS_SUCCESS)
115 1896 : continue;
116 474 : TDS_PROPAGATE(rc);
117 :
118 : /* copy the results info from the TDS socket */
119 474 : if (!tds->res_info)
120 : return TDS_FAIL;
121 :
122 474 : resinfo = tds->res_info;
123 474 : if ((bindinfo = tds_alloc_results(resinfo->num_cols)) == NULL) {
124 : rc = TDS_FAIL;
125 : goto cleanup;
126 : }
127 :
128 474 : bindinfo->row_size = resinfo->row_size;
129 :
130 : /* Copy the column metadata */
131 474 : rc = TDS_FAIL;
132 3652 : for (i = 0; i < bindinfo->num_cols; i++) {
133 :
134 3178 : curcol = bindinfo->columns[i];
135 :
136 : /*
137 : * TODO use memcpy ??
138 : * curcol and resinfo->columns[i] are both TDSCOLUMN.
139 : * Why not "curcol = resinfo->columns[i];"? Because the rest of TDSCOLUMN (below column_timestamp)
140 : * isn't being used. Perhaps this "upper" part of TDSCOLUMN should be a substructure.
141 : * Or, see if the "lower" part is unused (and zeroed out) at this point, and just do one assignment.
142 : */
143 3178 : curcol->funcs = resinfo->columns[i]->funcs;
144 3178 : curcol->column_type = resinfo->columns[i]->column_type;
145 3178 : curcol->column_usertype = resinfo->columns[i]->column_usertype;
146 3178 : curcol->column_flags = resinfo->columns[i]->column_flags;
147 3178 : if (curcol->column_varint_size == 0)
148 3178 : curcol->column_cur_size = resinfo->columns[i]->column_cur_size;
149 : else
150 0 : curcol->column_cur_size = -1;
151 3178 : curcol->column_size = resinfo->columns[i]->column_size;
152 3178 : curcol->column_varint_size = resinfo->columns[i]->column_varint_size;
153 3178 : curcol->column_prec = resinfo->columns[i]->column_prec;
154 3178 : curcol->column_scale = resinfo->columns[i]->column_scale;
155 3178 : curcol->on_server = resinfo->columns[i]->on_server;
156 3178 : curcol->char_conv = resinfo->columns[i]->char_conv;
157 3178 : if (!tds_dstr_dup(&curcol->column_name, &resinfo->columns[i]->column_name))
158 : goto cleanup;
159 3178 : if (!tds_dstr_dup(&curcol->table_column_name, &resinfo->columns[i]->table_column_name))
160 : goto cleanup;
161 3178 : curcol->column_nullable = resinfo->columns[i]->column_nullable;
162 3178 : curcol->column_identity = resinfo->columns[i]->column_identity;
163 3178 : curcol->column_timestamp = resinfo->columns[i]->column_timestamp;
164 3178 : curcol->column_computed = resinfo->columns[i]->column_computed;
165 :
166 3178 : memcpy(curcol->column_collation, resinfo->columns[i]->column_collation, 5);
167 :
168 : /* From MS documentation:
169 : * Note that for INSERT BULK operations, XMLTYPE is to be sent as NVARCHAR(N) or NVARCHAR(MAX)
170 : * data type. An error is produced if XMLTYPE is specified.
171 : */
172 3178 : if (curcol->on_server.column_type == SYBMSXML) {
173 8 : curcol->on_server.column_type = XSYBNVARCHAR;
174 8 : curcol->column_type = SYBVARCHAR;
175 8 : memcpy(curcol->column_collation, tds->conn->collation, 5);
176 : }
177 :
178 3178 : if (is_numeric_type(curcol->column_type)) {
179 232 : curcol->bcp_column_data = tds_alloc_bcp_column_data(sizeof(TDS_NUMERIC));
180 232 : ((TDS_NUMERIC *) curcol->bcp_column_data->data)->precision = curcol->column_prec;
181 232 : ((TDS_NUMERIC *) curcol->bcp_column_data->data)->scale = curcol->column_scale;
182 : } else {
183 2946 : curcol->bcp_column_data =
184 2946 : tds_alloc_bcp_column_data(TDS_MAX(curcol->column_size,curcol->on_server.column_size));
185 : }
186 3178 : if (!curcol->bcp_column_data)
187 : goto cleanup;
188 : }
189 :
190 474 : if (!IS_TDS7_PLUS(tds->conn)) {
191 78 : bindinfo->current_row = tds_new(unsigned char, bindinfo->row_size);
192 78 : if (!bindinfo->current_row)
193 : goto cleanup;
194 78 : bindinfo->row_free = tds_bcp_row_free;
195 : }
196 :
197 474 : if (bcpinfo->identity_insert_on) {
198 :
199 0 : rc = tds_submit_queryf(tds, "set identity_insert %s on", tds_dstr_cstr(&bcpinfo->tablename));
200 0 : if (TDS_FAILED(rc))
201 : goto cleanup;
202 :
203 : /* TODO use tds_process_simple_query */
204 0 : while ((rc = tds_process_tokens(tds, &result_type, NULL, TDS_TOKEN_RESULTS))
205 : == TDS_SUCCESS) {
206 : }
207 0 : if (rc != TDS_NO_MORE_RESULTS)
208 : goto cleanup;
209 : }
210 :
211 474 : bcpinfo->bindinfo = bindinfo;
212 474 : bcpinfo->bind_count = 0;
213 :
214 474 : return TDS_SUCCESS;
215 :
216 0 : cleanup:
217 0 : tds_free_results(bindinfo);
218 0 : return rc;
219 : }
220 :
221 : /**
222 : * Detect if table we're writing to uses 'datarows' lockmode.
223 : * \tds
224 : * \param bcpinfo BCP information already prepared
225 : * \return TDS_SUCCESS or TDS_FAIL.
226 : */
227 : static TDSRET
228 474 : probe_sap_locking(TDSSOCKET *tds, TDSBCPINFO *bcpinfo)
229 : {
230 : TDSRET rc;
231 : unsigned int value;
232 : bool value_found;
233 : TDS_INT resulttype;
234 : const char *full_tablename, *rdot, *tablename;
235 :
236 : /* Only needed for inward data */
237 474 : if (bcpinfo->direction != TDS_BCP_IN)
238 : return TDS_SUCCESS;
239 :
240 : /* Only needed for SAP ASE versions which support datarows-locking. */
241 368 : if (!TDS_IS_SYBASE(tds) || !tds_capability_has_req(tds->conn, TDS_REQ_DOL_BULK))
242 : return TDS_SUCCESS;
243 :
244 : /* A request to probe database.owner.tablename needs to check database.owner.sysobjects for tablename
245 : * (it doesn't work to check sysobjects for database.owner.tablename) */
246 108 : full_tablename = tds_dstr_cstr(&bcpinfo->tablename);
247 54 : rdot = strrchr(full_tablename, '.');
248 :
249 54 : if (rdot != NULL)
250 2 : tablename = rdot + 1;
251 : else
252 : tablename = full_tablename;
253 :
254 54 : TDS_PROPAGATE(tds_submit_queryf(tds, "select sysstat2 from %.*ssysobjects where type='U' and name='%s'",
255 : (int) (rdot ? (rdot - full_tablename + 1) : 0), full_tablename, tablename));
256 :
257 : value = 0;
258 : value_found = false;
259 :
260 162 : while ((rc = tds_process_tokens(tds, &resulttype, NULL, TDS_TOKEN_RESULTS)) == TDS_SUCCESS) {
261 108 : const unsigned int stop_mask = TDS_RETURN_DONE | TDS_RETURN_ROW;
262 :
263 108 : if (resulttype != TDS_ROW_RESULT)
264 74 : continue;
265 :
266 : /* We must keep processing result tokens (even if we've found what we're looking for) so that the
267 : * stream is ready for subsequent queries. */
268 68 : while ((rc = tds_process_tokens(tds, &resulttype, NULL, stop_mask)) == TDS_SUCCESS) {
269 : TDSCOLUMN *col;
270 : TDS_SERVER_TYPE ctype;
271 : CONV_RESULT dres;
272 : TDS_INT res;
273 :
274 68 : if (resulttype != TDS_ROW_RESULT)
275 : break;
276 :
277 : /* Get INT4 from column 0 */
278 34 : if (!tds->current_results || tds->current_results->num_cols < 1)
279 0 : continue;
280 :
281 34 : col = tds->current_results->columns[0];
282 34 : if (col->column_cur_size < 0)
283 0 : continue;
284 :
285 34 : ctype = tds_get_conversion_type(col->column_type, col->column_size);
286 34 : res = tds_convert(tds_get_ctx(tds), ctype, col->column_data, col->column_cur_size, SYBINT4, &dres);
287 34 : if (res < 0)
288 0 : continue;
289 :
290 34 : value = dres.i;
291 34 : value_found = true;
292 : }
293 : }
294 54 : TDS_PROPAGATE(rc);
295 :
296 : /* No valid result - Treat this as meaning the feature is lacking; it could be an old Sybase version for example */
297 54 : if (!value_found) {
298 20 : tdsdump_log(TDS_DBG_INFO1, "[DOL BULK] No valid result returned by probe.\n");
299 : return TDS_SUCCESS;
300 : }
301 :
302 : /* Log and analyze result.
303 : * Sysstat2 flag values:
304 : * https://infocenter.sybase.com/help/index.jsp?topic=/com.sybase.infocenter.help.ase.16.0/doc/html/title.html
305 : * 0x08000 - datarows locked
306 : * 0x20000 - clustered index present. (Not recommended for performance reasons to datarows-lock with clustered index) */
307 34 : tdsdump_log(TDS_DBG_INFO1, "%x = sysstat2 for '%s'", value, full_tablename);
308 :
309 34 : if (0x8000 & value) {
310 2 : bcpinfo->datarows_locking = true;
311 2 : tdsdump_log(TDS_DBG_INFO1, "Table has datarows-locking; enabling DOL BULK format.\n");
312 :
313 2 : if (0x20000 & value)
314 0 : tdsdump_log(TDS_DBG_WARN, "Table also has clustered index: bulk insert performance may be degraded.\n");
315 : }
316 : return TDS_SUCCESS;
317 : }
318 :
319 : /**
320 : * Help to build query to be sent to server.
321 : * Append column declaration to the query.
322 : * Only for TDS 7.0+.
323 : * \tds
324 : * \param[out] clause output string
325 : * \param bcpcol column to append
326 : * \param first true if column is the first
327 : * \return TDS_SUCCESS or TDS_FAIL.
328 : */
329 : static TDSRET
330 1696 : tds7_build_bulk_insert_stmt(TDSSOCKET * tds, TDSPBCB * clause, TDSCOLUMN * bcpcol, int first)
331 : {
332 : char column_type[40];
333 :
334 1696 : tdsdump_log(TDS_DBG_FUNC, "tds7_build_bulk_insert_stmt(%p, %p, %p, %d)\n", tds, clause, bcpcol, first);
335 :
336 1696 : if (TDS_FAILED(tds_get_column_declaration(tds, bcpcol, column_type))) {
337 0 : tdserror(tds_get_ctx(tds), tds, TDSEBPROBADTYP, errno);
338 0 : tdsdump_log(TDS_DBG_FUNC, "error: cannot build bulk insert statement. unrecognized server datatype %d\n",
339 0 : bcpcol->on_server.column_type);
340 : return TDS_FAIL;
341 : }
342 :
343 3392 : if (clause->cb < strlen(clause->pb)
344 5088 : + tds_quote_id(tds, NULL, tds_dstr_cstr(&bcpcol->column_name), tds_dstr_len(&bcpcol->column_name))
345 1696 : + strlen(column_type)
346 1696 : + ((first) ? 2u : 4u)) {
347 0 : char *temp = tds_new(char, 2 * clause->cb);
348 :
349 0 : if (!temp) {
350 0 : tdserror(tds_get_ctx(tds), tds, TDSEMEM, errno);
351 0 : return TDS_FAIL;
352 : }
353 0 : strcpy(temp, clause->pb);
354 0 : if (clause->from_malloc)
355 0 : free(clause->pb);
356 0 : clause->from_malloc = true;
357 0 : clause->pb = temp;
358 0 : clause->cb *= 2;
359 : }
360 :
361 1696 : if (!first)
362 1436 : strcat(clause->pb, ", ");
363 :
364 5088 : tds_quote_id(tds, strchr(clause->pb, 0), tds_dstr_cstr(&bcpcol->column_name), tds_dstr_len(&bcpcol->column_name));
365 1696 : strcat(clause->pb, " ");
366 1696 : strcat(clause->pb, column_type);
367 :
368 1696 : return TDS_SUCCESS;
369 : }
370 :
371 : /**
372 : * Prepare the query to be sent to server to request BCP information
373 : * \tds
374 : * \param bcpinfo BCP information
375 : */
376 : static TDSRET
377 314 : tds_bcp_start_insert_stmt(TDSSOCKET * tds, TDSBCPINFO * bcpinfo)
378 : {
379 : char *query;
380 :
381 314 : if (IS_TDS7_PLUS(tds->conn)) {
382 : int i, firstcol, erc;
383 : char *hint;
384 : TDSCOLUMN *bcpcol;
385 : TDSPBCB colclause;
386 260 : char clause_buffer[4096] = { 0 };
387 :
388 260 : colclause.pb = clause_buffer;
389 260 : colclause.cb = sizeof(clause_buffer);
390 260 : colclause.from_malloc = false;
391 :
392 : /* TODO avoid asprintf, use always malloc-ed buffer */
393 260 : firstcol = 1;
394 :
395 1956 : for (i = 0; i < bcpinfo->bindinfo->num_cols; i++) {
396 1696 : bcpcol = bcpinfo->bindinfo->columns[i];
397 :
398 1696 : if (bcpcol->column_timestamp)
399 0 : continue;
400 1696 : if (!bcpinfo->identity_insert_on && bcpcol->column_identity)
401 0 : continue;
402 1696 : if (bcpcol->column_computed)
403 0 : continue;
404 1696 : tds7_build_bulk_insert_stmt(tds, &colclause, bcpcol, firstcol);
405 1696 : firstcol = 0;
406 : }
407 :
408 520 : if (!tds_dstr_isempty(&bcpinfo->hint)) {
409 168 : if (asprintf(&hint, " with (%s)", tds_dstr_cstr(&bcpinfo->hint)) < 0)
410 0 : hint = NULL;
411 : } else {
412 176 : hint = strdup("");
413 : }
414 260 : if (!hint) {
415 0 : if (colclause.from_malloc)
416 0 : TDS_ZERO_FREE(colclause.pb);
417 0 : return TDS_FAIL;
418 : }
419 :
420 520 : erc = asprintf(&query, "insert bulk %s (%s)%s", tds_dstr_cstr(&bcpinfo->tablename), colclause.pb, hint);
421 :
422 260 : free(hint);
423 260 : if (colclause.from_malloc)
424 0 : TDS_ZERO_FREE(colclause.pb); /* just for good measure; not used beyond this point */
425 :
426 260 : if (erc < 0)
427 : return TDS_FAIL;
428 : } else {
429 : /* NOTE: if we use "with nodescribe" for following inserts server do not send describe */
430 108 : if (asprintf(&query, "insert bulk %s", tds_dstr_cstr(&bcpinfo->tablename)) < 0)
431 : return TDS_FAIL;
432 : }
433 :
434 : /* save the statement for later... */
435 314 : bcpinfo->insert_stmt = query;
436 :
437 314 : return TDS_SUCCESS;
438 : }
439 :
440 : static TDSRET
441 804 : tds7_send_record(TDSSOCKET *tds, TDSBCPINFO *bcpinfo,
442 : tds_bcp_get_col_data get_col_data, tds_bcp_null_error null_error, int offset)
443 : {
444 : int i;
445 :
446 804 : tds_put_byte(tds, TDS_ROW_TOKEN); /* 0xd1 */
447 12436 : for (i = 0; i < bcpinfo->bindinfo->num_cols; i++) {
448 :
449 : TDS_INT save_size;
450 : unsigned char *save_data;
451 : TDSBLOB blob;
452 : TDSCOLUMN *bindcol;
453 : TDSRET rc;
454 :
455 11632 : bindcol = bcpinfo->bindinfo->columns[i];
456 :
457 : /*
458 : * Don't send the (meta)data for timestamp columns or
459 : * identity columns unless indentity_insert is enabled.
460 : */
461 :
462 11632 : if ((!bcpinfo->identity_insert_on && bindcol->column_identity) ||
463 11632 : bindcol->column_timestamp ||
464 : bindcol->column_computed) {
465 0 : continue;
466 : }
467 :
468 11632 : rc = get_col_data(bcpinfo, bindcol, offset);
469 11632 : if (TDS_FAILED(rc)) {
470 0 : tdsdump_log(TDS_DBG_INFO1, "get_col_data (column %d) failed\n", i + 1);
471 0 : return rc;
472 : }
473 11632 : tdsdump_log(TDS_DBG_INFO1, "gotten column %d length %d null %d\n",
474 0 : i + 1, bindcol->bcp_column_data->datalen, bindcol->bcp_column_data->is_null);
475 :
476 11632 : save_size = bindcol->column_cur_size;
477 11632 : save_data = bindcol->column_data;
478 11632 : assert(bindcol->column_data == NULL);
479 11632 : if (bindcol->bcp_column_data->is_null) {
480 4488 : if (!bindcol->column_nullable && !is_nullable_type(bindcol->on_server.column_type)) {
481 0 : if (null_error)
482 0 : null_error(bcpinfo, i, offset);
483 : return TDS_FAIL;
484 : }
485 4488 : bindcol->column_cur_size = -1;
486 7144 : } else if (is_blob_col(bindcol)) {
487 92 : bindcol->column_cur_size = bindcol->bcp_column_data->datalen;
488 92 : memset(&blob, 0, sizeof(blob));
489 92 : blob.textvalue = (TDS_CHAR *) bindcol->bcp_column_data->data;
490 92 : bindcol->column_data = (unsigned char *) &blob;
491 : } else {
492 7052 : bindcol->column_cur_size = bindcol->bcp_column_data->datalen;
493 7052 : bindcol->column_data = bindcol->bcp_column_data->data;
494 : }
495 11632 : rc = bindcol->funcs->put_data(tds, bindcol, 1);
496 11632 : bindcol->column_cur_size = save_size;
497 11632 : bindcol->column_data = save_data;
498 :
499 11632 : TDS_PROPAGATE(rc);
500 : }
501 : return TDS_SUCCESS;
502 : }
503 :
504 : static TDSRET
505 194 : tds5_send_record(TDSSOCKET *tds, TDSBCPINFO *bcpinfo,
506 : tds_bcp_get_col_data get_col_data, tds_bcp_null_error null_error, int offset)
507 : {
508 194 : int row_pos = 0;
509 : int row_sz_pos;
510 194 : int blob_cols = 0;
511 : int var_cols_pos;
512 194 : int var_cols_written = 0;
513 194 : TDS_INT old_record_size = bcpinfo->bindinfo->row_size;
514 194 : unsigned char *record = bcpinfo->bindinfo->current_row;
515 : int i;
516 :
517 194 : memset(record, '\0', old_record_size); /* zero the rowbuffer */
518 :
519 : /* SAP ASE Datarows-locked tables expect additional 4 blank bytes before everything else */
520 194 : if (bcpinfo->datarows_locking)
521 20 : row_pos += 4;
522 :
523 : /*
524 : * offset 0 = number of var columns
525 : * offset 1 = row number. zeroed (datasever assigns)
526 : */
527 194 : var_cols_pos = row_pos;
528 194 : row_pos += 2;
529 :
530 194 : if ((row_pos = tds5_bcp_add_fixed_columns(bcpinfo, get_col_data, null_error, offset, record, row_pos)) < 0)
531 : return TDS_FAIL;
532 :
533 192 : row_sz_pos = row_pos;
534 :
535 : /* potential variable columns to write */
536 :
537 192 : row_pos = tds5_bcp_add_variable_columns(bcpinfo, get_col_data, null_error, offset, record, row_pos, &var_cols_written);
538 192 : if (row_pos < 0)
539 : return TDS_FAIL;
540 :
541 :
542 192 : if (var_cols_written) {
543 174 : TDS_PUT_UA2LE(&record[row_sz_pos], row_pos);
544 174 : record[var_cols_pos] = var_cols_written;
545 : }
546 :
547 192 : tdsdump_log(TDS_DBG_INFO1, "old_record_size = %d new size = %d \n", old_record_size, row_pos);
548 :
549 192 : tds_put_smallint(tds, row_pos);
550 192 : tds_put_n(tds, record, row_pos);
551 :
552 : /* row is done, now handle any text/image data */
553 :
554 192 : blob_cols = 0;
555 :
556 3526 : for (i = 0; i < bcpinfo->bindinfo->num_cols; i++) {
557 3334 : TDSCOLUMN *bindcol = bcpinfo->bindinfo->columns[i];
558 3334 : if (is_blob_type(bindcol->on_server.column_type)) {
559 10 : TDS_PROPAGATE(get_col_data(bcpinfo, bindcol, offset));
560 : /* unknown but zero */
561 10 : tds_put_smallint(tds, 0);
562 10 : TDS_PUT_BYTE(tds, bindcol->on_server.column_type);
563 10 : tds_put_byte(tds, 0xff - blob_cols);
564 : /*
565 : * offset of txptr we stashed during variable
566 : * column processing
567 : */
568 10 : tds_put_smallint(tds, bindcol->column_textpos);
569 10 : tds_put_int(tds, bindcol->bcp_column_data->datalen);
570 10 : tds_put_n(tds, bindcol->bcp_column_data->data, bindcol->bcp_column_data->datalen);
571 10 : blob_cols++;
572 :
573 : }
574 : }
575 : return TDS_SUCCESS;
576 : }
577 :
578 : /**
579 : * Send one row of data to server
580 : * \tds
581 : * \param bcpinfo BCP information
582 : * \param get_col_data function to call to retrieve data to be sent
583 : * \param ignored function to call if we try to send NULL if not allowed (not used)
584 : * \param offset passed to get_col_data and null_error to specify the row to get
585 : * \return TDS_SUCCESS or TDS_FAIL.
586 : */
587 : TDSRET
588 998 : tds_bcp_send_record(TDSSOCKET *tds, TDSBCPINFO *bcpinfo,
589 : tds_bcp_get_col_data get_col_data, tds_bcp_null_error null_error, int offset)
590 : {
591 : TDSRET rc;
592 :
593 998 : tdsdump_log(TDS_DBG_FUNC, "tds_bcp_send_bcp_record(%p, %p, %p, %p, %d)\n",
594 : tds, bcpinfo, get_col_data, null_error, offset);
595 :
596 998 : if (tds->out_flag != TDS_BULK || tds_set_state(tds, TDS_WRITING) != TDS_WRITING)
597 : return TDS_FAIL;
598 :
599 998 : if (IS_TDS7_PLUS(tds->conn))
600 804 : rc = tds7_send_record(tds, bcpinfo, get_col_data, null_error, offset);
601 : else
602 194 : rc = tds5_send_record(tds, bcpinfo, get_col_data, null_error, offset);
603 :
604 998 : tds_set_state(tds, TDS_SENDING);
605 998 : return rc;
606 : }
607 :
608 : static inline void
609 : tds5_swap_data(const TDSCOLUMN *col TDS_UNUSED, void *p TDS_UNUSED)
610 : {
611 : #ifdef WORDS_BIGENDIAN
612 : tds_swap_datatype(tds_get_conversion_type(col->on_server.column_type, col->column_size), p);
613 : #endif
614 : }
615 :
616 : /**
617 : * Add fixed size columns to the row
618 : * \param bcpinfo BCP information
619 : * \param get_col_data function to call to retrieve data to be sent
620 : * \param ignored function to call if we try to send NULL if not allowed (not used)
621 : * \param offset passed to get_col_data and null_error to specify the row to get
622 : * \param rowbuffer row buffer to write to
623 : * \param start row buffer last end position
624 : * \returns new row length or -1 on error.
625 : */
626 : static int
627 194 : tds5_bcp_add_fixed_columns(TDSBCPINFO *bcpinfo, tds_bcp_get_col_data get_col_data, tds_bcp_null_error null_error,
628 : int offset, unsigned char * rowbuffer, int start)
629 : {
630 : TDS_NUMERIC *num;
631 194 : int row_pos = start;
632 : int cpbytes;
633 : int i;
634 194 : int bitleft = 0, bitpos = 0;
635 :
636 194 : assert(bcpinfo);
637 194 : assert(rowbuffer);
638 :
639 194 : tdsdump_log(TDS_DBG_FUNC, "tds5_bcp_add_fixed_columns(%p, %p, %p, %d, %p, %d)\n",
640 : bcpinfo, get_col_data, null_error, offset, rowbuffer, start);
641 :
642 3336 : for (i = 0; i < bcpinfo->bindinfo->num_cols; i++) {
643 :
644 3338 : TDSCOLUMN *const bcpcol = bcpinfo->bindinfo->columns[i];
645 3338 : const TDS_INT column_size = bcpcol->on_server.column_size;
646 :
647 : /* if possible check information from server */
648 3338 : if (bcpinfo->sybase_count > i) {
649 3338 : if (bcpinfo->sybase_colinfo[i].offset < 0)
650 1788 : continue;
651 : } else {
652 0 : if (is_nullable_type(bcpcol->on_server.column_type) || bcpcol->column_nullable)
653 0 : continue;
654 : }
655 :
656 1550 : tdsdump_log(TDS_DBG_FUNC, "tds5_bcp_add_fixed_columns column %d (%s) is a fixed column\n", i + 1, tds_dstr_cstr(&bcpcol->column_name));
657 :
658 1550 : if (TDS_FAILED(get_col_data(bcpinfo, bcpcol, offset))) {
659 0 : tdsdump_log(TDS_DBG_INFO1, "get_col_data (column %d) failed\n", i + 1);
660 : return -1;
661 : }
662 :
663 : /* We have no way to send a NULL at this point, return error to client */
664 1550 : if (bcpcol->bcp_column_data->is_null) {
665 2 : tdsdump_log(TDS_DBG_ERROR, "tds5_bcp_add_fixed_columns column %d is a null column\n", i + 1);
666 : /* No value or default value available and NULL not allowed. */
667 2 : if (null_error)
668 2 : null_error(bcpinfo, i, offset);
669 : return -1;
670 : }
671 :
672 1548 : if (is_numeric_type(bcpcol->on_server.column_type)) {
673 220 : num = (TDS_NUMERIC *) bcpcol->bcp_column_data->data;
674 220 : cpbytes = tds_numeric_bytes_per_prec[num->precision];
675 220 : memcpy(&rowbuffer[row_pos], num->array, cpbytes);
676 1328 : } else if (bcpcol->column_type == SYBBIT) {
677 : /* all bit are collapsed together */
678 178 : if (!bitleft) {
679 126 : bitpos = row_pos++;
680 126 : bitleft = 8;
681 126 : rowbuffer[bitpos] = 0;
682 : }
683 178 : if (bcpcol->bcp_column_data->data[0])
684 146 : rowbuffer[bitpos] |= 256 >> bitleft;
685 178 : --bitleft;
686 178 : continue;
687 : } else {
688 1150 : cpbytes = bcpcol->bcp_column_data->datalen > column_size ?
689 : column_size : bcpcol->bcp_column_data->datalen;
690 1150 : memcpy(&rowbuffer[row_pos], bcpcol->bcp_column_data->data, cpbytes);
691 1150 : tds5_swap_data(bcpcol, &rowbuffer[row_pos]);
692 :
693 : /* CHAR data may need padding out to the database length with blanks */
694 : /* TODO check binary !!! */
695 1150 : if (bcpcol->column_type == SYBCHAR && cpbytes < column_size)
696 118 : memset(rowbuffer + row_pos + cpbytes, ' ', column_size - cpbytes);
697 : }
698 :
699 1370 : row_pos += column_size;
700 : }
701 : return row_pos;
702 : }
703 :
704 : /**
705 : * Add variable size columns to the row
706 : *
707 : * \param bcpinfo BCP information already prepared
708 : * \param get_col_data function to call to retrieve data to be sent
709 : * \param null_error function to call if we try to send NULL if not allowed
710 : * \param offset passed to get_col_data and null_error to specify the row to get
711 : * \param rowbuffer The row image that will be sent to the server.
712 : * \param start Where to begin copying data into the rowbuffer.
713 : * \param pncols Address of output variable holding the count of columns added to the rowbuffer.
714 : *
715 : * \return length of (potentially modified) rowbuffer, or -1.
716 : */
717 : static int
718 192 : tds5_bcp_add_variable_columns(TDSBCPINFO *bcpinfo, tds_bcp_get_col_data get_col_data, tds_bcp_null_error null_error,
719 : int offset, TDS_UCHAR* rowbuffer, int start, int *pncols)
720 : {
721 : TDS_USMALLINT offsets[256];
722 : unsigned int i, row_pos;
723 192 : unsigned int ncols = 0;
724 :
725 192 : assert(bcpinfo);
726 192 : assert(rowbuffer);
727 192 : assert(pncols);
728 :
729 192 : tdsdump_log(TDS_DBG_FUNC, "%4s %8s %18s %18s %8s\n", "col",
730 : "type",
731 : "is_nullable_type",
732 : "column_nullable",
733 : "is null" );
734 3334 : for (i = 0; i < bcpinfo->bindinfo->num_cols; i++) {
735 3334 : TDSCOLUMN *bcpcol = bcpinfo->bindinfo->columns[i];
736 3334 : tdsdump_log(TDS_DBG_FUNC, "%4d %8d %18s %18s %8s\n", i,
737 : bcpcol->on_server.column_type,
738 0 : is_nullable_type(bcpcol->on_server.column_type)? "yes" : "no",
739 0 : bcpcol->column_nullable? "yes" : "no",
740 0 : bcpcol->bcp_column_data->is_null? "yes" : "no" );
741 : }
742 :
743 : /* the first two bytes of the rowbuffer are reserved to hold the entire record length */
744 192 : row_pos = start + 2;
745 192 : offsets[0] = row_pos;
746 :
747 192 : tdsdump_log(TDS_DBG_FUNC, "%4s %8s %8s %8s\n", "col", "ncols", "row_pos", "cpbytes");
748 :
749 3334 : for (i = 0; i < bcpinfo->bindinfo->num_cols; i++) {
750 3334 : unsigned int cpbytes = 0;
751 3334 : TDSCOLUMN *bcpcol = bcpinfo->bindinfo->columns[i];
752 :
753 : /*
754 : * Is this column of "variable" type, i.e. NULLable
755 : * or naturally variable length e.g. VARCHAR
756 : */
757 3334 : if (bcpinfo->sybase_count > (TDS_INT) i) {
758 3334 : if (bcpinfo->sybase_colinfo[i].offset >= 0)
759 1546 : continue;
760 : } else {
761 0 : if (!is_nullable_type(bcpcol->on_server.column_type) && !bcpcol->column_nullable)
762 0 : continue;
763 : }
764 :
765 1788 : tdsdump_log(TDS_DBG_FUNC, "%4d %8d %8d %8d\n", i, ncols, row_pos, cpbytes);
766 :
767 1788 : if (TDS_FAILED(get_col_data(bcpinfo, bcpcol, offset)))
768 : return -1;
769 :
770 : /* If it's a NOT NULL column, and we have no data, throw an error.
771 : * This is the behavior for Sybase, this function is only used for Sybase */
772 1788 : if (!bcpcol->column_nullable && bcpcol->bcp_column_data->is_null) {
773 : /* No value or default value available and NULL not allowed. */
774 0 : if (null_error)
775 0 : null_error(bcpinfo, i, offset);
776 : return -1;
777 : }
778 :
779 : /* move the column buffer into the rowbuffer */
780 1788 : if (!bcpcol->bcp_column_data->is_null) {
781 446 : if (is_blob_type(bcpcol->on_server.column_type)) {
782 10 : cpbytes = 16;
783 10 : bcpcol->column_textpos = row_pos; /* save for data write */
784 436 : } else if (is_numeric_type(bcpcol->on_server.column_type)) {
785 24 : TDS_NUMERIC *num = (TDS_NUMERIC *) bcpcol->bcp_column_data->data;
786 24 : cpbytes = tds_numeric_bytes_per_prec[num->precision];
787 24 : memcpy(&rowbuffer[row_pos], num->array, cpbytes);
788 : } else {
789 824 : cpbytes = bcpcol->bcp_column_data->datalen > bcpcol->column_size ?
790 412 : bcpcol->column_size : bcpcol->bcp_column_data->datalen;
791 412 : memcpy(&rowbuffer[row_pos], bcpcol->bcp_column_data->data, cpbytes);
792 412 : tds5_swap_data(bcpcol, &rowbuffer[row_pos]);
793 : }
794 : }
795 :
796 1788 : row_pos += cpbytes;
797 1788 : offsets[++ncols] = row_pos;
798 1788 : tdsdump_dump_buf(TDS_DBG_NETWORK, "BCP row buffer so far", rowbuffer, row_pos);
799 : }
800 :
801 192 : tdsdump_log(TDS_DBG_FUNC, "%4d %8d %8d\n", i, ncols, row_pos);
802 :
803 : /*
804 : * The rowbuffer ends with an offset table and, optionally, an adjustment table.
805 : * The offset table has 1-byte elements that describe the locations of the start of each column in
806 : * the rowbuffer. If the largest offset is greater than 255, another table -- the adjustment table --
807 : * is inserted just before the offset table. It holds the high bytes.
808 : *
809 : * Both tables are laid out in reverse:
810 : * #elements, offset N+1, offset N, offset N-1, ... offset 0
811 : * E.g. for 2 columns you have 4 data points:
812 : * 1. How many elements (4)
813 : * 2. Start of column 3 (non-existent, "one off the end")
814 : * 3. Start of column 2
815 : * 4. Start of column 1
816 : * The length of each column is computed by subtracting its start from the its successor's start.
817 : *
818 : * The algorithm below computes both tables. If the adjustment table isn't needed, the
819 : * effect is to overwrite it with the offset table.
820 : */
821 1534 : while (ncols && offsets[ncols] == offsets[ncols-1])
822 : ncols--; /* trailing NULL columns are not sent and are not included in the offset table */
823 :
824 346 : if (ncols && !bcpinfo->datarows_locking) {
825 154 : TDS_UCHAR *poff = rowbuffer + row_pos;
826 154 : unsigned int pfx_top = offsets[ncols] >> 8;
827 :
828 154 : tdsdump_log(TDS_DBG_FUNC, "ncols=%u poff=%p [%u]\n", ncols, poff, offsets[ncols]);
829 :
830 154 : *poff++ = ncols + 1;
831 : /* this is some kind of run-length-prefix encoding */
832 314 : while (pfx_top) {
833 : unsigned int n_pfx = 1;
834 :
835 126 : for (i = 0; i <= ncols ; ++i)
836 126 : if ((offsets[i] >> 8) < pfx_top)
837 80 : ++n_pfx;
838 6 : *poff++ = n_pfx;
839 6 : --pfx_top;
840 : }
841 :
842 154 : tdsdump_log(TDS_DBG_FUNC, "poff=%p\n", poff);
843 :
844 580 : for (i=0; i <= ncols; i++)
845 580 : *poff++ = offsets[ncols-i] & 0xFF;
846 154 : row_pos = (unsigned int)(poff - rowbuffer);
847 : } else { /* Datarows-locking */
848 : unsigned int col;
849 :
850 58 : for (col = ncols; col-- > 0;) {
851 : /* The DOL BULK format has a much simpler row table -- it's just a 2-byte length for every
852 : * non-fixed column (does not have the extra "offset after the end" that the basic format has) */
853 20 : rowbuffer[row_pos++] = offsets[col] % 256;
854 20 : rowbuffer[row_pos++] = offsets[col] / 256;
855 :
856 20 : tdsdump_log(TDS_DBG_FUNC, "[DOL BULK offset table] col=%u offset=%u\n", col, offsets[col]);
857 : }
858 : }
859 :
860 192 : tdsdump_log(TDS_DBG_FUNC, "%4d %8d %8d\n", i, ncols, row_pos);
861 192 : tdsdump_dump_buf(TDS_DBG_NETWORK, "BCP row buffer", rowbuffer, row_pos);
862 :
863 192 : *pncols = ncols;
864 :
865 192 : return ncols == 0? start : row_pos;
866 : }
867 :
868 : /**
869 : * Send BCP metadata to server.
870 : * Only for TDS 7.0+.
871 : * \tds
872 : * \param bcpinfo BCP information
873 : * \return TDS_SUCCESS or TDS_FAIL.
874 : */
875 : static TDSRET
876 288 : tds7_bcp_send_colmetadata(TDSSOCKET *tds, TDSBCPINFO *bcpinfo)
877 : {
878 : TDSCOLUMN *bcpcol;
879 : int i, num_cols;
880 :
881 288 : tdsdump_log(TDS_DBG_FUNC, "tds7_bcp_send_colmetadata(%p, %p)\n", tds, bcpinfo);
882 288 : assert(tds && bcpinfo);
883 :
884 288 : if (tds->out_flag != TDS_BULK || tds_set_state(tds, TDS_WRITING) != TDS_WRITING)
885 : return TDS_FAIL;
886 :
887 : /*
888 : * Deep joy! For TDS 7 we have to send a colmetadata message followed by row data
889 : */
890 288 : tds_put_byte(tds, TDS7_RESULT_TOKEN); /* 0x81 */
891 :
892 288 : num_cols = 0;
893 2688 : for (i = 0; i < bcpinfo->bindinfo->num_cols; i++) {
894 2400 : bcpcol = bcpinfo->bindinfo->columns[i];
895 2400 : if ((!bcpinfo->identity_insert_on && bcpcol->column_identity) ||
896 2400 : bcpcol->column_timestamp ||
897 : bcpcol->column_computed) {
898 0 : continue;
899 : }
900 2400 : num_cols++;
901 : }
902 :
903 288 : tds_put_smallint(tds, num_cols);
904 :
905 2688 : for (i = 0; i < bcpinfo->bindinfo->num_cols; i++) {
906 : size_t converted_len;
907 : const char *converted_name;
908 :
909 2400 : bcpcol = bcpinfo->bindinfo->columns[i];
910 :
911 : /*
912 : * dont send the (meta)data for timestamp columns, or
913 : * identity columns (unless indentity_insert is enabled
914 : */
915 :
916 2400 : if ((!bcpinfo->identity_insert_on && bcpcol->column_identity) ||
917 2400 : bcpcol->column_timestamp ||
918 : bcpcol->column_computed) {
919 0 : continue;
920 : }
921 :
922 2400 : if (IS_TDS72_PLUS(tds->conn))
923 1232 : tds_put_int(tds, bcpcol->column_usertype);
924 : else
925 1168 : tds_put_smallint(tds, bcpcol->column_usertype);
926 2400 : tds_put_smallint(tds, bcpcol->column_flags);
927 2400 : TDS_PUT_BYTE(tds, bcpcol->on_server.column_type);
928 :
929 2400 : assert(bcpcol->funcs);
930 2400 : bcpcol->funcs->put_info(tds, bcpcol);
931 :
932 : /* TODO put this in put_info. It seems that parameter format is
933 : * different from BCP format
934 : */
935 2400 : if (is_blob_type(bcpcol->on_server.column_type)) {
936 132 : converted_name = tds_convert_string(tds, tds->conn->char_convs[client2ucs2],
937 44 : tds_dstr_cstr(&bcpinfo->tablename),
938 88 : (int) tds_dstr_len(&bcpinfo->tablename), &converted_len);
939 44 : if (!converted_name) {
940 0 : tds_connection_close(tds->conn);
941 0 : return TDS_FAIL;
942 : }
943 :
944 : /* UTF-16 length is always size / 2 even for 4 byte letters (yes, 1 letter of length 2) */
945 44 : TDS_PUT_SMALLINT(tds, converted_len / 2);
946 44 : tds_put_n(tds, converted_name, converted_len);
947 :
948 88 : tds_convert_string_free(tds_dstr_cstr(&bcpinfo->tablename), converted_name);
949 : }
950 :
951 7200 : converted_name = tds_convert_string(tds, tds->conn->char_convs[client2ucs2],
952 2400 : tds_dstr_cstr(&bcpcol->column_name),
953 4800 : (int) tds_dstr_len(&bcpcol->column_name), &converted_len);
954 2400 : if (!converted_name) {
955 0 : tds_connection_close(tds->conn);
956 0 : return TDS_FAIL;
957 : }
958 :
959 : /* UTF-16 length is always size / 2 even for 4 byte letters (yes, 1 letter of length 2) */
960 2400 : TDS_PUT_BYTE(tds, converted_len / 2);
961 2400 : tds_put_n(tds, converted_name, converted_len);
962 :
963 4800 : tds_convert_string_free(tds_dstr_cstr(&bcpcol->column_name), converted_name);
964 : }
965 :
966 288 : tds_set_state(tds, TDS_SENDING);
967 288 : return TDS_SUCCESS;
968 : }
969 :
970 : /**
971 : * Tell we finished sending BCP data to server
972 : * \tds
973 : * \param[out] rows_copied number of rows copied to server
974 : */
975 : TDSRET
976 358 : tds_bcp_done(TDSSOCKET *tds, int *rows_copied)
977 : {
978 358 : tdsdump_log(TDS_DBG_FUNC, "tds_bcp_done(%p, %p)\n", tds, rows_copied);
979 :
980 358 : if (tds->out_flag != TDS_BULK || tds_set_state(tds, TDS_WRITING) != TDS_WRITING)
981 : return TDS_FAIL;
982 :
983 348 : tds_flush_packet(tds);
984 :
985 348 : tds_set_state(tds, TDS_PENDING);
986 :
987 348 : TDS_PROPAGATE(tds_process_simple_query(tds));
988 :
989 340 : if (rows_copied)
990 340 : *rows_copied = tds->rows_affected;
991 :
992 : return TDS_SUCCESS;
993 : }
994 :
995 : /**
996 : * Start sending BCP data to server.
997 : * Initialize stream to accept data.
998 : * \tds
999 : * \param bcpinfo BCP information already prepared
1000 : */
1001 : TDSRET
1002 348 : tds_bcp_start(TDSSOCKET *tds, TDSBCPINFO *bcpinfo)
1003 : {
1004 : TDSRET rc;
1005 :
1006 348 : tdsdump_log(TDS_DBG_FUNC, "tds_bcp_start(%p, %p)\n", tds, bcpinfo);
1007 :
1008 348 : if (!IS_TDS50_PLUS(tds->conn))
1009 : return TDS_FAIL;
1010 :
1011 348 : TDS_PROPAGATE(tds_submit_query(tds, bcpinfo->insert_stmt));
1012 :
1013 : /* set we want to switch to bulk state */
1014 348 : tds->bulk_query = true;
1015 :
1016 : /*
1017 : * In TDS 5 we get the column information as a result set from the "insert bulk" command.
1018 : */
1019 348 : if (IS_TDS50(tds->conn))
1020 60 : rc = tds5_process_insert_bulk_reply(tds, bcpinfo);
1021 : else
1022 288 : rc = tds_process_simple_query(tds);
1023 348 : TDS_PROPAGATE(rc);
1024 :
1025 348 : tds->out_flag = TDS_BULK;
1026 348 : if (tds_set_state(tds, TDS_SENDING) != TDS_SENDING)
1027 : return TDS_FAIL;
1028 :
1029 348 : if (IS_TDS7_PLUS(tds->conn))
1030 288 : tds7_bcp_send_colmetadata(tds, bcpinfo);
1031 :
1032 : return TDS_SUCCESS;
1033 : }
1034 :
/**
 * Columns of the TDS 5 "insert bulk" reply we are interested in.
 * The enumerator suffix is exactly the column name sent by the server.
 */
enum {
	/* indexes of the needed columns, starting from 0 */
	BULKCOL_colcnt,
	BULKCOL_colid,
	BULKCOL_type,
	BULKCOL_length,
	BULKCOL_status,
	BULKCOL_offset,

	/* how many columns are needed */
	BULKCOL_COUNT,

	/* mask with one bit set for every needed column */
	BULKCOL_ALL = (1 << BULKCOL_COUNT) - 1,
};

/**
 * Map a result column name to its BULKCOL_xxx index.
 * The comparison is exact and case sensitive.
 * \param name column name, NUL terminated
 * \return BULKCOL_xxx index, or -1 if the column is not one we need
 */
static int
tds5_bulk_insert_column(const char *name)
{
#define BULKCOL_ENTRY(n) { #n, BULKCOL_ ## n }
	static const struct {
		const char *name;
		int index;
	} known[] = {
		BULKCOL_ENTRY(colcnt),
		BULKCOL_ENTRY(colid),
		BULKCOL_ENTRY(type),
		BULKCOL_ENTRY(length),
		BULKCOL_ENTRY(status),
		BULKCOL_ENTRY(offset),
	};
#undef BULKCOL_ENTRY
	size_t pos;

	for (pos = 0; pos < sizeof(known) / sizeof(known[0]); ++pos) {
		if (strcmp(name, known[pos].name) == 0)
			return known[pos].index;
	}
	return -1;
}
1080 :
1081 : static TDSRET
1082 60 : tds5_process_insert_bulk_reply(TDSSOCKET * tds, TDSBCPINFO *bcpinfo)
1083 : {
1084 : TDS_INT res_type;
1085 : TDS_INT done_flags;
1086 : TDSRET rc;
1087 60 : TDSRET ret = TDS_SUCCESS;
1088 60 : bool row_match = false;
1089 : TDSRESULTINFO *res_info;
1090 : int icol;
1091 : unsigned col_flags;
1092 : /* position of the columns in the row */
1093 : int cols_pos[BULKCOL_COUNT];
1094 : int cols_values[BULKCOL_COUNT];
1095 : TDS5COLINFO *colinfo;
1096 :
1097 60 : CHECK_TDS_EXTRA(tds);
1098 :
1099 774 : while ((rc = tds_process_tokens(tds, &res_type, &done_flags, TDS_RETURN_DONE|TDS_RETURN_ROWFMT|TDS_RETURN_ROW)) == TDS_SUCCESS) {
1100 714 : switch (res_type) {
1101 60 : case TDS_ROWFMT_RESULT:
1102 : /* check if it's the resultset with column information and save column positions */
1103 60 : row_match = false;
1104 60 : col_flags = 0;
1105 60 : res_info = tds->current_results;
1106 60 : if (!res_info)
1107 0 : continue;
1108 720 : for (icol = 0; icol < res_info->num_cols; ++icol) {
1109 720 : const TDSCOLUMN *col = res_info->columns[icol];
1110 1440 : int scol = tds5_bulk_insert_column(tds_dstr_cstr(&col->column_name));
1111 720 : if (scol < 0)
1112 360 : continue;
1113 360 : cols_pos[scol] = icol;
1114 360 : col_flags |= 1 << scol;
1115 : }
1116 60 : if (col_flags == BULKCOL_ALL)
1117 60 : row_match = true;
1118 : break;
1119 594 : case TDS_ROW_RESULT:
1120 : /* get the results */
1121 594 : col_flags = 0;
1122 594 : if (!row_match)
1123 0 : continue;
1124 594 : res_info = tds->current_results;
1125 594 : if (!res_info)
1126 0 : continue;
1127 3564 : for (icol = 0; icol < BULKCOL_COUNT; ++icol) {
1128 3564 : const TDSCOLUMN *col = res_info->columns[cols_pos[icol]];
1129 3564 : int ctype = tds_get_conversion_type(col->on_server.column_type, col->column_size);
1130 3564 : unsigned char *src = col->column_data;
1131 3564 : int srclen = col->column_cur_size;
1132 : CONV_RESULT dres;
1133 :
1134 3564 : if (tds_convert(tds_get_ctx(tds), ctype, src, srclen, SYBINT4, &dres) < 0)
1135 : break;
1136 3564 : col_flags |= 1 << icol;
1137 3564 : cols_values[icol] = dres.i;
1138 : }
1139 : /* save information */
1140 1188 : if (col_flags != BULKCOL_ALL ||
1141 1188 : cols_values[BULKCOL_colcnt] < 1 ||
1142 594 : cols_values[BULKCOL_colcnt] > 4096 || /* limit of columns accepted */
1143 1188 : cols_values[BULKCOL_colid] < 1 ||
1144 : cols_values[BULKCOL_colid] > cols_values[BULKCOL_colcnt]) {
1145 : rc = TDS_FAIL;
1146 : break;
1147 : }
1148 594 : if (bcpinfo->sybase_colinfo == NULL) {
1149 54 : bcpinfo->sybase_colinfo = calloc(cols_values[BULKCOL_colcnt], sizeof(*bcpinfo->sybase_colinfo));
1150 54 : if (bcpinfo->sybase_colinfo == NULL) {
1151 : rc = TDS_FAIL;
1152 : break;
1153 : }
1154 54 : bcpinfo->sybase_count = cols_values[BULKCOL_colcnt];
1155 : }
1156 : /* bound check, colcnt could have changed from row to row */
1157 594 : if (cols_values[BULKCOL_colid] > bcpinfo->sybase_count) {
1158 : rc = TDS_FAIL;
1159 : break;
1160 : }
1161 594 : colinfo = &bcpinfo->sybase_colinfo[cols_values[BULKCOL_colid] - 1];
1162 594 : colinfo->type = cols_values[BULKCOL_type];
1163 594 : colinfo->status = cols_values[BULKCOL_status];
1164 594 : colinfo->offset = cols_values[BULKCOL_offset];
1165 594 : colinfo->length = cols_values[BULKCOL_length];
1166 594 : tdsdump_log(TDS_DBG_INFO1, "gotten row information %d type %d length %d status %d offset %d\n",
1167 : cols_values[BULKCOL_colid],
1168 : colinfo->type,
1169 : colinfo->length,
1170 : colinfo->status,
1171 : colinfo->offset);
1172 : break;
1173 60 : case TDS_DONE_RESULT:
1174 : case TDS_DONEPROC_RESULT:
1175 : case TDS_DONEINPROC_RESULT:
1176 60 : if ((done_flags & TDS_DONE_ERROR) != 0)
1177 0 : ret = TDS_FAIL;
1178 : default:
1179 : break;
1180 : }
1181 : }
1182 60 : if (TDS_FAILED(rc))
1183 0 : ret = rc;
1184 :
1185 60 : return ret;
1186 : }
1187 :
1188 : /**
1189 : * Free row data allocated in the result set.
1190 : */
1191 : static void
1192 78 : tds_bcp_row_free(TDSRESULTINFO* result, unsigned char *row TDS_UNUSED)
1193 : {
1194 78 : result->row_size = 0;
1195 78 : TDS_ZERO_FREE(result->current_row);
1196 78 : }
1197 :
1198 : /**
1199 : * Start bulk copy to server
1200 : * \tds
1201 : * \param bcpinfo BCP information already prepared
1202 : */
1203 : TDSRET
1204 314 : tds_bcp_start_copy_in(TDSSOCKET *tds, TDSBCPINFO *bcpinfo)
1205 : {
1206 : TDSCOLUMN *bcpcol;
1207 : int i;
1208 314 : int fixed_col_len_tot = 0;
1209 314 : int variable_col_len_tot = 0;
1210 314 : int column_bcp_data_size = 0;
1211 314 : int bcp_record_size = 0;
1212 : TDSRET rc;
1213 : TDS_INT var_cols;
1214 :
1215 314 : tdsdump_log(TDS_DBG_FUNC, "tds_bcp_start_copy_in(%p, %p)\n", tds, bcpinfo);
1216 :
1217 314 : TDS_PROPAGATE(tds_bcp_start_insert_stmt(tds, bcpinfo));
1218 :
1219 314 : rc = tds_bcp_start(tds, bcpinfo);
1220 314 : if (TDS_FAILED(rc)) {
1221 : /* TODO, in CTLib was _ctclient_msg(blkdesc->con, "blk_rowxfer", 2, 5, 1, 140, ""); */
1222 : return rc;
1223 : }
1224 :
1225 : /*
1226 : * Work out the number of "variable" columns. These are either nullable or of
1227 : * varying length type e.g. varchar.
1228 : */
1229 314 : var_cols = 0;
1230 :
1231 314 : if (IS_TDS50(tds->conn)) {
1232 424 : for (i = 0; i < bcpinfo->bindinfo->num_cols; i++) {
1233 :
1234 424 : bcpcol = bcpinfo->bindinfo->columns[i];
1235 :
1236 : /*
1237 : * work out storage required for this datatype
1238 : * blobs always require 16, numerics vary, the
1239 : * rest can be taken from the server
1240 : */
1241 :
1242 424 : if (is_blob_type(bcpcol->on_server.column_type))
1243 : column_bcp_data_size = 16;
1244 418 : else if (is_numeric_type(bcpcol->on_server.column_type))
1245 42 : column_bcp_data_size = tds_numeric_bytes_per_prec[bcpcol->column_prec];
1246 : else
1247 376 : column_bcp_data_size = bcpcol->column_size;
1248 :
1249 : /*
1250 : * now add that size into either fixed or variable
1251 : * column totals...
1252 : */
1253 :
1254 424 : if (is_nullable_type(bcpcol->on_server.column_type) || bcpcol->column_nullable) {
1255 236 : var_cols++;
1256 236 : variable_col_len_tot += column_bcp_data_size;
1257 : } else {
1258 188 : fixed_col_len_tot += column_bcp_data_size;
1259 : }
1260 : }
1261 :
1262 : /* this formula taken from sybase manual... */
1263 :
1264 108 : bcp_record_size = 4 +
1265 54 : fixed_col_len_tot +
1266 54 : variable_col_len_tot +
1267 108 : ( (int)(variable_col_len_tot / 256 ) + 1 ) +
1268 54 : (var_cols + 1) +
1269 : 2;
1270 :
1271 54 : tdsdump_log(TDS_DBG_FUNC, "current_record_size = %d\n", bcpinfo->bindinfo->row_size);
1272 54 : tdsdump_log(TDS_DBG_FUNC, "bcp_record_size = %d\n", bcp_record_size);
1273 :
1274 54 : if (bcp_record_size > bcpinfo->bindinfo->row_size) {
1275 22 : if (!TDS_RESIZE(bcpinfo->bindinfo->current_row, bcp_record_size)) {
1276 0 : tdsdump_log(TDS_DBG_FUNC, "could not realloc current_row\n");
1277 : return TDS_FAIL;
1278 : }
1279 22 : bcpinfo->bindinfo->row_free = tds_bcp_row_free;
1280 22 : bcpinfo->bindinfo->row_size = bcp_record_size;
1281 : }
1282 : }
1283 :
1284 : return TDS_SUCCESS;
1285 : }
1286 :
/**
 * Input stream reading a field from a file up to a terminator.
 * Used through its TDSINSTREAM part by the generic stream functions.
 */
typedef struct tds_file_stream {
	/** common fields, must be the first field (the stream pointer is cast back to this structure) */
	TDSINSTREAM stream;
	/** file to read from */
	FILE *f;

	/** terminator to look for; tds_bcp_fread points it inside the "left" allocation */
	const char *terminator;
	/** terminator length in bytes */
	size_t term_len;

	/**
	 * buffer holding the last bytes read from the file, which could
	 * turn out to be the terminator and so are not yet returned
	 */
	char *left;
	/** position in "left" of the next byte to hand out, wraps at term_len */
	size_t left_pos;
} TDSFILESTREAM;
1303 :
/** \cond HIDDEN_SYMBOLS */
/*
 * On Windows, when the CRT provides _lock_file/_unlock_file, map the
 * POSIX-style locked stdio names used in this file onto the CRT ones.
 */
#if defined(_WIN32) && defined(HAVE__LOCK_FILE) && defined(HAVE__UNLOCK_FILE)
#define TDS_HAVE_STDIO_LOCKED 1
#define flockfile(s) _lock_file(s)
#define funlockfile(s) _unlock_file(s)
#define getc_unlocked(s) _getc_nolock(s)
#define feof_unlocked(s) _feof_nolock(s)
#endif

/*
 * If TDS_HAVE_STDIO_LOCKED is not defined, fall back to the plain stdio
 * calls and turn explicit locking into a no-op. Any existing definition
 * of these names is removed first.
 */
#ifndef TDS_HAVE_STDIO_LOCKED
#undef getc_unlocked
#undef feof_unlocked
#undef flockfile
#undef funlockfile
#define getc_unlocked(s) getc(s)
#define feof_unlocked(s) feof(s)
#define flockfile(s) do { } while(0)
#define funlockfile(s) do { } while(0)
#endif
/** \endcond */
1324 :
1325 : /**
1326 : * Reads a chunk of data from file stream checking for terminator
1327 : * \param stream file stream
1328 : * \param ptr buffer where to read data
1329 : * \param len length of buffer
1330 : */
1331 : static int
1332 4836 : tds_file_stream_read(TDSINSTREAM *stream, void *ptr, size_t len)
1333 : {
1334 4836 : TDSFILESTREAM *s = (TDSFILESTREAM *) stream;
1335 : int c;
1336 4836 : char *p = (char *) ptr;
1337 :
1338 1893720 : while (len) {
1339 1888434 : if (memcmp(s->left, s->terminator - s->left_pos, s->term_len) == 0)
1340 4386 : return p - (char *) ptr;
1341 :
1342 3768096 : c = getc_unlocked(s->f);
1343 1884048 : if (c == EOF)
1344 : return -1;
1345 :
1346 1884048 : *p++ = s->left[s->left_pos];
1347 1884048 : --len;
1348 :
1349 1884048 : s->left[s->left_pos++] = c;
1350 1884048 : s->left_pos %= s->term_len;
1351 : }
1352 450 : return p - (char *) ptr;
1353 : }
1354 :
1355 : /**
1356 : * Read a data file, passing the data through iconv().
1357 : * \retval TDS_SUCCESS success
1358 : * \retval TDS_FAIL error reading the column
1359 : * \retval TDS_NO_MORE_RESULTS end of file detected
1360 : */
1361 : TDSRET
1362 2398 : tds_bcp_fread(TDSSOCKET * tds, TDSICONV * char_conv, FILE * stream, const char *terminator, size_t term_len, char **outbuf, size_t * outbytes)
1363 : {
1364 : TDSRET res;
1365 : TDSFILESTREAM r;
1366 : TDSDYNAMICSTREAM w;
1367 : size_t readed;
1368 :
1369 : /* prepare streams */
1370 2398 : r.stream.read = tds_file_stream_read;
1371 2398 : r.f = stream;
1372 2398 : r.term_len = term_len;
1373 2398 : r.left = tds_new0(char, term_len*3);
1374 2398 : r.left_pos = 0;
1375 2398 : if (!r.left) return TDS_FAIL;
1376 :
1377 : /* copy terminator twice, let terminator points to second copy */
1378 2398 : memcpy(r.left + term_len, terminator, term_len);
1379 2398 : memcpy(r.left + term_len*2u, terminator, term_len);
1380 2398 : r.terminator = r.left + term_len*2u;
1381 :
1382 : /* read initial buffer to test with terminator */
1383 2398 : readed = fread(r.left, 1, term_len, stream);
1384 2398 : if (readed != term_len) {
1385 130 : free(r.left);
1386 130 : if (readed == 0 && feof(stream))
1387 : return TDS_NO_MORE_RESULTS;
1388 : return TDS_FAIL;
1389 : }
1390 :
1391 2268 : res = tds_dynamic_stream_init(&w, (void**) outbuf, 0);
1392 2268 : if (TDS_FAILED(res)) {
1393 0 : free(r.left);
1394 0 : return res;
1395 : }
1396 :
1397 : /* convert/copy from input stream to output one */
1398 2268 : flockfile(stream);
1399 2268 : if (char_conv == NULL)
1400 868 : res = tds_copy_stream(&r.stream, &w.stream);
1401 : else
1402 1400 : res = tds_convert_stream(tds, char_conv, to_server, &r.stream, &w.stream);
1403 2268 : funlockfile(stream);
1404 2268 : free(r.left);
1405 :
1406 2268 : TDS_PROPAGATE(res);
1407 :
1408 2268 : *outbytes = w.size;
1409 :
1410 : /* terminate buffer */
1411 2268 : if (!w.stream.buf_len)
1412 : return TDS_FAIL;
1413 :
1414 2268 : ((char *) w.stream.buffer)[0] = 0;
1415 2268 : w.stream.write(&w.stream, 1);
1416 :
1417 2268 : return res;
1418 : }
1419 :
1420 : /**
1421 : * Start writing writetext request.
1422 : * This request start a bulk session.
1423 : * \tds
1424 : * \param objname table name
1425 : * \param textptr TEXTPTR (see sql documentation)
1426 : * \param timestamp data timestamp
1427 : * \param with_log is log is enabled during insert
1428 : * \param size bytes to be inserted
1429 : */
1430 : TDSRET
1431 42 : tds_writetext_start(TDSSOCKET *tds, const char *objname, const char *textptr, const char *timestamp, int with_log, TDS_UINT size)
1432 : {
1433 : TDSRET rc;
1434 :
1435 : /* TODO mssql does not like timestamp */
1436 42 : rc = tds_submit_queryf(tds,
1437 : "writetext bulk %s 0x%s timestamp = 0x%s%s",
1438 : objname, textptr, timestamp, with_log ? " with log" : "");
1439 42 : TDS_PROPAGATE(rc);
1440 :
1441 : /* set we want to switch to bulk state */
1442 42 : tds->bulk_query = true;
1443 :
1444 : /* read the end token */
1445 42 : TDS_PROPAGATE(tds_process_simple_query(tds));
1446 :
1447 42 : tds->out_flag = TDS_BULK;
1448 42 : if (tds_set_state(tds, TDS_WRITING) != TDS_WRITING)
1449 : return TDS_FAIL;
1450 :
1451 42 : tds_put_int(tds, size);
1452 :
1453 42 : tds_set_state(tds, TDS_SENDING);
1454 42 : return TDS_SUCCESS;
1455 : }
1456 :
1457 : /**
1458 : * Send some data in the writetext request started by tds_writetext_start.
1459 : * You should write in total (with multiple calls to this function) all
1460 : * bytes declared calling tds_writetext_start.
1461 : * \tds
1462 : * \param text data to write
1463 : * \param size data size in bytes
1464 : */
1465 : TDSRET
1466 312 : tds_writetext_continue(TDSSOCKET *tds, const TDS_UCHAR *text, TDS_UINT size)
1467 : {
1468 312 : if (tds->out_flag != TDS_BULK || tds_set_state(tds, TDS_WRITING) != TDS_WRITING)
1469 : return TDS_FAIL;
1470 :
1471 : /* TODO check size left */
1472 312 : tds_put_n(tds, text, size);
1473 :
1474 312 : tds_set_state(tds, TDS_SENDING);
1475 312 : return TDS_SUCCESS;
1476 : }
1477 :
1478 : /**
1479 : * Finish sending writetext data.
1480 : * \tds
1481 : */
1482 : TDSRET
1483 42 : tds_writetext_end(TDSSOCKET *tds)
1484 : {
1485 42 : if (tds->out_flag != TDS_BULK || tds_set_state(tds, TDS_WRITING) != TDS_WRITING)
1486 : return TDS_FAIL;
1487 :
1488 42 : tds_flush_packet(tds);
1489 42 : tds_set_state(tds, TDS_PENDING);
1490 42 : return TDS_SUCCESS;
1491 : }
|