diff --git a/src/box/session.cc b/src/box/session.cc index 98de6833f48edb001d9ab0fbf0caf51647ecbe18..c892a411f9d3c69686b5cfafd346b4c1a4602fdc 100644 --- a/src/box/session.cc +++ b/src/box/session.cc @@ -35,7 +35,6 @@ #include "exception.h" #include "random.h" #include <sys/socket.h> -#include "box/txn.h" static struct mh_i32ptr_t *session_registry; @@ -76,7 +75,6 @@ session_create(int fd, uint64_t cookie) session->id = sid_max(); session->fd = fd; session->cookie = cookie; - session->txn = NULL; session->fiber_on_stop = { rlist_nil, session_on_stop, NULL, NULL }; @@ -121,11 +119,6 @@ session_run_on_connect_triggers(struct session *session) void session_destroy(struct session *session) { - if (session->txn) { - assert(session->txn == in_txn()); - txn_rollback(); - } - assert(session->txn == NULL); struct mh_i32ptr_node_t node = { session->id, NULL }; mh_i32ptr_remove(session_registry, &node, NULL); mempool_free(&session_pool, session); diff --git a/src/box/session.h b/src/box/session.h index c2fc6570ff469cf637dadc889c5f7de217ff271d..a1d2157ec482404cee0b4e15f1daec6c940fa053 100644 --- a/src/box/session.h +++ b/src/box/session.h @@ -59,8 +59,6 @@ struct session { uint8_t auth_token; /** User id of the authenticated user. */ uint32_t uid; - /** Current transaction, if started. */ - struct txn *txn; /** Trigger for fiber on_stop to cleanup created on-demand session */ struct trigger fiber_on_stop; }; diff --git a/src/box/txn.cc b/src/box/txn.cc index 4b8a6caa77327cb2147dd76e8218c6602fe05d15..6a92204bb936576533ebe9d197236f64bd70c0ff 100644 --- a/src/box/txn.cc +++ b/src/box/txn.cc @@ -55,7 +55,7 @@ txn_add_redo(struct txn_stmt *stmt, struct request *request) } static void -txn_on_yield(struct trigger * /* trigger */, void * /* event */) +txn_on_yield_or_stop(struct trigger * /* trigger */, void * /* event */) { txn_rollback(); /* doesn't throw */ } @@ -94,6 +94,8 @@ txn_replace(struct txn *txn, struct space *space, if (engine_no_yield(txn->engine)) { trigger_add(&fiber()->on_yield, &txn->fiber_on_yield); + trigger_add(&fiber()->on_stop, + &txn->fiber_on_stop); } } else if (txn->engine != engine_id(space->engine)) tnt_raise(ClientError, ER_CROSS_ENGINE_TRANSACTION); @@ -129,25 +131,28 @@ txn_stmt_new(struct txn *txn) struct txn * txn_begin(bool autocommit) { - assert(! in_txn()); + assert(! fiber_get_txn(fiber())); struct txn *txn = (struct txn *) region_alloc0(&fiber()->gc, sizeof(*txn)); rlist_create(&txn->stmts); rlist_create(&txn->on_commit); rlist_create(&txn->on_rollback); txn->fiber_on_yield = { - rlist_nil, txn_on_yield, NULL, NULL + rlist_nil, txn_on_yield_or_stop, NULL, NULL + }; + txn->fiber_on_stop = { + rlist_nil, txn_on_yield_or_stop, NULL, NULL }; txn->autocommit = autocommit; - in_txn() = txn; + fiber_set_txn(fiber(), txn); return txn; } struct txn * txn_begin_stmt(struct request *request) { - struct txn *txn = in_txn(); + struct txn *txn = fiber_get_txn(fiber()); if (txn == NULL) txn = txn_begin(true); struct txn_stmt *stmt = txn_stmt_new(txn); @@ -161,19 +166,26 @@ txn_commit_stmt(struct txn *txn, struct port *port) struct txn_stmt *stmt; struct tuple *tuple; stmt = txn_stmt(txn); + if (txn->autocommit) + txn_commit(txn); + /* Adding result to port must be after possible WAL write. + * The reason is that any yield between port_add_tuple and port_eof + * calls could lead to sending not finished response to iproto socket. + */ if ((tuple = stmt->new_tuple) || (tuple = stmt->old_tuple)) port_add_tuple(port, tuple); if (txn->autocommit) - txn_commit(txn); + txn_finish(txn); } void txn_commit(struct txn *txn) { - assert(txn == in_txn()); + assert(txn == fiber_get_txn(fiber())); struct txn_stmt *stmt; /* if (!txn->autocommit && txn->n_stmts && engine_no_yield(txn->engine)) */ trigger_clear(&txn->fiber_on_yield); + trigger_clear(&txn->fiber_on_stop); rlist_foreach_entry(stmt, &txn->stmts, next) { if ((!stmt->old_tuple && !stmt->new_tuple) || space_is_temporary(stmt->space)) @@ -197,6 +209,12 @@ txn_commit(struct txn *txn) tnt_raise(LoggedError, ER_WAL_IO); } trigger_run(&txn->on_commit, txn); /* must not throw. */ +} + +void +txn_finish(struct txn *txn) +{ + struct txn_stmt *stmt; rlist_foreach_entry(stmt, &txn->stmts, next) { if (stmt->old_tuple) tuple_ref(stmt->old_tuple, -1); @@ -206,7 +224,7 @@ txn_commit(struct txn *txn) TRASH(txn); /** Free volatile txn memory. */ fiber_gc(); - in_txn() = NULL; + fiber_set_txn(fiber(), NULL); } /** @@ -218,7 +236,7 @@ txn_commit(struct txn *txn) void txn_rollback_stmt() { - struct txn *txn = in_txn(); + struct txn *txn = fiber_get_txn(fiber()); if (txn == NULL) return; if (txn->autocommit) @@ -238,7 +256,7 @@ txn_rollback_stmt() void txn_rollback() { - struct txn *txn = in_txn(); + struct txn *txn = fiber_get_txn(fiber()); if (txn == NULL) return; struct txn_stmt *stmt; @@ -255,10 +273,11 @@ txn_rollback() } /* if (!txn->autocommit && txn->n_stmts && engine_no_yield(txn->engine)) */ trigger_clear(&txn->fiber_on_yield); + trigger_clear(&txn->fiber_on_stop); TRASH(txn); /** Free volatile txn memory. */ fiber_gc(); - in_txn() = NULL; + fiber_set_txn(fiber(), NULL); } void @@ -276,7 +295,7 @@ int boxffi_txn_begin() { try { - if (in_txn()) + if (fiber_get_txn(fiber())) tnt_raise(ClientError, ER_ACTIVE_TRANSACTION); (void) txn_begin(false); } catch (Exception *e) { @@ -289,15 +308,17 @@ int boxffi_txn_commit() { try { - struct txn *txn = in_txn(); + struct txn *txn = fiber_get_txn(fiber()); /** * COMMIT is like BEGIN or ROLLBACK * a "transaction-initiating statement". * Do nothing if transaction is not started, * it's the same as BEGIN + COMMIT. */ - if (txn) + if (txn) { txn_commit(txn); + txn_finish(txn); + } } catch (Exception *e) { return -1; /* pass exception through FFI */ } diff --git a/src/box/txn.h b/src/box/txn.h index aa25e53a0ac7ff59f02fd1d9be4459b8495e5fd1..412a90a5b3b0dee41d7c9ca6b3bb7d1b66010b5f 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -30,7 +30,7 @@ */ #include "index.h" #include "trigger.h" -#include "session.h" +#include "fiber.h" extern double too_long_threshold; struct tuple; @@ -69,12 +69,22 @@ struct txn { bool autocommit; /** Id of the engine involved in multi-statement transaction. */ uint8_t engine; - /** Trigger on fiber yield to abort transaction for in-memory engine */ - struct trigger fiber_on_yield; + /** Triggers on fiber yield and stop to abort transaction for in-memory engine */ + struct trigger fiber_on_yield, fiber_on_stop; }; /* Pointer to the current transaction (if any) */ -#define in_txn() (session()->txn) +static inline struct txn * +fiber_get_txn(struct fiber *fiber) +{ + return (struct txn *) fiber_get_key(fiber, FIBER_KEY_TXN); +} + +static inline void +fiber_set_txn(struct fiber *fiber, struct txn *txn) +{ + fiber_set_key(fiber, FIBER_KEY_TXN, (void *) txn); +} /** * Start a new statement. If no current transaction, @@ -105,12 +115,19 @@ struct txn * txn_begin(bool autocommit); /** - * Commit a transaction. + * Commit a transaction. txn_finish must be called after that. * @pre txn == in_txn() */ void txn_commit(struct txn *txn); +/** + * Finish a transaction. Must be called after txn_commit. + * @pre txn == in_txn() + */ +void +txn_finish(struct txn *txn); + /** Rollback a transaction, if any. */ void txn_rollback(); diff --git a/src/fiber.h b/src/fiber.h index 40a948e3958312fc2715cf11eda2ab568ffdb5e4..eda96e01b620afe8c2c0625192ac77bf2668552a 100644 --- a/src/fiber.h +++ b/src/fiber.h @@ -83,7 +83,9 @@ enum fiber_key { FIBER_KEY_SESSION = 0, /** Lua fiber.storage */ FIBER_KEY_LUA_STORAGE = 1, - FIBER_KEY_MAX = 2 + /** transaction */ + FIBER_KEY_TXN = 2, + FIBER_KEY_MAX = 3 }; struct fiber { diff --git a/src/iproto_port.cc b/src/iproto_port.cc index 5a2f0ce1de764f2b8a8fee7d8c80ca2371eee83f..df15328626616bdc0c921a5bb0bbdcb08226a6b0 100644 --- a/src/iproto_port.cc +++ b/src/iproto_port.cc @@ -103,10 +103,12 @@ iproto_port_eof(struct port *ptr) { struct iproto_port *port = iproto_port(ptr); /* found == 0 means add_tuple wasn't called at all. */ - if (port->found == 0) + if (port->found == 0) { port->svp = obuf_book(port->buf, SVP_SIZE); + port->size += SVP_SIZE; + } - uint32_t len = obuf_size(port->buf) - port->svp.size - 5; + uint32_t len = port->size - 5; struct iproto_header_bin header = iproto_header_bin; header.v_len = mp_bswap_u32(len); @@ -127,8 +129,10 @@ iproto_port_add_tuple(struct port *ptr, struct tuple *tuple) if (++port->found == 1) { /* Found the first tuple, add header. */ port->svp = obuf_book(port->buf, SVP_SIZE); + port->size += SVP_SIZE; } tuple_to_obuf(tuple, port->buf); + port->size += tuple->bsize; } struct port_vtab iproto_port_vtab = { diff --git a/src/iproto_port.h b/src/iproto_port.h index b1791fecb31969029946ed00ad4509e722201198..0c45798ceac4da8d2d8ffbd0cee05acf07737fee 100644 --- a/src/iproto_port.h +++ b/src/iproto_port.h @@ -64,6 +64,8 @@ struct iproto_port uint32_t found; /** A pointer in the reply buffer where the reply starts. */ struct obuf_svp svp; + /** Size of data written after reply starts */ + uint32_t size; }; extern struct port_vtab iproto_port_vtab; @@ -76,6 +78,7 @@ iproto_port_init(struct iproto_port *port, struct obuf *buf, port->buf = buf; port->sync = sync; port->found = 0; + port->size = 0; } /** Stack a reply to 'ping' packet. */ diff --git a/test/box/iproto.result b/test/box/iproto.result index f08ef53e3fe7d79824212ade0ec389c255fde398..92cd4fd8ae85c073a7b34fb16e74d933be4d14f7 100644 --- a/test/box/iproto.result +++ b/test/box/iproto.result @@ -66,3 +66,43 @@ ping ... +box.cfg.wal_mode +--- +- write +... +space = box.schema.create_space('test', { id = 567 }) +--- +... +space:create_index('primary', { type = 'hash' }) +--- +... +box.schema.user.grant('guest', 'read,write,execute', 'space', 'test') +--- +... +--- +- [1, 'baobab'] +... +--- +- [2, 'obbaba'] +... +--- +- [1, 'baobab'] +... +--- +- [3, 'occama'] +... +--- +- [2, 'obbaba'] +... +--- +- [4, 'ockham'] +... +--- +- [1, 'baobab'] +... +--- +- [2, 'obbaba'] +... +space:drop() +--- +... diff --git a/test/box/iproto.test.py b/test/box/iproto.test.py index 096b9232271e92a38ed6ae2e6bf0ecffbff073ab..031ae92ee3731de4a7ebceda1342dae17a07495a 100644 --- a/test/box/iproto.test.py +++ b/test/box/iproto.test.py @@ -5,6 +5,9 @@ import socket import msgpack from tarantool.const import * from tarantool import Connection +from tarantool.request import RequestInsert +from tarantool.request import RequestSelect +from tarantool.response import Response print """ # @@ -83,3 +86,64 @@ print "IPROTO_CALL" test({ IPROTO_CODE : REQUEST_TYPE_CALL }, {}) test({ IPROTO_CODE : REQUEST_TYPE_CALL }, { IPROTO_KEY: ('procname', )}) print "\n" + +# gh-434 Tarantool crashes on multiple iproto requests with WAL enabled +admin("box.cfg.wal_mode") +admin("space = box.schema.create_space('test', { id = 567 })") +admin("space:create_index('primary', { type = 'hash' })") +admin("box.schema.user.grant('guest', 'read,write,execute', 'space', 'test')") + +c = Connection('localhost', server.sql.port) +c.connect() +request1 = RequestInsert(c, 567, [1, "baobab"]) +request2 = RequestInsert(c, 567, [2, "obbaba"]) +s = c._socket +try: + s.send(bytes(request1) + bytes(request2)) +except OSError as e: + print ' => ', 'Failed to send request' +response1 = Response(c, c._read_response()) +response2 = Response(c, c._read_response()) +print response1.__str__() +print response2.__str__() + +request1 = RequestInsert(c, 567, [3, "occama"]) +request2 = RequestSelect(c, 567, 0, [1], 0, 1, 0) +s = c._socket +try: + s.send(bytes(request1) + bytes(request2)) +except OSError as e: + print ' => ', 'Failed to send request' +response1 = Response(c, c._read_response()) +response2 = Response(c, c._read_response()) +print response1.__str__() +print response2.__str__() + +request1 = RequestSelect(c, 567, 0, [2], 0, 1, 0) +request2 = RequestInsert(c, 567, [4, "ockham"]) +s = c._socket +try: + s.send(bytes(request1) + bytes(request2)) +except OSError as e: + print ' => ', 'Failed to send request' +response1 = Response(c, c._read_response()) +response2 = Response(c, c._read_response()) +print response1.__str__() +print response2.__str__() + +request1 = RequestSelect(c, 567, 0, [1], 0, 1, 0) +request2 = RequestSelect(c, 567, 0, [2], 0, 1, 0) +s = c._socket +try: + s.send(bytes(request1) + bytes(request2)) +except OSError as e: + print ' => ', 'Failed to send request' +response1 = Response(c, c._read_response()) +response2 = Response(c, c._read_response()) +print response1.__str__() +print response2.__str__() + +c.close() + +admin("space:drop()") +