diff --git a/src/box/call.c b/src/box/call.c index a6384efe202068fcc3c81016d1f2d3978cb6f1f6..0ce84b1edea5d70b9c25a4a37d1319e6fa29223f 100644 --- a/src/box/call.c +++ b/src/box/call.c @@ -141,8 +141,6 @@ box_process_call(struct call_request *request, struct port *port) const char *name = request->name; assert(name != NULL); uint32_t name_len = mp_decode_strl(&name); - /* Transaction is not started. */ - assert(!in_txn()); int rc; struct port args; @@ -157,11 +155,6 @@ box_process_call(struct call_request *request, struct port *port) } if (rc != 0) return -1; - if (in_txn() != NULL) { - diag_set(ClientError, ER_FUNCTION_TX_ACTIVE); - port_destroy(port); - return -1; - } return 0; } @@ -179,10 +172,5 @@ box_process_eval(struct call_request *request, struct port *port) uint32_t expr_len = mp_decode_strl(&expr); if (box_lua_eval(expr, expr_len, &args, port) != 0) return -1; - if (in_txn() != 0) { - diag_set(ClientError, ER_FUNCTION_TX_ACTIVE); - port_destroy(port); - return -1; - } return 0; } diff --git a/src/box/errcode.h b/src/box/errcode.h index f8fda23c1210f947949616bf03ee744308fc1d52..a6f096698333af6e075c13198bd6231602d0192e 100644 --- a/src/box/errcode.h +++ b/src/box/errcode.h @@ -282,6 +282,7 @@ struct errcode_record { /*227 */_(ER_SYNC_QUEUE_UNCLAIMED, "The synchronous transaction queue doesn't belong to any instance")\ /*228 */_(ER_SYNC_QUEUE_FOREIGN, "The synchronous transaction queue belongs to other instance with id %u")\ /*226 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process %s request in stream") \ + /*227 */_(ER_UNABLE_TO_PROCESS_OUT_OF_STREAM, "Unable to process %s request out of stream") \ /* * !IMPORTANT! Please follow instructions at start of the file diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 84dbdab40971f69f75dd2eac81ede3269bf59ef4..318e31e930a457606901c2ce7af3ae47400ba9b3 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -66,6 +66,7 @@ #include "tt_static.h" #include "salad/stailq.h" #include "assoc.h" +#include "txn.h" enum { IPROTO_SALT_SIZE = 32, @@ -79,6 +80,8 @@ enum { struct iproto_connection; struct iproto_stream { + /** Currently active stream transaction or NULL */ + struct txn *txn; /** * Queue of pending requests (iproto messages) for this stream, * processed sequentially. This field is accesable only from @@ -89,6 +92,11 @@ struct iproto_stream { uint64_t id; /** This stream connection */ struct iproto_connection *connection; + /** + * Pre-allocated disconnect msg to gracefully rollback stream + * transaction and destroy stream object. + */ + struct cmsg on_disconnect; }; /** @@ -135,6 +143,10 @@ struct iproto_thread { /** * Static routes for this iproto thread */ + struct cmsg_hop begin_route[2]; + struct cmsg_hop commit_route[2]; + struct cmsg_hop rollback_route[2]; + struct cmsg_hop rollback_on_disconnect_route[2]; struct cmsg_hop destroy_route[2]; struct cmsg_hop disconnect_route[2]; struct cmsg_hop misc_route[2]; @@ -641,12 +653,24 @@ iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id) return NULL; } errinj_stream_count_add(1); + stream->txn = NULL; stailq_create(&stream->pending_requests); stream->id = stream_id; stream->connection = connection; return stream; } +static inline void +iproto_stream_rollback_on_disconnect(struct iproto_stream *stream) +{ + struct iproto_connection *conn = stream->connection; + struct iproto_thread *iproto_thread = conn->iproto_thread; + struct cmsg_hop *route = + iproto_thread->rollback_on_disconnect_route; + cmsg_init(&stream->on_disconnect, route); + cpipe_push(&iproto_thread->tx_pipe, &stream->on_disconnect); +} + /** * Return true if we have not enough spare messages * in the message pool. @@ -670,6 +694,7 @@ static void iproto_stream_delete(struct iproto_stream *stream) { assert(stailq_empty(&stream->pending_requests)); + assert(stream->txn == NULL); errinj_stream_count_add(-1); mempool_free(&stream->connection->iproto_thread->iproto_stream_pool, stream); } @@ -715,7 +740,19 @@ iproto_msg_new(struct iproto_connection *con) static inline bool iproto_connection_is_idle(struct iproto_connection *con) { + /* + * The check for 'mh_size (streams) == 0' was added, because it is + * possible that when disconnect occurs, there is active transaction + * in stream after processing all messages. In this case we send + * special message to rollback it, and without this check we would + * immediately send special message to destroy connection. This would + * not lead to error now, since the messages are processed strictly + * sequentially and rollback does not yield, but it is not safely and + * if we add some more complex logic, it may lead to difficulty catching + * errors in the future. + */ return con->long_poll_count == 0 && + mh_size(con->streams) == 0 && ibuf_used(&con->ibuf[0]) == 0 && ibuf_used(&con->ibuf[1]) == 0; } @@ -805,6 +842,23 @@ iproto_connection_close(struct iproto_connection *con) * is done only once. */ con->p_ibuf->wpos -= con->parse_size; + mh_int_t node; + mh_foreach(con->streams, node) { + struct iproto_stream *stream = (struct iproto_stream *) + mh_i64ptr_node(con->streams, node)->val; + /** + * If stream requests queue is empty, it means that + * that there is some active transaction which was + * not commited yet. We need to rollback it, since + * we push on_disconnect message to tx thread here. + * If stream requests queue is not empty, it means + * that stream processing some request in tx thread + * now. We destroy stream in `net_send_msg` after + * processing all requests. + */ + if (stailq_empty(&stream->pending_requests)) + iproto_stream_rollback_on_disconnect(stream); + } cpipe_push(&con->iproto_thread->tx_pipe, &con->disconnect_msg); assert(con->state == IPROTO_CONNECTION_ALIVE); con->state = IPROTO_CONNECTION_CLOSED; @@ -965,6 +1019,7 @@ iproto_msg_start_processing_in_stream(struct iproto_msg *msg) */ errinj_stream_msg_count_add(1); stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val; + assert(stream != NULL); msg->stream = stream; /* * If the request queue in the stream is not empty, it means @@ -1407,6 +1462,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, uint64_t stream_id; uint8_t type; bool request_is_not_for_stream; + bool request_is_only_for_stream; struct iproto_thread *iproto_thread = msg->connection->iproto_thread; if (xrow_header_decode(&msg->header, pos, reqend, true)) @@ -1418,11 +1474,19 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, request_is_not_for_stream = ((type > IPROTO_TYPE_STAT_MAX && type != IPROTO_PING) || type == IPROTO_AUTH); + request_is_only_for_stream = + (type == IPROTO_BEGIN || + type == IPROTO_COMMIT || + type == IPROTO_ROLLBACK); if (stream_id != 0 && request_is_not_for_stream) { diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM, iproto_type_name(type)); goto error; + } else if (stream_id == 0 && request_is_only_for_stream) { + diag_set(ClientError, ER_UNABLE_TO_PROCESS_OUT_OF_STREAM, + iproto_type_name(type)); + goto error; } /* @@ -1450,6 +1514,15 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, sizeof(*(iproto_thread->dml_route))); cmsg_init(&msg->base, iproto_thread->dml_route[type]); break; + case IPROTO_BEGIN: + cmsg_init(&msg->base, iproto_thread->begin_route); + break; + case IPROTO_COMMIT: + cmsg_init(&msg->base, iproto_thread->commit_route); + break; + case IPROTO_ROLLBACK: + cmsg_init(&msg->base, iproto_thread->rollback_route); + break; case IPROTO_CALL_16: case IPROTO_CALL: case IPROTO_EVAL: @@ -1523,6 +1596,38 @@ tx_fiber_init(struct session *session, uint64_t sync) fiber_set_user(f, &session->credentials); } +static void +tx_process_rollback_on_disconnect(struct cmsg *m) +{ + struct iproto_stream *stream = + container_of(m, struct iproto_stream, + on_disconnect); + + if (stream->txn != NULL) { + tx_fiber_init(stream->connection->session, 0); + txn_attach(stream->txn); + if (box_txn_rollback() != 0) + panic("failed to rollback transaction on disconnect"); + stream->txn = NULL; + } +} + +static void +net_finish_rollback_on_disconnect(struct cmsg *m) +{ + struct iproto_stream *stream = + container_of(m, struct iproto_stream, + on_disconnect); + struct iproto_connection *con = stream->connection; + + struct mh_i64ptr_node_t node = { stream->id, NULL }; + mh_i64ptr_remove(con->streams, &node, 0); + iproto_stream_delete(stream); + assert(!evio_has_fd(&con->input)); + if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) + iproto_connection_try_to_start_destroy(con); +} + static void tx_process_disconnect(struct cmsg *m) { @@ -1656,15 +1761,43 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos) } } +/** + * Since the processing of requests within a transaction + * for a stream can occur in different fibers, we store + * a pointer to transaction in the stream structure. + * Check if message belongs to stream and there is active + * transaction for this stream. In case it is so, sets this + * transaction for current fiber. + */ +static inline void +tx_prepare_transaction_for_request(struct iproto_msg *msg) +{ + if (msg->stream != NULL && msg->stream->txn != NULL) { + txn_attach(msg->stream->txn); + msg->stream->txn = NULL; + } + assert(!in_txn() || msg->stream != NULL); +} + static inline struct iproto_msg * tx_accept_msg(struct cmsg *m) { struct iproto_msg *msg = (struct iproto_msg *) m; tx_accept_wpos(msg->connection, &msg->wpos); tx_fiber_init(msg->connection->session, msg->header.sync); + tx_prepare_transaction_for_request(msg); return msg; } +static inline void +tx_end_msg(struct iproto_msg *msg) +{ + if (msg->stream != NULL) { + assert(msg->stream->txn == NULL); + msg->stream->txn = txn_detach(); + } +} + /** * Write error message to the output buffer and advance * write position. Doesn't throw. @@ -1690,6 +1823,7 @@ tx_reply_iproto_error(struct cmsg *m) iproto_reply_error(out, diag_last_error(&msg->diag), msg->header.sync, ::schema_version); iproto_wpos_create(&msg->wpos, out); + tx_end_msg(msg); } /** Inject a short delay on tx request processing for testing. */ @@ -1702,6 +1836,72 @@ tx_inject_delay(void) }); } +static void +tx_process_begin(struct cmsg *m) +{ + struct iproto_msg *msg = tx_accept_msg(m); + struct obuf *out; + + if (tx_check_schema(msg->header.schema_version)) + goto error; + + if (box_txn_begin() != 0) + goto error; + + out = msg->connection->tx.p_obuf; + iproto_reply_ok(out, msg->header.sync, ::schema_version); + iproto_wpos_create(&msg->wpos, out); + tx_end_msg(msg); + return; +error: + tx_reply_error(msg); + tx_end_msg(msg); +} + +static void +tx_process_commit(struct cmsg *m) +{ + struct iproto_msg *msg = tx_accept_msg(m); + struct obuf *out; + + if (tx_check_schema(msg->header.schema_version)) + goto error; + + if (box_txn_commit() != 0) + goto error; + + out = msg->connection->tx.p_obuf; + iproto_reply_ok(out, msg->header.sync, ::schema_version); + iproto_wpos_create(&msg->wpos, out); + tx_end_msg(msg); + return; +error: + tx_reply_error(msg); + tx_end_msg(msg); +} + +static void +tx_process_rollback(struct cmsg *m) +{ + struct iproto_msg *msg = tx_accept_msg(m); + struct obuf *out; + + if (tx_check_schema(msg->header.schema_version)) + goto error; + + if (box_txn_rollback() != 0) + goto error; + + out = msg->connection->tx.p_obuf; + iproto_reply_ok(out, msg->header.sync, ::schema_version); + iproto_wpos_create(&msg->wpos, out); + tx_end_msg(msg); + return; +error: + tx_reply_error(msg); + tx_end_msg(msg); +} + static void tx_process1(struct cmsg *m) { @@ -1723,9 +1923,11 @@ tx_process1(struct cmsg *m) iproto_reply_select(out, &svp, msg->header.sync, ::schema_version, tuple != 0); iproto_wpos_create(&msg->wpos, out); + tx_end_msg(msg); return; error: tx_reply_error(msg); + tx_end_msg(msg); } static void @@ -1766,9 +1968,11 @@ tx_process_select(struct cmsg *m) iproto_reply_select(out, &svp, msg->header.sync, ::schema_version, count); iproto_wpos_create(&msg->wpos, out); + tx_end_msg(msg); return; error: tx_reply_error(msg); + tx_end_msg(msg); } static int @@ -1818,6 +2022,12 @@ tx_process_call(struct cmsg *m) if (rc != 0) goto error; + if (in_txn() != NULL && msg->header.stream_id == 0) { + diag_set(ClientError, ER_FUNCTION_TX_ACTIVE); + port_destroy(&port); + goto error; + } + /* * Add all elements returned by the function to iproto. * @@ -1856,9 +2066,11 @@ tx_process_call(struct cmsg *m) iproto_reply_select(out, &svp, msg->header.sync, ::schema_version, count); iproto_wpos_create(&msg->wpos, out); + tx_end_msg(msg); return; error: tx_reply_error(msg); + tx_end_msg(msg); } static void @@ -1867,6 +2079,7 @@ tx_process_misc(struct cmsg *m) struct iproto_msg *msg = tx_accept_msg(m); struct iproto_connection *con = msg->connection; struct obuf *out = con->tx.p_obuf; + assert(!(msg->header.type != IPROTO_PING && in_txn())); if (tx_check_schema(msg->header.schema_version)) goto error; @@ -1899,9 +2112,11 @@ tx_process_misc(struct cmsg *m) } catch (Exception *e) { tx_reply_error(msg); } + tx_end_msg(msg); return; error: tx_reply_error(msg); + tx_end_msg(msg); } static void @@ -1995,9 +2210,11 @@ tx_process_sql(struct cmsg *m) port_destroy(&port); iproto_reply_sql(out, &header_svp, msg->header.sync, schema_version); iproto_wpos_create(&msg->wpos, out); + tx_end_msg(msg); return; error: tx_reply_error(msg); + tx_end_msg(msg); } static void @@ -2007,6 +2224,7 @@ tx_process_replication(struct cmsg *m) struct iproto_connection *con = msg->connection; struct ev_io io; coio_create(&io, con->input.fd); + assert(!in_txn()); try { switch (msg->header.type) { case IPROTO_JOIN: @@ -2066,9 +2284,24 @@ iproto_msg_finish_processing_in_stream(struct iproto_msg *msg) errinj_stream_msg_count_add(-1); if (stailq_empty(&stream->pending_requests)) { - struct mh_i64ptr_node_t node = { stream->id, NULL }; - mh_i64ptr_remove(con->streams, &node, 0); - iproto_stream_delete(stream); + /* + * If no more messages for the current stream + * and no transaction started, then delete it. + */ + if (stream->txn == NULL) { + struct mh_i64ptr_node_t node = { stream->id, NULL }; + mh_i64ptr_remove(con->streams, &node, 0); + iproto_stream_delete(stream); + } else if (!evio_has_fd(&con->input)) { + /* + * Here we are in case when connection was closed, + * there is no messages in stream queue, but there + * is some active transaction in stream. + * Send disconnect message to rollback this + * transaction. + */ + iproto_stream_rollback_on_disconnect(stream); + } } else { /* * If there are new messages for this stream @@ -2404,6 +2637,23 @@ iproto_session_push(struct session *session, struct port *port) static inline void iproto_thread_init_routes(struct iproto_thread *iproto_thread) { + iproto_thread->begin_route[0] = + { tx_process_begin, &iproto_thread->net_pipe }; + iproto_thread->begin_route[1] = + { net_send_msg, NULL }; + iproto_thread->commit_route[0] = + { tx_process_commit, &iproto_thread->net_pipe }; + iproto_thread->commit_route[1] = + { net_send_msg, NULL }; + iproto_thread->rollback_route[0] = + { tx_process_rollback, &iproto_thread->net_pipe }; + iproto_thread->rollback_route[1] = + { net_send_msg, NULL }; + iproto_thread->rollback_on_disconnect_route[0] = + { tx_process_rollback_on_disconnect, + &iproto_thread->net_pipe }; + iproto_thread->rollback_on_disconnect_route[1] = + { net_finish_rollback_on_disconnect, NULL }; iproto_thread->destroy_route[0] = { tx_process_destroy, &iproto_thread->net_pipe }; iproto_thread->destroy_route[1] = diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c index f2902946a43bae8166ab9166e3efe42e03f23bfa..913a64de52c771e8ae5312d0e148084ed53390ab 100644 --- a/src/box/iproto_constants.c +++ b/src/box/iproto_constants.c @@ -166,6 +166,9 @@ const char *iproto_type_strs[] = "EXECUTE", NULL, /* NOP */ "PREPARE", + "BEGIN", + "COMMIT", + "ROLLBACK", }; #define bit(c) (1ULL<<IPROTO_##c) @@ -184,6 +187,9 @@ const uint64_t iproto_body_key_map[IPROTO_TYPE_STAT_MAX] = { 0, /* EXECUTE */ 0, /* NOP */ 0, /* PREPARE */ + 0, /* BEGIN */ + 0, /* COMMIT */ + 0, /* ROLLBACK */ }; #undef bit diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 8792737b2686624fd0aeb06c32fce436b87099cb..9210075808b7920cafff55ffc444b499d9adc534 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -237,6 +237,12 @@ enum iproto_type { IPROTO_NOP = 12, /** Prepare SQL statement. */ IPROTO_PREPARE = 13, + /* Begin transaction */ + IPROTO_BEGIN = 14, + /* Commit transaction */ + IPROTO_COMMIT = 15, + /* Rollback transaction */ + IPROTO_ROLLBACK = 16, /** The maximum typecode used for box.stat() */ IPROTO_TYPE_STAT_MAX, diff --git a/src/box/txn.c b/src/box/txn.c index e057d2762176d4a576ea57172cb2f8b5812b7a42..06d048870ecad1172cb6e729e866ff3f9627d4dc 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -1255,3 +1255,26 @@ txn_on_yield(struct trigger *trigger, void *event) txn_set_flags(txn, TXN_IS_ABORTED_BY_YIELD); return 0; } + +struct txn * +txn_detach(void) +{ + struct txn *txn = in_txn(); + if (txn == NULL) + return NULL; + if (!txn_has_flag(txn, TXN_CAN_YIELD)) { + txn_on_yield(NULL, NULL); + trigger_clear(&txn->fiber_on_yield); + } + trigger_clear(&txn->fiber_on_stop); + fiber_set_txn(fiber(), NULL); + return txn; +} + +void +txn_attach(struct txn *txn) +{ + assert(txn != NULL); + assert(!in_txn()); + fiber_set_txn(fiber(), txn); +} diff --git a/src/box/txn.h b/src/box/txn.h index 8741dc6a152002830d902e0cabebdcee93ebc580..f111445671a791bd4b5325e6a555d93af8d00a1b 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -457,6 +457,25 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn) fiber->storage.txn = txn; } +/** + * Detach transaction from fiber. + * By default if the fiber is stopped the transaction started + * in this fiber is rollback. This function detaches transaction + * from fiber - detached transaction does not rollback in case + * when fiber stopped, but can be aborted in case it does not + * support yeild. + */ +struct txn * +txn_detach(void); + +/** + * Attach transaction to fiber. + * Attach @a txn that has been detached previously and saved + * somewhere to a new fiber. + */ +void +txn_attach(struct txn *txn); + /** * Start a transaction explicitly. * @pre no transaction is active diff --git a/test/box-tap/feedback_daemon.test.lua b/test/box-tap/feedback_daemon.test.lua index a2e04164989b0f39a62b6b599da0992dbe93d01b..f700f3f72ac25c51af849f541505f783dfc28be3 100755 --- a/test/box-tap/feedback_daemon.test.lua +++ b/test/box-tap/feedback_daemon.test.lua @@ -251,7 +251,7 @@ box.space.features_sync:drop() local function check_stats(stat) local sub = test:test('feedback operation stats') - sub:plan(18) + sub:plan(21) local box_stat = box.stat() local net_stat = box.stat.net() for op, val in pairs(box_stat) do diff --git a/test/box/error.result b/test/box/error.result index f80fdfed59d09e2d1b357a7f93117ccfdf90bfcd..bc804197a574f99d246249d5f7de47b158491b88 100644 --- a/test/box/error.result +++ b/test/box/error.result @@ -448,6 +448,7 @@ t; | 227: box.error.SYNC_QUEUE_UNCLAIMED | 228: box.error.SYNC_QUEUE_FOREIGN | 229: box.error.UNABLE_TO_PROCESS_IN_STREAM + | 230: box.error.UNABLE_TO_PROCESS_OUT_OF_STREAM | ... test_run:cmd("setopt delimiter ''"); diff --git a/test/box/misc.result b/test/box/misc.result index b62a64355bbb793fc1dc2d2c12584021a68b9ce5..c86245914b80c430b6a7dddc5293fd611bbd0158 100644 --- a/test/box/misc.result +++ b/test/box/misc.result @@ -136,11 +136,14 @@ end; t; --- - - DELETE + - COMMIT - SELECT + - ROLLBACK - INSERT - EVAL - - CALL - ERROR + - CALL + - BEGIN - PREPARE - REPLACE - UPSERT