diff --git a/src/box/lua/call.cc b/src/box/lua/call.cc index e81b1203d754fd5c79d0bde655b8ae13c7ad3b1d..99eeae31cc983d2390abf136a8d76a59d810e525 100644 --- a/src/box/lua/call.cc +++ b/src/box/lua/call.cc @@ -208,6 +208,123 @@ lbox_process(lua_State *L) return lua_gettop(L) - top; } +static int +lbox_select(lua_State *L) +{ + if (lua_gettop(L) < 3 || !lua_isnumber(L, 1) || !lua_isnumber(L, 2)) { + return luaL_error(L, "Usage index:select(key)"); + } + + RegionGuard region_guard(&fiber()->gc); + struct tbuf *buf = tbuf_new(&fiber()->gc); + + lua_settop(L, 3); /* leave key on the top */ + luamp_encode(L, buf, 3); + + struct request request; + request_create(&request, IPROTO_SELECT, NULL, 0); + request.space_id = lua_tointeger(L, 1); + request.index_id = lua_tointeger(L, 2); + request.limit = 4294967295; + request.key = buf->data; + request.key_end = buf->data + buf->size; + + struct port *port_lua = port_lua_create(L); + box_process(port_lua, &request); + + return lua_gettop(L) - 3; +} + +static int +lbox_insert_replace(lua_State *L, enum iproto_request_type type) +{ + assert(type == IPROTO_INSERT || type == IPROTO_REPLACE); + if (lua_gettop(L) < 2 || !lua_isnumber(L, 1)) + return luaL_error(L, "Usage space:replace(tuple)"); + + RegionGuard region_guard(&fiber()->gc); + struct tbuf *buf = tbuf_new(&fiber()->gc); + + lua_settop(L, 2); /* leave tuple on the top */ + luamp_encode(L, buf, 2); + + struct request request; + request_create(&request, type, NULL, 0); + request.space_id = lua_tointeger(L, 1); + request.tuple = buf->data; + request.tuple_end = buf->data + buf->size; + + struct port *port_lua = port_lua_create(L); + box_process(port_lua, &request); + return lua_gettop(L) - 2; +} + +static int +lbox_insert(lua_State *L) +{ + return lbox_insert_replace(L, IPROTO_INSERT); +} + +static int +lbox_replace(lua_State *L) +{ + return lbox_insert_replace(L, IPROTO_REPLACE); +} + +static int +lbox_update(lua_State *L) +{ + if (lua_gettop(L) < 4 || !lua_isnumber(L, 1) || !lua_isnumber(L, 2)) + return luaL_error(L, "Usage index:update(key)"); + + RegionGuard region_guard(&fiber()->gc); + struct tbuf *buf = tbuf_new(&fiber()->gc); + + lua_settop(L, 4); /* leave tuple on the top */ + luamp_encode(L, buf, 4); + uint32_t tuple_len = buf->size; + lua_pop(L, 1); /* leave key on the top */ + luamp_encode(L, buf, 3); + + struct request request; + request_create(&request, IPROTO_UPDATE, NULL, 0); + request.space_id = lua_tointeger(L, 1); + request.index_id = lua_tointeger(L, 2); + request.tuple = buf->data; + request.tuple_end = buf->data + tuple_len; + request.key = buf->data + tuple_len; + request.key_end = buf->data + buf->size; + + struct port *port_lua = port_lua_create(L); + box_process(port_lua, &request); + return lua_gettop(L) - 3; +} + +static int +lbox_delete(lua_State *L) +{ + if (lua_gettop(L) < 3 || !lua_isnumber(L, 1) || !lua_isnumber(L, 2)) + return luaL_error(L, "Usage index:delete(key)"); + + + RegionGuard region_guard(&fiber()->gc); + struct tbuf *buf = tbuf_new(&fiber()->gc); + + lua_settop(L, 3); /* leave key on the top */ + luamp_encode(L, buf, 3); + + struct request request; + request_create(&request, IPROTO_DELETE, NULL, 0); + request.space_id = lua_tointeger(L, 1); + request.index_id = lua_tointeger(L, 2); + request.key = buf->data; + request.key_end = buf->data + buf->size; + + struct port *port_lua = port_lua_create(L); + box_process(port_lua, &request); + return lua_gettop(L) - 3; +} + static int lbox_raise(lua_State *L) { @@ -305,7 +422,7 @@ lbox_call_loadproc(struct lua_State *L) * (implementation of 'CALL' command code). */ void -box_lua_call(const struct request *request, struct txn *txn, +box_lua_call(struct request *request, struct txn *txn, struct port *port) { (void) txn; @@ -720,6 +837,11 @@ lbox_unpack(struct lua_State *L) static const struct luaL_reg boxlib[] = { {"process", lbox_process}, + {"_select", lbox_select}, + {"_insert", lbox_insert}, + {"_replace", lbox_replace}, + {"_update", lbox_update}, + {"_delete", lbox_delete}, {"call_loadproc", lbox_call_loadproc}, {"raise", lbox_raise}, {"pack", lbox_pack}, diff --git a/src/box/lua/call.h b/src/box/lua/call.h index 97f07b294540da6af8cd6d44360b8f49747d40b6..204c6cd7123322484be5ac822fb86f53620eaf2a 100644 --- a/src/box/lua/call.h +++ b/src/box/lua/call.h @@ -38,7 +38,6 @@ struct port; * (implementation of 'CALL' command code). */ void -box_lua_call(const struct request *request, struct txn *txn, - struct port *port); +box_lua_call(struct request *request, struct txn *txn, struct port *port); #endif /* INCLUDES_TARANTOOL_MOD_BOX_LUA_CALL_H */ diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua index b9b0cbf08bd0bf962317f8da466e2d6a0af6189c..94c9a2341f710bc92faf778358fd30f1b6c52ed1 100644 --- a/src/box/lua/schema.lua +++ b/src/box/lua/schema.lua @@ -199,13 +199,18 @@ function box.schema.space.bless(space) return index.idx:count(...) end + local function check_index(space, index_id) + if space.index[index_id] == nil then + box.raise(box.error.ER_NO_SUCH_INDEX, + string.format("No index #%d is defined in space %d", index_id, + space.n)) + end + end + -- eselect index_mt.eselect = function(index, key, opts) -- user can catch link to index - if box.space[ index.n ].index[ index.id ] == nil then - box.raise(box.error.ER_NO_SUCH_INDEX, - string.format("No index #0 is defined in space %d", index.n)) - end + check_index(box.space[index.n], index.id) if opts == nil then opts = {} @@ -269,13 +274,13 @@ function box.schema.space.bless(space) -- index_mt.select = function(index, key) - return box.process(box.net.box.SELECT, - msgpack.encode({ - [box.net.box.SPACE_ID] = index.n, - [box.net.box.INDEX_ID] = index.id, - [box.net.box.OFFSET] = 0, - [box.net.box.LIMIT] = 4294967295, - [box.net.box.KEY] = keify(key)})) + return box._select(index.n, index.id, keify(key)) + end + index_mt.update = function(index, key, ops) + return box._update(index.n, index.id, keify(key), ops); + end + index_mt.delete = function(index, key) + return box._delete(index.n, index.id, keify(key)); end index_mt.drop = function(index) return box.schema.index.drop(index.n, index.id) @@ -292,54 +297,27 @@ function box.schema.space.bless(space) space_mt.__newindex = index_mt.__newindex space_mt.eselect = function(space, key, opts) - if box.space[ space.n ].index[ 0 ] == nil then - box.raise(box.error.ER_NO_SUCH_INDEX, - string.format("No index #0 is defined in space %d", space.n)) - end + check_index(space, 0) return space.index[0]:eselect(key, opts) end space_mt.select = function(space, key) - return box.process(box.net.box.SELECT, - msgpack.encode({ - [box.net.box.SPACE_ID] = space.n, - [box.net.box.INDEX_ID] = 0, - [box.net.box.OFFSET] = 0, - [box.net.box.LIMIT] = 4294967295, - [box.net.box.KEY] = keify(key)})) + check_index(space, 0) + return space.index[0]:select(key) end space_mt.insert = function(space, tuple) - return box.process(box.net.box.INSERT, - msgpack.encode({ - [box.net.box.SPACE_ID] = space.n, - [box.net.box.TUPLE] = tuple - })) + return box._insert(space.n, tuple); end space_mt.replace = function(space, tuple) - return box.process(box.net.box.REPLACE, - msgpack.encode({ - [box.net.box.SPACE_ID] = space.n, - [box.net.box.TUPLE] = tuple - })) + return box._replace(space.n, tuple); end space_mt.update = function(space, key, ops) - return box.process(box.net.box.UPDATE, - msgpack.encode({ - [box.net.box.SPACE_ID] = space.n, - [box.net.box.KEY] = keify(key), - [box.net.box.TUPLE] = ops - })) + check_index(space, 0) + return space.index[0]:update(key, ops) end --- --- delete can be done only by the primary key, whose --- index is always 0. It doesn't accept compound keys --- space_mt.delete = function(space, key) - return box.process(box.net.box.DELETE, - msgpack.encode({ - [box.net.box.SPACE_ID] = space.n, - [box.net.box.KEY] = keify(key) - })) + check_index(space, 0) + return space.index[0]:delete(key) end -- Assumes that spaceno has a TREE (NUM) primary key -- inserts a tuple after getting the next value of the @@ -355,11 +333,8 @@ function box.schema.space.bless(space) end space_mt.truncate = function(space) + check_index(space, 0) local pk = space.index[0] - if pk == nil then - box.raise(box.error.ER_NO_SUCH_INDEX, - "No index #0 is defined in space "..space.n); - end while #pk.idx > 0 do for t in pk:iterator() do local key = {} diff --git a/src/box/request.cc b/src/box/request.cc index 1596722d9f15c9fc9e298a044f94cf79110fcaea..0e11d36617cccc14643de538e3977b20595a6ad3 100644 --- a/src/box/request.cc +++ b/src/box/request.cc @@ -79,11 +79,10 @@ dup_replace_mode(uint32_t op) } static void -execute_replace(const struct request *request, struct txn *txn, - struct port *port) +execute_replace(struct request *request, struct txn *txn, struct port *port) { (void) port; - txn_add_redo(txn, request->type, request->data, request->len); + txn_add_redo(txn, request); struct space *space = space_find(request->space_id); struct tuple *new_tuple = tuple_new(space->format, request->tuple, @@ -95,11 +94,11 @@ execute_replace(const struct request *request, struct txn *txn, } static void -execute_update(const struct request *request, struct txn *txn, +execute_update(struct request *request, struct txn *txn, struct port *port) { (void) port; - txn_add_redo(txn, request->type, request->data, request->len); + txn_add_redo(txn, request); /* Parse UPDATE request. */ /** Search key and key part count. */ @@ -128,8 +127,7 @@ execute_update(const struct request *request, struct txn *txn, /** }}} */ static void -execute_select(const struct request *request, struct txn *txn, - struct port *port) +execute_select(struct request *request, struct txn *txn, struct port *port) { (void) txn; struct space *space = space_find(request->space_id); @@ -161,11 +159,10 @@ execute_select(const struct request *request, struct txn *txn, } static void -execute_delete(const struct request *request, struct txn *txn, - struct port *port) +execute_delete(struct request *request, struct txn *txn, struct port *port) { (void) port; - txn_add_redo(txn, request->type, request->data, request->len); + txn_add_redo(txn, request); struct space *space = space_find(request->space_id); /* Try to find tuple by primary key */ @@ -181,6 +178,92 @@ execute_delete(const struct request *request, struct txn *txn, txn_replace(txn, space, old_tuple, NULL, DUP_REPLACE_OR_INSERT); } +static void +fill_replace(struct request *request) +{ + assert(request->data == NULL); + assert(request->type == IPROTO_INSERT || + request->type == IPROTO_REPLACE); + char *d; + uint32_t len = mp_sizeof_map(2) + + mp_sizeof_uint(IPROTO_SPACE_ID) + + mp_sizeof_uint(UINT32_MAX) + + mp_sizeof_uint(IPROTO_TUPLE) + + (request->tuple_end - request->tuple); + + request->data = d = (char *) region_alloc(&fiber()->gc, len); + d = mp_encode_map(d, 2); + d = mp_encode_uint(d, IPROTO_SPACE_ID); + d = mp_encode_uint(d, request->space_id); + d = mp_encode_uint(d, IPROTO_TUPLE); + memcpy(d, request->tuple, (request->tuple_end - request->tuple)); + d += (request->tuple_end - request->tuple); + + request->len = (d - request->data); + assert(request->len <= len); +} + +static void +fill_update(struct request *request) +{ + assert(request->data == NULL); + assert(request->type == IPROTO_UPDATE); + + char *d; + uint32_t len = mp_sizeof_map(4) + + mp_sizeof_uint(IPROTO_SPACE_ID) + + mp_sizeof_uint(UINT32_MAX) + + mp_sizeof_uint(IPROTO_INDEX_ID) + + mp_sizeof_uint(UINT32_MAX) + + mp_sizeof_uint(IPROTO_KEY) + + (request->key_end - request->key) + + mp_sizeof_uint(IPROTO_TUPLE) + + (request->tuple_end - request->tuple); + + request->data = d = (char *) region_alloc(&fiber()->gc, len); + d = mp_encode_map(d, 4); + d = mp_encode_uint(d, IPROTO_SPACE_ID); + d = mp_encode_uint(d, request->space_id); + d = mp_encode_uint(d, IPROTO_INDEX_ID); + d = mp_encode_uint(d, request->index_id); + d = mp_encode_uint(d, IPROTO_KEY); + memcpy(d, request->key, (request->key_end - request->key)); + d += (request->key_end - request->key); + d = mp_encode_uint(d, IPROTO_TUPLE); + memcpy(d, request->tuple, (request->tuple_end - request->tuple)); + d += (request->tuple_end - request->tuple); + + request->len = (d - request->data); + assert(request->len <= len); +} + +static void +fill_delete(struct request *request) +{ + assert(request->data == NULL); + assert(request->type == IPROTO_DELETE); + + char *d; + uint32_t len = mp_sizeof_map(3) + + mp_sizeof_uint(IPROTO_SPACE_ID) + + mp_sizeof_uint(UINT32_MAX) + + mp_sizeof_uint(IPROTO_INDEX_ID) + + mp_sizeof_uint(UINT32_MAX) + + mp_sizeof_uint(IPROTO_KEY) + + (request->key_end - request->key); + request->data = d = (char *) region_alloc(&fiber()->gc, len); + d = mp_encode_map(d, 3); + d = mp_encode_uint(d, IPROTO_SPACE_ID); + d = mp_encode_uint(d, request->space_id); + d = mp_encode_uint(d, IPROTO_INDEX_ID); + d = mp_encode_uint(d, request->index_id); + d = mp_encode_uint(d, IPROTO_KEY); + memcpy(d, request->key, (request->key_end - request->key)); + d += (request->key_end - request->key); + request->len = (d - request->data); + assert(request->len <= len); +} + void request_check_type(uint32_t type) { @@ -200,11 +283,18 @@ request_create(struct request *request, uint32_t type, NULL, execute_select, execute_replace, execute_replace, execute_update, execute_delete, box_lua_call }; + static const request_fill_f fill_map[] = { + NULL, NULL, fill_replace, fill_replace, + fill_update, fill_delete, NULL + }; memset(request, 0, sizeof(*request)); request->type = type; request->data = data; request->len = len; request->execute = execute_map[type]; + request->fill = fill_map[type]; + if (data == NULL) + return; const char *end = data + len; diff --git a/src/box/request.h b/src/box/request.h index 8e9996fff2973e9064079c9808a3d8378cf77bc4..ddef8cb6814312829b51124cbda2a79142af4692 100644 --- a/src/box/request.h +++ b/src/box/request.h @@ -34,9 +34,12 @@ struct txn; struct port; -typedef void (*request_execute_f)(const struct request *, +typedef void (*request_execute_f)(struct request *, struct txn *, struct port *); + +typedef void (*request_fill_f)(struct request *); + struct request { uint32_t type; @@ -54,6 +57,7 @@ struct request const char *data; uint32_t len; request_execute_f execute; + request_fill_f fill; }; void diff --git a/src/box/txn.cc b/src/box/txn.cc index 51af9abe301c181ed4554241d808f8a11ebc714b..53d12203aab2eafdba99816bbb00c752fa46730e 100644 --- a/src/box/txn.cc +++ b/src/box/txn.cc @@ -36,11 +36,9 @@ #include "request.h" /* for request_name */ void -txn_add_redo(struct txn *txn, uint16_t op, const char *data, uint32_t len) +txn_add_redo(struct txn *txn, struct request *request) { - txn->op = op; - txn->data = data; - txn->len = len; + txn->request = request; } void @@ -49,7 +47,7 @@ txn_replace(struct txn *txn, struct space *space, enum dup_replace_mode mode) { /* txn_add_undo() must be done after txn_add_redo() */ - assert(txn->op != 0); + assert(txn->request->type != 0); assert(old_tuple || new_tuple); /* * Remember the old tuple only if we replaced it @@ -87,14 +85,22 @@ txn_commit(struct txn *txn) !space_is_temporary(txn->space)) { int64_t lsn = next_lsn(recovery_state); + if (txn->request->data == NULL && + recovery_state->wal_mode != WAL_NONE) { + /* Generate binary body for Lua requests */ + assert(txn->request->fill != NULL); + txn->request->fill(txn->request); + } + ev_tstamp start = ev_now(), stop; int res = wal_write(recovery_state, lsn, fiber()->cookie, - txn->op, txn->data, txn->len); + txn->request->type, txn->request->data, + txn->request->len); stop = ev_now(); if (stop - start > cfg.too_long_threshold) { say_warn("too long %s: %.3f sec", - iproto_request_name(txn->op), stop - start); + iproto_request_name(txn->request->type), stop - start); } confirm_lsn(recovery_state, lsn, res == 0); diff --git a/src/box/txn.h b/src/box/txn.h index 3d6526d37fdb8eb819c81dd328e8b0fdd8fc0296..505aef6b5c7185412b55ce577ca95764abf94489 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -44,16 +44,14 @@ struct txn { struct rlist on_rollback; /* Redo info: binary packet */ - const char *data; - uint32_t len; - uint16_t op; + struct request *request; }; struct txn *txn_begin(); void txn_commit(struct txn *txn); void txn_finish(struct txn *txn); void txn_rollback(struct txn *txn); -void txn_add_redo(struct txn *txn, uint16_t op, const char *data, uint32_t len); +void txn_add_redo(struct txn *txn, struct request *request); void txn_replace(struct txn *txn, struct space *space, struct tuple *old_tuple, struct tuple *new_tuple, enum dup_replace_mode mode); diff --git a/test/box/misc.result b/test/box/misc.result index f71ecc9338cecea7e84d96756bdb27194fa5eee4..9488a70b35931f6ac1d4f0ea478dd63f2ec24e4b 100644 --- a/test/box/misc.result +++ b/test/box/misc.result @@ -16,7 +16,12 @@ t = {} for n in pairs(box) do table.insert(t, tostring(n)) end table.sort(t) ... t --- -- - call_loadproc +- - _delete + - _insert + - _replace + - _select + - _update + - call_loadproc - cfg - cjson - coredump