diff --git a/cmake/BuildSQL.cmake b/cmake/BuildSQL.cmake index ee17721efa9b2650c32095447d2ff2e0d85ca820..980aa2ce8784e4c5c9347f74cf54ae809c9c4582 100644 --- a/cmake/BuildSQL.cmake +++ b/cmake/BuildSQL.cmake @@ -31,7 +31,18 @@ endif() if(ENABLE_MYSQL) - + include(FindMySQL) + if (MYSQL_FOUND) + message(STATUS + "box.net.sql(mysql) INC=${MYSQL_INCLUDE_DIR}") + message(STATUS + "box.net.sql(mysql) LIBS=mysqlclient_r") + add_compile_flags("C;CXX" "-I${MYSQL_INCLUDE_DIR}") + add_compile_flags("C;CXX" "-lmysqlclient_r") + else() + message(STATUS "MySQL client not found") + set(ENABLE_MYSQL OFF) + endif() endif() if (ENABLE_MYSQL) diff --git a/cmake/FindMySQL.cmake b/cmake/FindMySQL.cmake new file mode 100644 index 0000000000000000000000000000000000000000..aba75c7157126a1491e06aafb379e8b4f738d1c4 --- /dev/null +++ b/cmake/FindMySQL.cmake @@ -0,0 +1,22 @@ +find_path(MYSQL_INCLUDE_DIR + NAMES mysql.h + PATH_SUFFIXES mysql +) +find_library(MYSQL_LIBRARIES + NAMES mysqlclient_r +) + +if(MYSQL_INCLUDE_DIR AND MYSQL_LIBRARIES) + set(MYSQL_FOUND ON) +endif(MYSQL_INCLUDE_DIR AND MYSQL_LIBRARIES) + +if(MYSQL_FOUND) + if (NOT MYSQL_FIND_QUIETLY) + message(STATUS "Found MySQL includes: ${MYSQL_INCLUDE_DIR}/mysql.h") + message(STATUS "Found MySQL library: ${MYSQL_LIBRARIES}") + endif (NOT MYSQL_FIND_QUIETLY) +else(MYSQL_FOUND) + if (MYSQL_FIND_REQUIRED) + message(FATAL_ERROR "Could not find mysql development files") + endif (MYSQL_FIND_REQUIRED) +endif (MYSQL_FOUND) diff --git a/src/box/lua/sql.lua b/src/box/lua/sql.lua index f14fb435e56a3b0c60120c63acfb37f4b78d6a3f..72fab82c01062d5e1a5e932a059cabfd0772908a 100644 --- a/src/box/lua/sql.lua +++ b/src/box/lua/sql.lua @@ -1,26 +1,65 @@ box.net.sql = { - connect = function(driver, host, port, user, login, db, ...) + -- constructor + -- box.net.sql.connect( + -- 'pg', -- @driver ('pg' or 'mysql') + -- 'my.host', -- @host + -- 5432, -- @port + -- 'user', -- @username + -- 'SECRET', -- @password + -- 'DB', -- @database name + -- { raise = false }, -- @config options + -- { sql1, var, ... var }, -- @startup SQL statements + -- ... + -- ) + -- + -- @return connector to database or throw error + -- if option raise set in 'false' and an error will be happened + -- the function will return 'nil' as the first variable and + -- text of error as the second + + connect = function(driver, host, port, user, password, db, cfg, ...) if type(driver) == 'table' then driver = driver.driver end local self = { + -- connection variables driver = driver, host = host, port = port, - login = login, + user = user, password = password, - db = db + db = db, + + -- private variables + queue = {}, + processing = false, + + -- throw exception if error + raise = true } - + + -- config parameters + if type(cfg) == 'table' then + if type(cfg.raise) == 'boolean' then + self.raise = cfg.raise + end + end + local init = { ... } setmetatable(self, box.net.sql) -- do_connect is written in C -- it add 'raw' field in the table - local c = box.net.sql.do_connect( self ) + local s, c = pcall(box.net.sql.do_connect, self) + if not s then + if self.raise then + error(c) + end + return nil, c + end -- perform init statements for i, s in pairs(init) do @@ -30,26 +69,95 @@ box.net.sql = { end, + __index = { - -- main method - -- do query and returns: status, resultset + -- base method + -- example: + -- local tuples, arows, txtst = db:execute(sql, args) + -- tuples - a table of tuples (tables) + -- arows - count of affected rows + -- txtst - text status (Postgresql specific) + + -- the method throws exception by default. + -- user can change the behaviour by set 'connection.raise' + -- attribute to 'false' + -- in the case it will return negative arows if error and + -- txtst will contain text of error + execute = function(self, sql, ...) - if self.raw == nil then - error("Connection was not established") + -- waits until connection will be free + while self.processing do + self.queue[ box.fiber.fid ] = box.ipc.channel() + self.queue[ box.fiber.fid ]:get() + self.queue[ box.fiber.fid ] = nil + end + self.processing = true + + local res = { pcall(self.raw.execute, self, sql, ...) } + self.processing = false + if not res[1] then + if self.raise then + error(res[2]) + end + return {}, -1, res[2] end - error("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") - return self.raw.execute(self, sql, ...) + + -- wakeup one waiter + for fid, ch in pairs(self.queue) do + ch:put(true, 0) + self.queue[ fid ] = nil + break + end + table.remove(res, 1) + return unpack(res) end, - -- quote identifiers - ident_quote = function(self, ident) - return self.raw.ident_quote(ident) + + -- pings database + -- returns true if success. doesn't throw any errors + ping = function(self) + local pf = function() + local res = self:execute('SELECT 1 AS code') + if type(res) ~= 'table' then + return false + end + if type(res[1]) ~= 'table' then + return false + end + + return res[1].code == 1 + end + + local res, code = pcall(pf) + if res == true then + return code + else + return false + end end, - -- quote variables - quote = function(self, value) - return self.raw.quote(value) + + -- select rows + -- returns table of rows + select = function(self, sql, ...) + local res = self:execute(sql, ...) + return res + end, + + -- select one row + single = function(self, sql, ...) + local res = self:execute(sql, ...) + if #res > 1 then + error("SQL request returned multiply rows") + end + return res[1] end, + + -- perform request. returns count of affected rows + perform = function(self, sql, ...) + local res, affected, status = self:execute(sql, ...) + return affected + end } } diff --git a/src/lua/pg.m b/src/lua/pg.m index 3ff477c507da60efa59b8e05fa2b423966bfebc0..7f8ba1d06bd4e1a21853a02eebb74cc72278c167 100644 --- a/src/lua/pg.m +++ b/src/lua/pg.m @@ -1,12 +1,363 @@ +#include <postgres.h> +#include <libpq-fe.h> +#include <catalog/pg_type.h> + +#undef PACKAGE_VERSION + #include "lua_pg.h" #include "lua.h" #include "lauxlib.h" #include "lualib.h" +#include <stdio.h> +#include <stdlib.h> +#include <errno.h> +#include <string.h> +#include <coeio.h> +#include <tarantool_ev.h> + + +#include <lua/init.h> +#include <say.h> + + +#define CONSTR_MAXLEN 128 + + + + +static PGconn * +lua_check_pgconn(struct lua_State *L, int index) +{ + int pop = 0; + if (lua_istable(L, index)) { + if (index < 0) + index--; + lua_pushstring(L, "raw"); + lua_rawget(L, index); + + pop = 1; + index = -1; + } + + if (!lua_isuserdata(L, index)) + luaL_error(L, "Can't extract userdata from lua-stack"); + + PGconn *conn = *(void **)lua_touserdata(L, index); + if (pop) + lua_pop(L, pop); + return conn; +} + + +/** do execute request (is run in the other thread) */ +static ssize_t +pg_exec(va_list ap) +{ + PGconn *conn = va_arg(ap, typeof(conn)); + const char *sql = va_arg(ap, typeof(sql)); + int count = va_arg(ap, typeof(count)); + Oid *paramTypes = va_arg(ap, typeof(paramTypes)); + const char **paramValues = va_arg(ap, typeof(paramValues)); + const int *paramLengths = va_arg(ap, typeof(paramLengths)); + const int *paramFormats = va_arg(ap, typeof(paramFormats)); + PGresult **res = va_arg(ap, typeof(res)); + + *res = PQexecParams(conn, sql, + count, paramTypes, paramValues, paramLengths, paramFormats, 0); + return 0; +} + + +/** push query result into lua stack */ +static int +lua_push_pgres(struct lua_State *L, PGresult *r) +{ + if (!r) + luaL_error(L, "PG internal error: zero rults"); + + switch(PQresultStatus(r)) { + case PGRES_COMMAND_OK: + lua_newtable(L); + if (*PQcmdTuples(r) == 0) { + lua_pushnumber(L, 0); + } else { + lua_pushstring(L, PQcmdTuples(r)); + double v = lua_tonumber(L, -1); + lua_pop(L, 1); + lua_pushnumber(L, v); + } + lua_pushstring(L, PQcmdStatus(r)); + PQclear(r); + return 3; + + case PGRES_TUPLES_OK: + break; + + case PGRES_BAD_RESPONSE: + PQclear(r); + luaL_error(L, "Broken postgrql rponse"); + + case PGRES_FATAL_ERROR: + case PGRES_NONFATAL_ERROR: + case PGRES_EMPTY_QUERY: + PQclear(r); + luaL_error(L, "PG error: %s", PQresultErrorMessage(r)); + + default: + PQclear(r); + luaL_error(L, "box.net.sql.pg: internal error"); + } + + lua_newtable(L); + int count = PQntuples(r); + int cols = PQnfields(r); + for (int i = 0; i < count; i++) { + lua_pushnumber(L, i + 1); + lua_newtable(L); + + for (int j = 0; j < cols; j++) { + if (PQgetisnull(r, i, j)) + continue; + + lua_pushstring(L, PQfname(r, j)); + const char *s = PQgetvalue(r, i, j); + int len = PQgetlength(r, i, j); + + switch (PQftype(r, j)) { + case INT2OID: + case INT4OID: + case INT8OID: + case NUMERICOID: { + lua_pushlstring(L, s, len); + double v = lua_tonumber(L, -1); + lua_pop(L, 1); + lua_pushnumber(L, v); + break; + } + case BOOLOID: + if (*s == 't' || *s == 'T') + lua_pushboolean(L, 1); + else + lua_pushboolean(L, 0); + break; + + default: + lua_pushlstring(L, s, len); + break; + + } + + + lua_settable(L, -3); + } + + lua_settable(L, -3); + } + + if (*PQcmdTuples(r) == 0) { + lua_pushnumber(L, 0); + } else { + lua_pushstring(L, PQcmdTuples(r)); + double v = lua_tonumber(L, -1); + lua_pop(L, 1); + lua_pushnumber(L, v); + } + lua_pushstring(L, PQcmdStatus(r)); + PQclear(r); + return 3; +} + + +/** execute method */ +static int +lua_pg_execute(struct lua_State *L) +{ + PGconn *conn = lua_check_pgconn(L, 1); + const char *sql = lua_tostring(L, 2); + + int count = lua_gettop(L) - 2; + const char **paramValues = NULL; + int *paramLengths = NULL; + int *paramFormats = NULL; + Oid *paramTypes = NULL; + if (count > 0) { + paramValues = alloca( count * sizeof(*paramValues) ); + paramLengths = alloca( count * sizeof(*paramLengths) ); + paramFormats = alloca( count * sizeof(*paramFormats) ); + paramTypes = alloca( count * sizeof(*paramTypes) ); + for(int i = 0, idx = 3; i < count; i++, idx++) { + if (lua_isnil(L, idx)) { + paramValues[i] = NULL; + paramLengths[i] = 0; + paramFormats[i] = 0; + paramTypes[i] = 0; + continue; + } + + if (lua_isboolean(L, idx)) { + int v = lua_toboolean(L, idx); + static const char pg_true[] = "t"; + static const char pg_false[] = "f"; + paramValues[i] = v ? pg_true : pg_false; + paramLengths[i] = 1; + paramFormats[i] = 0; + paramTypes[i] = BOOLOID; + continue; + } + + size_t len; + const char *s = lua_tolstring(L, idx, &len); + + if (lua_isnumber(L, idx)) { + paramTypes[i] = NUMERICOID; + paramValues[i] = s; + paramLengths[i] = len; + paramFormats[i] = 0; + continue; + } + + + paramValues[i] = s; + paramLengths[i] = len; + paramFormats[i] = 0; + paramTypes[i] = TEXTOID; + + } + + /* transform sql placeholders */ + luaL_Buffer b; + luaL_buffinit(L, &b); + char num[10]; + for (int i = 0, j = 1; sql[i]; i++) { + if (sql[i] != '?') { + luaL_addchar(&b, sql[i]); + continue; + } + luaL_addchar(&b, '$'); + + snprintf(num, 10, "%d", j++); + luaL_addstring(&b, num); + } + luaL_pushresult(&b); + sql = lua_tostring(L, -1); + } + + PGresult *res = NULL; + if (coeio_custom(pg_exec, TIMEOUT_INFINITY, conn, + sql, count, paramTypes, paramValues, + paramLengths, paramFormats, &res) == -1) { + + luaL_error(L, "Can't execute sql: %s", + strerror(errno)); + } + + return lua_push_pgres(L, res); +} + +/** collect connection */ +static int +lua_pg_gc(struct lua_State *L) +{ + PGconn *conn = lua_check_pgconn(L, 1); + PQfinish(conn); + return 0; +} + + +/** do connect to postgresql (is run in the other thread) */ +static ssize_t +pg_connect(va_list ap) +{ + const char *constr = va_arg(ap, typeof(constr)); + PGconn **conn = va_arg(ap, typeof(conn)); + *conn = PQconnectdb(constr); + return 0; +} + +/** returns self.field as C-string */ +static const char * +self_field(struct lua_State *L, const char *name, int index) +{ + lua_pushstring(L, name); + if (index < 0) + index--; + lua_rawget(L, index); + const char *res = lua_tostring(L, -1); + lua_pop(L, 1); + return res; +} + + +/** connect to postgresql */ int lbox_net_pg_connect(struct lua_State *L) { - lua_pushstring(L, "aaaaaaaaaaaaaaaaaaa"); + PGconn *conn = NULL; + + luaL_Buffer b; + luaL_buffinit(L, &b); + luaL_addstring(&b, "host='"); + luaL_addstring(&b, self_field(L, "host", 1)); + + luaL_addstring(&b, "' port='"); + luaL_addstring(&b, self_field(L, "port", 1)); + + luaL_addstring(&b, "' user='"); + luaL_addstring(&b, self_field(L, "user", 1)); + + luaL_addstring(&b, "' password='"); + luaL_addstring(&b, self_field(L, "password", 1)); + + luaL_addstring(&b, "' dbname='"); + luaL_addstring(&b, self_field(L, "db", 1)); + + luaL_addchar(&b, '\''); + luaL_pushresult(&b); + + const char *constr = lua_tostring(L, -1); + + if (coeio_custom(pg_connect, TIMEOUT_INFINITY, constr, &conn) == -1) { + luaL_error(L, "Can't connect to postgresql: %s", + strerror(errno)); + } + + /* cleanup stack */ + lua_pop(L, 1); + + if (PQstatus(conn) != CONNECTION_OK) { + char *msg = alloca(strlen(PQerrorMessage(conn)) + 1); + strcpy(msg, PQerrorMessage(conn)); + PQfinish(conn); + luaL_error(L, constr); + } + + lua_pushstring(L, "raw"); + void **ptr = lua_newuserdata(L, sizeof(conn)); + *ptr = conn; + + lua_newtable(L); + lua_pushstring(L, "__index"); + + lua_newtable(L); + + static const struct luaL_reg meta [] = { + {"execute", lua_pg_execute}, + {NULL, NULL} + }; + luaL_register(L, NULL, meta); + lua_settable(L, -3); + + lua_pushstring(L, "__gc"); + lua_pushcfunction(L, lua_pg_gc); + lua_settable(L, -3); + + + lua_setmetatable(L, -2); + lua_rawset(L, 1); + + /* return self */ + lua_pushvalue(L, 1); return 1; } diff --git a/test/box/net_sql.result b/test/box/net_sql.result index 5e5fe3ef33397233a3c13966b6661efd03d37693..c223bad9f3a567475c64b53aae4a55ecfa33baba 100644 --- a/test/box/net_sql.result +++ b/test/box/net_sql.result @@ -1,4 +1,77 @@ -lua 1 +lua dump = function(t) return box.cjson.encode(t) end --- - - 1 +... +lua c = box.net.sql.connect('pg', 'localhost', 5432, 'tarantool', 'tarantool', 'tarantool') +--- +... +lua dump({c:execute('SELECT 123::text AS bla, 345')}) +--- + - [[{"?column?":345,"bla":"123"}],1,"SELECT 1"] +... +lua dump({c:execute('SELECT -1 AS neg, NULL AS abc')}) +--- + - [[{"neg":-1}],1,"SELECT 1"] +... +lua dump({c:execute('SELECT -1.1 AS neg, 1.2 AS pos')}) +--- + - [[{"neg":-1.1,"pos":1.2}],1,"SELECT 1"] +... +lua dump({c:execute('SELECT ARRAY[1,2] AS neg, 1.2 AS pos')}) +--- + - [[{"neg":"{1,2}","pos":1.2}],1,"SELECT 1"] +... +lua dump({c:execute('SELECT ? AS val', 'abc')}) +--- + - [[{"val":"abc"}],1,"SELECT 1"] +... +lua dump({c:execute('SELECT ? AS val', 123)}) +--- + - [[{"val":123}],1,"SELECT 1"] +... +lua dump({c:execute('SELECT ? AS val', true)}) +--- + - [[{"val":true}],1,"SELECT 1"] +... +lua dump({c:execute('SELECT ? AS val', false)}) +--- + - [[{"val":false}],1,"SELECT 1"] +... +lua dump({c:execute('SELECT ? AS val, ? AS num, ? AS str', false, 123, 'abc')}) +--- + - [[{"str":"abc","num":123,"val":false}],1,"SELECT 1"] +... +lua dump({c:execute('DROP TABLE IF EXISTS unknown_table')}) +--- + - [{},0,"DROP TABLE"] +... +lua dump({c:execute('SELECT * FROM (VALUES (1,2), (2,3)) t')}) +--- + - [[{"column1":1,"column2":2},{"column1":2,"column2":3}],2,"SELECT 2"] +... +lua c:ping() +--- + - true +... +lua dump({c:select('SELECT * FROM (VALUES (1,2), (2,3)) t')}) +--- + - [[{"column1":1,"column2":2},{"column1":2,"column2":3}]] +... +lua dump({c:single('SELECT * FROM (VALUES (1,2), (2,3)) t')}) +--- +error: '[string "box.net.sql = {..."]:151: SQL request returned multiply rows' +... +lua dump({c:single('SELECT * FROM (VALUES (1,2)) t')}) +--- + - [{"column1":1,"column2":2}] +... +lua dump({c:perform('SELECT * FROM (VALUES (1,2), (2,3)) t')}) +--- + - [2] +... +lua c:execute('SELEC T') +--- +error: '[string "box.net.sql = {..."]:100: PG error: ERROR: syntax error at or near "SELEC" +LINE 1: SELEC T + ^ +' ... diff --git a/test/box/net_sql.test b/test/box/net_sql.test index caafa1bc38e8cf30d8ec10da66cf381cdfb4e77f..d20f0621b0a84750bf4d61af8982d05017cfe0e6 100644 --- a/test/box/net_sql.test +++ b/test/box/net_sql.test @@ -1,4 +1,21 @@ # encoding: tarantool +exec admin "lua dump = function(t) return box.cjson.encode(t) end" exec admin "lua c = box.net.sql.connect('pg', 'localhost', 5432, 'tarantool', 'tarantool', 'tarantool')" -exec admin "lua c" +exec admin "lua dump({c:execute('SELECT 123::text AS bla, 345')})" +exec admin "lua dump({c:execute('SELECT -1 AS neg, NULL AS abc')})" +exec admin "lua dump({c:execute('SELECT -1.1 AS neg, 1.2 AS pos')})" +exec admin "lua dump({c:execute('SELECT ARRAY[1,2] AS neg, 1.2 AS pos')})" +exec admin "lua dump({c:execute('SELECT ? AS val', 'abc')})" +exec admin "lua dump({c:execute('SELECT ? AS val', 123)})" +exec admin "lua dump({c:execute('SELECT ? AS val', true)})" +exec admin "lua dump({c:execute('SELECT ? AS val', false)})" +exec admin "lua dump({c:execute('SELECT ? AS val, ? AS num, ? AS str', false, 123, 'abc')})" +exec admin "lua dump({c:execute('DROP TABLE IF EXISTS unknown_table')})" +exec admin "lua dump({c:execute('SELECT * FROM (VALUES (1,2), (2,3)) t')})" +exec admin "lua c:ping()" +exec admin "lua dump({c:select('SELECT * FROM (VALUES (1,2), (2,3)) t')})" +exec admin "lua dump({c:single('SELECT * FROM (VALUES (1,2), (2,3)) t')})" +exec admin "lua dump({c:single('SELECT * FROM (VALUES (1,2)) t')})" +exec admin "lua dump({c:perform('SELECT * FROM (VALUES (1,2), (2,3)) t')})" +exec admin "lua c:execute('SELEC T')"