Newer
Older
/*
* Copyright 2010-2017, Tarantool AUTHORS, please see AUTHORS file.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the
* following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include "execute.h"
#include "sql/sqlInt.h"
#include "sql/sqlLimit.h"
#include "errcode.h"
#include "small/region.h"
#include "small/obuf.h"
#include "diag.h"
#include "sql.h"
#include "xrow.h"
#include "schema.h"
const char *sql_info_key_strs[] = {
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
static_assert(sizeof(struct port_sql) <= sizeof(struct port),
"sizeof(struct port_sql) must be <= sizeof(struct port)");
/**
* Dump data from port to buffer. Data in port contains tuples,
* metadata, or information obtained from an executed SQL query.
*
* Dumped msgpack structure:
* +----------------------------------------------+
* | IPROTO_BODY: { |
* | IPROTO_METADATA: [ |
* | {IPROTO_FIELD_NAME: column name1}, |
* | {IPROTO_FIELD_NAME: column name2}, |
* | ... |
* | ], |
* | |
* | IPROTO_DATA: [ |
* | tuple, tuple, tuple, ... |
* | ] |
* | } |
* +-------------------- OR ----------------------+
* | IPROTO_BODY: { |
* | IPROTO_SQL_INFO: { |
* | SQL_INFO_ROW_COUNT: number |
* | SQL_INFO_AUTOINCREMENT_IDS: [ |
* | id, id, id, ... |
* | ] |
* | } |
* | } |
* +-------------------- OR ----------------------+
* | IPROTO_BODY: { |
* | IPROTO_SQL_INFO: { |
* | SQL_INFO_ROW_COUNT: number |
* | } |
* | } |
* +----------------------------------------------+
* @param port Port that contains SQL response.
* @param[out] out Output buffer.
*
* @retval 0 Success.
* @retval -1 Memory error.
*/
static int
port_sql_dump_msgpack(struct port *port, struct obuf *out);
static void
port_sql_destroy(struct port *base)
{
port_tuple_vtab.destroy(base);
sql_stmt_finalize(((struct port_sql *)base)->stmt);
}
const struct port_vtab port_sql_vtab = {
/* .dump_msgpack = */ port_sql_dump_msgpack,
/* .dump_msgpack_16 = */ NULL,
/* .get_msgpack = */ NULL,
/* .get_vdbemem = */ NULL,
/* .destroy = */ port_sql_destroy,
};
static void
port_sql_create(struct port *port, struct sql_stmt *stmt)
{
port_tuple_create(port);
((struct port_sql *)port)->stmt = stmt;
port->vtab = &port_sql_vtab;
}
/**
* Serialize a single column of a result set row.
* @param stmt Prepared and started statement. At least one
* @param i Column number.
* @param region Allocator for column value.
* @retval 0 Success.
* @retval -1 Out of memory when resizing the output buffer.
*/
static inline int
sql_column_to_messagepack(struct sql_stmt *stmt, int i,
struct region *region)
enum mp_type type = sql_column_type(stmt, i);
int64_t n = sql_column_int64(stmt, i);
size = mp_sizeof_int(n);
char *pos = (char *) region_alloc(region, size);
mp_encode_int(pos, n);
break;
}
case MP_UINT: {
uint64_t n = sql_column_uint64(stmt, i);
size = mp_sizeof_uint(n);
char *pos = (char *) region_alloc(region, size);
if (pos == NULL)
goto oom;
mp_encode_uint(pos, n);
double d = sql_column_double(stmt, i);
char *pos = (char *) region_alloc(region, size);
if (pos == NULL)
goto oom;
mp_encode_double(pos, d);
break;
}
uint32_t len = sql_column_bytes(stmt, i);
char *pos = (char *) region_alloc(region, size);
if (pos == NULL)
goto oom;
const char *s;
s = (const char *)sql_column_text(stmt, i);
mp_encode_str(pos, s, len);
break;
}
uint32_t len = sql_column_bytes(stmt, i);
(const char *)sql_column_blob(stmt, i);
if (sql_column_subtype(stmt, i) == SQL_SUBTYPE_MSGPACK) {
size = len;
char *pos = (char *)region_alloc(region, size);
if (pos == NULL)
goto oom;
memcpy(pos, s, len);
} else {
size = mp_sizeof_bin(len);
char *pos = (char *)region_alloc(region, size);
if (pos == NULL)
goto oom;
mp_encode_bin(pos, s, len);
}
case MP_BOOL: {
bool b = sql_column_boolean(stmt, i);
size = mp_sizeof_bool(b);
char *pos = (char *) region_alloc(region, size);
if (pos == NULL)
goto oom;
mp_encode_bool(pos, b);
break;
}
char *pos = (char *) region_alloc(region, size);
if (pos == NULL)
goto oom;
mp_encode_nil(pos);
break;
}
default:
unreachable();
}
return 0;
oom:
diag_set(OutOfMemory, size, "region_alloc", "SQL value");
* Convert sql row into a tuple and append to a port.
* @param stmt Started prepared statement. At least one
* @param column_count Statement's column count.
* @param region Runtime allocator for temporary objects.
* @param port Port to store tuples.
*
* @retval 0 Success.
* @retval -1 Memory error.
*/
static inline int
sql_row_to_port(struct sql_stmt *stmt, int column_count,
struct region *region, struct port *port)
{
assert(column_count > 0);
size_t size = mp_sizeof_array(column_count);
size_t svp = region_used(region);
char *pos = (char *) region_alloc(region, size);
diag_set(OutOfMemory, size, "region_alloc", "SQL row");
return -1;
}
mp_encode_array(pos, column_count);
for (int i = 0; i < column_count; ++i) {
if (sql_column_to_messagepack(stmt, i, region) != 0)
goto error;
}
size = region_used(region) - svp;
pos = (char *) region_join(region, size);
if (pos == NULL) {
diag_set(OutOfMemory, size, "region_join", "pos");
goto error;
struct tuple *tuple =
tuple_new(box_tuple_format_default(), pos, pos + size);
goto error;
region_truncate(region, svp);
error:
region_truncate(region, svp);
return -1;
metadata_map_sizeof(const char *name, const char *type, const char *coll,
const char *span, int nullable, bool is_autoincrement)
{
uint32_t members_count = 2;
size_t map_size = 0;
if (coll != NULL) {
members_count++;
map_size += mp_sizeof_uint(IPROTO_FIELD_COLL);
map_size += mp_sizeof_str(strlen(coll));
}
if (nullable != -1) {
members_count++;
map_size += mp_sizeof_uint(IPROTO_FIELD_IS_NULLABLE);
map_size += mp_sizeof_bool(nullable);
}
if (is_autoincrement) {
members_count++;
map_size += mp_sizeof_uint(IPROTO_FIELD_IS_AUTOINCREMENT);
map_size += mp_sizeof_bool(true);
}
if (sql_metadata_is_full()) {
members_count++;
map_size += mp_sizeof_uint(IPROTO_FIELD_SPAN);
map_size += span != NULL ? mp_sizeof_str(strlen(span)) :
mp_sizeof_nil();
}
map_size += mp_sizeof_uint(IPROTO_FIELD_NAME);
map_size += mp_sizeof_uint(IPROTO_FIELD_TYPE);
map_size += mp_sizeof_str(strlen(name));
map_size += mp_sizeof_str(strlen(type));
map_size += mp_sizeof_map(members_count);
return map_size;
}
static inline void
metadata_map_encode(char *buf, const char *name, const char *type,
const char *coll, const char *span, int nullable,
bool is_autoincrement)
uint32_t map_sz = 2 + (coll != NULL) + (nullable != -1) +
buf = mp_encode_map(buf, map_sz);
buf = mp_encode_uint(buf, IPROTO_FIELD_NAME);
buf = mp_encode_str(buf, name, strlen(name));
buf = mp_encode_uint(buf, IPROTO_FIELD_TYPE);
buf = mp_encode_str(buf, type, strlen(type));
if (coll != NULL) {
buf = mp_encode_uint(buf, IPROTO_FIELD_COLL);
buf = mp_encode_str(buf, coll, strlen(coll));
}
if (nullable != -1) {
buf = mp_encode_uint(buf, IPROTO_FIELD_IS_NULLABLE);
buf = mp_encode_bool(buf, nullable);
}
if (is_autoincrement) {
buf = mp_encode_uint(buf, IPROTO_FIELD_IS_AUTOINCREMENT);
buf = mp_encode_bool(buf, true);
}
if (! is_full)
return;
/*
* Span is an original expression that forms
* result set column. In most cases it is the
* same as column name. So to avoid sending
* the same string twice simply encode it as
* a nil and account this behaviour on client
* side (see decode_metadata_optional()).
*/
buf = mp_encode_uint(buf, IPROTO_FIELD_SPAN);
if (span != NULL)
buf = mp_encode_str(buf, span, strlen(span));
else
buf = mp_encode_nil(buf);
* Serialize a description of the prepared statement.
* @param stmt Prepared statement.
* @param out Out buffer.
* @param column_count Statement's column count.
*
* @retval 0 Success.
* @retval -1 Client or memory error.
*/
static inline int
sql_get_metadata(struct sql_stmt *stmt, struct obuf *out, int column_count)
assert(column_count > 0);
int size = mp_sizeof_uint(IPROTO_METADATA) +
mp_sizeof_array(column_count);
char *pos = (char *) obuf_alloc(out, size);
if (pos == NULL) {
diag_set(OutOfMemory, size, "obuf_alloc", "pos");
}
pos = mp_encode_uint(pos, IPROTO_METADATA);
pos = mp_encode_array(pos, column_count);
for (int i = 0; i < column_count; ++i) {
const char *coll = sql_column_coll(stmt, i);
const char *name = sql_column_name(stmt, i);
const char *type = sql_column_datatype(stmt, i);
const char *span = sql_column_span(stmt, i);
int nullable = sql_column_nullable(stmt, i);
bool is_autoincrement = sql_column_is_autoincrement(stmt, i);
* Can not fail, since all column names and types
* are preallocated during prepare phase and the
* column_name simply returns them.
*/
assert(name != NULL);
size = metadata_map_sizeof(name, type, coll, span, nullable,
char *pos = (char *) obuf_alloc(out, size);
if (pos == NULL) {
diag_set(OutOfMemory, size, "obuf_alloc", "pos");
return -1;
}
metadata_map_encode(pos, name, type, coll, span, nullable,
static int
port_sql_dump_msgpack(struct port *port, struct obuf *out)
struct sql_stmt *stmt = ((struct port_sql *)port)->stmt;
int column_count = sql_column_count(stmt);
if (column_count > 0) {
int keys = 2;
int size = mp_sizeof_map(keys);
char *pos = (char *) obuf_alloc(out, size);
if (pos == NULL) {
diag_set(OutOfMemory, size, "obuf_alloc", "pos");
}
pos = mp_encode_map(pos, keys);
if (sql_get_metadata(stmt, out, column_count) != 0)
return -1;
size = mp_sizeof_uint(IPROTO_DATA);
pos = (char *) obuf_alloc(out, size);
if (pos == NULL) {
diag_set(OutOfMemory, size, "obuf_alloc", "pos");
}
pos = mp_encode_uint(pos, IPROTO_DATA);
if (port_tuple_vtab.dump_msgpack(port, out) < 0)
return -1;
struct stailq *autoinc_id_list =
vdbe_autoinc_id_list((struct Vdbe *)stmt);
uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
int size = mp_sizeof_map(keys) +
mp_sizeof_uint(IPROTO_SQL_INFO) +
mp_sizeof_map(map_size);
char *pos = (char *) obuf_alloc(out, size);
if (pos == NULL) {
diag_set(OutOfMemory, size, "obuf_alloc", "pos");
pos = mp_encode_map(pos, keys);
pos = mp_encode_uint(pos, IPROTO_SQL_INFO);
pos = mp_encode_map(pos, map_size);
size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +
mp_sizeof_uint(changes);
if (!stailq_empty(autoinc_id_list)) {
struct autoinc_id_entry *id_entry;
stailq_foreach_entry(id_entry, autoinc_id_list, link) {
size += id_entry->id >= 0 ?
mp_sizeof_uint(id_entry->id) :
mp_sizeof_int(id_entry->id);
id_count++;
}
size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) +
mp_sizeof_array(id_count);
}
char *buf = obuf_alloc(out, size);
if (buf == NULL) {
diag_set(OutOfMemory, size, "obuf_alloc", "buf");
buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
buf = mp_encode_uint(buf, changes);
if (!stailq_empty(autoinc_id_list)) {
buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS);
buf = mp_encode_array(buf, id_count);
struct autoinc_id_entry *id_entry;
stailq_foreach_entry(id_entry, autoinc_id_list, link) {
buf = id_entry->id >= 0 ?
mp_encode_uint(buf, id_entry->id) :
mp_encode_int(buf, id_entry->id);
}
}
return 0;
}
/**
* Execute prepared SQL statement.
*
* This function uses region to allocate memory for temporary
* objects. After this function, region will be in the same state
* in which it was before this function.
*
* @param db SQL handle.
* @param stmt Prepared statement.
* @param port Port to store SQL response.
* @param region Region to allocate temporary objects.
*
* @retval 0 Success.
* @retval -1 Error.
*/
static inline int
sql_execute(struct sql_stmt *stmt, struct port *port, struct region *region)
{
int rc, column_count = sql_column_count(stmt);
if (column_count > 0) {
/* Either ROW or DONE or ERROR. */
while ((rc = sql_step(stmt)) == SQL_ROW) {
if (sql_row_to_port(stmt, column_count, region,
port) != 0)
return -1;
}
} else {
/* No rows. Either DONE or ERROR. */
rc = sql_step(stmt);
return -1;
return 0;
}
int
sql_prepare_and_execute(const char *sql, int len, const struct sql_bind *bind,
uint32_t bind_count, struct port *port,
struct region *region)
{
struct sql_stmt *stmt;
if (sql_stmt_compile(sql, len, NULL, &stmt, NULL) != 0)
return -1;
assert(stmt != NULL);
port_sql_create(port, stmt);
if (sql_bind(stmt, bind, bind_count) == 0 &&
sql_execute(stmt, port, region) == 0)