Skip to content
Snippets Groups Projects
execute.c 13.5 KiB
Newer Older
/*
 * Copyright 2010-2017, Tarantool AUTHORS, please see AUTHORS file.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * 1. Redistributions of source code must retain the above
 *    copyright notice, this list of conditions and the
 *    following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above
 *    copyright notice, this list of conditions and the following
 *    disclaimer in the documentation and/or other materials
 *    provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
#include "execute.h"

#include "bind.h"
#include "iproto_constants.h"
#include "sql/sqlInt.h"
#include "sql/sqlLimit.h"
#include "errcode.h"
#include "small/region.h"
#include "small/obuf.h"
#include "diag.h"
#include "sql.h"
#include "xrow.h"
#include "schema.h"
#include "tuple.h"
#include "sql/vdbe.h"
Mergen Imeev's avatar
Mergen Imeev committed
#include "box/lua/execute.h"
const char *sql_info_key_strs[] = {
Mergen Imeev's avatar
Mergen Imeev committed
	"row_count",
	"autoincrement_ids",
Mergen Imeev's avatar
Mergen Imeev committed
static_assert(sizeof(struct port_sql) <= sizeof(struct port),
	      "sizeof(struct port_sql) must be <= sizeof(struct port)");

/**
 * Dump data from port to buffer. Data in port contains tuples,
 * metadata, or information obtained from an executed SQL query.
 *
 * Dumped msgpack structure:
 * +----------------------------------------------+
 * | IPROTO_BODY: {                               |
 * |     IPROTO_METADATA: [                       |
 * |         {IPROTO_FIELD_NAME: column name1},   |
 * |         {IPROTO_FIELD_NAME: column name2},   |
 * |         ...                                  |
 * |     ],                                       |
 * |                                              |
 * |     IPROTO_DATA: [                           |
 * |         tuple, tuple, tuple, ...             |
 * |     ]                                        |
 * | }                                            |
 * +-------------------- OR ----------------------+
 * | IPROTO_BODY: {                               |
 * |     IPROTO_SQL_INFO: {                       |
 * |         SQL_INFO_ROW_COUNT: number           |
 * |         SQL_INFO_AUTOINCREMENT_IDS: [        |
 * |             id, id, id, ...                  |
 * |         ]                                    |
 * |     }                                        |
 * | }                                            |
 * +-------------------- OR ----------------------+
 * | IPROTO_BODY: {                               |
 * |     IPROTO_SQL_INFO: {                       |
 * |         SQL_INFO_ROW_COUNT: number           |
 * |     }                                        |
 * | }                                            |
 * +----------------------------------------------+
 * @param port Port that contains SQL response.
 * @param[out] out Output buffer.
 *
 * @retval  0 Success.
 * @retval -1 Memory error.
 */
static int
port_sql_dump_msgpack(struct port *port, struct obuf *out);

static void
port_sql_destroy(struct port *base)
{
	port_tuple_vtab.destroy(base);
	sql_finalize(((struct port_sql *)base)->stmt);
}

const struct port_vtab port_sql_vtab = {
	/* .dump_msgpack = */ port_sql_dump_msgpack,
	/* .dump_msgpack_16 = */ NULL,
Mergen Imeev's avatar
Mergen Imeev committed
	/* .dump_lua = */ port_sql_dump_lua,
Mergen Imeev's avatar
Mergen Imeev committed
	/* .dump_plain = */ NULL,
	/* .destroy = */ port_sql_destroy,
};

static void
port_sql_create(struct port *port, struct sql_stmt *stmt)
{
	port_tuple_create(port);
	((struct port_sql *)port)->stmt = stmt;
	port->vtab = &port_sql_vtab;
}

/**
 * Serialize a single column of a result set row.
 * @param stmt Prepared and started statement. At least one
 *        sql_step must be called.
 * @param region Allocator for column value.
 * @retval  0 Success.
 * @retval -1 Out of memory when resizing the output buffer.
 */
static inline int
sql_column_to_messagepack(struct sql_stmt *stmt, int i,
	enum mp_type type = sql_column_type(stmt, i);
	switch (type) {
		int64_t n = sql_column_int64(stmt, i);
		if (n >= 0)
			size = mp_sizeof_uint(n);
		else
			size = mp_sizeof_int(n);
		char *pos = (char *) region_alloc(region, size);
		if (pos == NULL)
			goto oom;
		if (n >= 0)
			mp_encode_uint(pos, n);
		else
			mp_encode_int(pos, n);
		break;
	}
		double d = sql_column_double(stmt, i);
		size = mp_sizeof_double(d);
		char *pos = (char *) region_alloc(region, size);
		if (pos == NULL)
			goto oom;
		mp_encode_double(pos, d);
		break;
	}
		uint32_t len = sql_column_bytes(stmt, i);
		size = mp_sizeof_str(len);
		char *pos = (char *) region_alloc(region, size);
		if (pos == NULL)
			goto oom;
		const char *s;
		s = (const char *)sql_column_text(stmt, i);
		mp_encode_str(pos, s, len);
		break;
	}
		uint32_t len = sql_column_bytes(stmt, i);
			(const char *)sql_column_blob(stmt, i);
		if (sql_column_subtype(stmt, i) == SQL_SUBTYPE_MSGPACK) {
			size = len;
			char *pos = (char *)region_alloc(region, size);
			if (pos == NULL)
				goto oom;
			memcpy(pos, s, len);
		} else {
			size = mp_sizeof_bin(len);
			char *pos = (char *)region_alloc(region, size);
			if (pos == NULL)
				goto oom;
			mp_encode_bin(pos, s, len);
		}
	case MP_BOOL: {
		bool b = sql_column_boolean(stmt, i);
		size = mp_sizeof_bool(b);
		char *pos = (char *) region_alloc(region, size);
		if (pos == NULL)
			goto oom;
		mp_encode_bool(pos, b);
		break;
	}
		size = mp_sizeof_nil();
		char *pos = (char *) region_alloc(region, size);
		if (pos == NULL)
			goto oom;
		mp_encode_nil(pos);
		break;
	}
	default:
		unreachable();
	}
	return 0;
oom:
	diag_set(OutOfMemory, size, "region_alloc", "SQL value");
 * Convert sql row into a tuple and append to a port.
 * @param stmt Started prepared statement. At least one
 *        sql_step must be done.
 * @param column_count Statement's column count.
 * @param region Runtime allocator for temporary objects.
 * @param port Port to store tuples.
 *
 * @retval  0 Success.
 * @retval -1 Memory error.
 */
static inline int
sql_row_to_port(struct sql_stmt *stmt, int column_count,
		struct region *region, struct port *port)
{
	assert(column_count > 0);
	size_t size = mp_sizeof_array(column_count);
	size_t svp = region_used(region);
	char *pos = (char *) region_alloc(region, size);
	if (pos == NULL) {
		diag_set(OutOfMemory, size, "region_alloc", "SQL row");
		return -1;
	}
	mp_encode_array(pos, column_count);

	for (int i = 0; i < column_count; ++i) {
		if (sql_column_to_messagepack(stmt, i, region) != 0)
			goto error;
	}
	size = region_used(region) - svp;
	pos = (char *) region_join(region, size);
	if (pos == NULL) {
		diag_set(OutOfMemory, size, "region_join", "pos");
		goto error;
		tuple_new(box_tuple_format_default(), pos, pos + size);
		goto error;
	region_truncate(region, svp);
	return port_tuple_add(port, tuple);

error:
	region_truncate(region, svp);
	return -1;
 * Serialize a description of the prepared statement.
 * @param stmt Prepared statement.
 * @param out Out buffer.
 * @param column_count Statement's column count.
 *
 * @retval  0 Success.
 * @retval -1 Client or memory error.
 */
static inline int
Mergen Imeev's avatar
Mergen Imeev committed
sql_get_metadata(struct sql_stmt *stmt, struct obuf *out, int column_count)
	int size = mp_sizeof_uint(IPROTO_METADATA) +
		   mp_sizeof_array(column_count);
	char *pos = (char *) obuf_alloc(out, size);
	if (pos == NULL) {
		diag_set(OutOfMemory, size, "obuf_alloc", "pos");
	}
	pos = mp_encode_uint(pos, IPROTO_METADATA);
	pos = mp_encode_array(pos, column_count);
	for (int i = 0; i < column_count; ++i) {
		size_t size = mp_sizeof_map(2) +
			      mp_sizeof_uint(IPROTO_FIELD_NAME) +
			      mp_sizeof_uint(IPROTO_FIELD_TYPE);
		const char *name = sql_column_name(stmt, i);
		const char *type = sql_column_datatype(stmt, i);
		 * Can not fail, since all column names and types
		 * are preallocated during prepare phase and the
		 * column_name simply returns them.
		 */
		assert(name != NULL);
		assert(type != NULL);
		size += mp_sizeof_str(strlen(name));
		size += mp_sizeof_str(strlen(type));
		char *pos = (char *) obuf_alloc(out, size);
		if (pos == NULL) {
			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
			return -1;
		}
		pos = mp_encode_map(pos, 2);
		pos = mp_encode_uint(pos, IPROTO_FIELD_NAME);
		pos = mp_encode_str(pos, name, strlen(name));
		pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE);
		pos = mp_encode_str(pos, type, strlen(type));
Mergen Imeev's avatar
Mergen Imeev committed
static int
port_sql_dump_msgpack(struct port *port, struct obuf *out)
Mergen Imeev's avatar
Mergen Imeev committed
	assert(port->vtab == &port_sql_vtab);
	sql *db = sql_get();
Mergen Imeev's avatar
Mergen Imeev committed
	struct sql_stmt *stmt = ((struct port_sql *)port)->stmt;
	int column_count = sql_column_count(stmt);
		int keys = 2;
		int size = mp_sizeof_map(keys);
		char *pos = (char *) obuf_alloc(out, size);
		if (pos == NULL) {
			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
Mergen Imeev's avatar
Mergen Imeev committed
			return -1;
		}
		pos = mp_encode_map(pos, keys);
Mergen Imeev's avatar
Mergen Imeev committed
		if (sql_get_metadata(stmt, out, column_count) != 0)
Mergen Imeev's avatar
Mergen Imeev committed
			return -1;
		size = mp_sizeof_uint(IPROTO_DATA);
		pos = (char *) obuf_alloc(out, size);
		if (pos == NULL) {
			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
Mergen Imeev's avatar
Mergen Imeev committed
			return -1;
		}
		pos = mp_encode_uint(pos, IPROTO_DATA);
Mergen Imeev's avatar
Mergen Imeev committed
		if (port_tuple_vtab.dump_msgpack(port, out) < 0)
			return -1;
Mergen Imeev's avatar
Mergen Imeev committed
		assert(((struct port_tuple *)port)->size == 0);
		struct stailq *autoinc_id_list =
			vdbe_autoinc_id_list((struct Vdbe *)stmt);
		uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
		int size = mp_sizeof_map(keys) +
			   mp_sizeof_uint(IPROTO_SQL_INFO) +
			   mp_sizeof_map(map_size);
		char *pos = (char *) obuf_alloc(out, size);
		if (pos == NULL) {
			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
Mergen Imeev's avatar
Mergen Imeev committed
			return -1;
		pos = mp_encode_map(pos, keys);
		pos = mp_encode_uint(pos, IPROTO_SQL_INFO);
		pos = mp_encode_map(pos, map_size);
		uint64_t id_count = 0;
		int changes = db->nChange;
		size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +
		       mp_sizeof_uint(changes);
		if (!stailq_empty(autoinc_id_list)) {
			struct autoinc_id_entry *id_entry;
			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
				size += id_entry->id >= 0 ?
					mp_sizeof_uint(id_entry->id) :
					mp_sizeof_int(id_entry->id);
				id_count++;
			}
			size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) +
				mp_sizeof_array(id_count);
		}
		char *buf = obuf_alloc(out, size);
		if (buf == NULL) {
			diag_set(OutOfMemory, size, "obuf_alloc", "buf");
Mergen Imeev's avatar
Mergen Imeev committed
			return -1;
		buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
		buf = mp_encode_uint(buf, changes);
		if (!stailq_empty(autoinc_id_list)) {
			buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS);
			buf = mp_encode_array(buf, id_count);
			struct autoinc_id_entry *id_entry;
			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
				buf = id_entry->id >= 0 ?
				      mp_encode_uint(buf, id_entry->id) :
				      mp_encode_int(buf, id_entry->id);
			}
		}
Mergen Imeev's avatar
Mergen Imeev committed
	return 0;
}

/**
 * Execute prepared SQL statement.
 *
 * This function uses region to allocate memory for temporary
 * objects. After this function, region will be in the same state
 * in which it was before this function.
 *
 * @param db SQL handle.
 * @param stmt Prepared statement.
 * @param port Port to store SQL response.
 * @param region Region to allocate temporary objects.
 *
 * @retval  0 Success.
 * @retval -1 Error.
 */
static inline int
sql_execute(sql *db, struct sql_stmt *stmt, struct port *port,
	    struct region *region)
{
	int rc, column_count = sql_column_count(stmt);
	if (column_count > 0) {
		/* Either ROW or DONE or ERROR. */
		while ((rc = sql_step(stmt)) == SQL_ROW) {
			if (sql_row_to_port(stmt, column_count, region,
					    port) != 0)
				return -1;
		}
		assert(rc == SQL_DONE || rc != SQL_OK);
	} else {
		/* No rows. Either DONE or ERROR. */
		rc = sql_step(stmt);
		assert(rc != SQL_ROW && rc != SQL_OK);
	}
	if (rc != SQL_DONE) {
		if (db->errCode != SQL_TARANTOOL_ERROR) {
			const char *err = (char *)sql_value_text(db->pErr);
			if (err == NULL)
				err = sqlErrStr(db->errCode);
			diag_set(ClientError, ER_SQL_EXECUTE, err);
		}
		return -1;
	}
	return 0;
}

int
sql_prepare_and_execute(const char *sql, int len, const struct sql_bind *bind,
			uint32_t bind_count, struct port *port,
			struct region *region)
{
	struct sql_stmt *stmt;
	struct sql *db = sql_get();
	if (sql_prepare_v2(db, sql, len, &stmt, NULL) != SQL_OK) {
		if (db->errCode != SQL_TARANTOOL_ERROR) {
			const char *err = (char *)sql_value_text(db->pErr);
			if (err == NULL)
				err = sqlErrStr(db->errCode);
			diag_set(ClientError, ER_SQL_EXECUTE, err);
		}
		return -1;
	}
	assert(stmt != NULL);
	port_sql_create(port, stmt);
	if (sql_bind(stmt, bind, bind_count) == 0 &&
	    sql_execute(db, stmt, port, region) == 0)
		return 0;
	port_destroy(port);
	return -1;