From 9a8559d871e4b4a820c29113c848bf52d956b5ec Mon Sep 17 00:00:00 2001 From: Denis Smirnov <sd@picodata.io> Date: Fri, 19 Jul 2024 14:27:02 +0700 Subject: [PATCH] feat: avoid schema version bump for temporary spaces Temporary spaces, used for cluster-wide SQL data materialization, were causing unnecessary netbox schema version bumps, leading to schema downloading via netbox, excessive Lua garbage and GC blocks. Since these tables are for internal SQL use, we don't need to inform netbox clients about schema changes. We now maintain separate schema versions: one for netbox clients and one for the internal prepared statement cache. NO_DOC=picodata internal patch NO_CHANGELOG=picodata internal patch --- src/box/alter.cc | 29 ++++++----- src/box/execute.c | 2 +- src/box/iproto.cc | 51 ++++++++++--------- src/box/lua/info.c | 8 +++ src/box/schema.cc | 33 +++++++++++- src/box/schema.h | 26 +++++++++- src/box/sql/vdbe.c | 2 +- src/box/sql/vdbeaux.c | 2 +- .../fully-temporary_spaces_test.lua | 35 +++++++++++++ test/box/info.result | 1 + 10 files changed, 147 insertions(+), 42 deletions(-) diff --git a/src/box/alter.cc b/src/box/alter.cc index ce7060c6c8..e47ead8cd3 100644 --- a/src/box/alter.cc +++ b/src/box/alter.cc @@ -63,13 +63,6 @@ /* {{{ Auxiliary functions and methods. */ -static void -box_schema_version_bump(void) -{ - ++schema_version; - box_broadcast_schema(); -} - int access_check_ddl(const char *name, uint32_t object_id, uint32_t owner_uid, enum box_schema_object_type type, @@ -1584,6 +1577,14 @@ TruncateIndex::~TruncateIndex() index_abort_create(new_index); } +static inline void +schema_version_bump(struct space *space) +{ + if (!space_is_temporary(space)) + box_bump_schema_version(); + stmt_cache_bump_schema_version(); +} + /** * UpdateSchemaVersion - increment schema_version. Used on * in alter_space_do(), i.e. when creating or dropping @@ -1600,8 +1601,7 @@ class UpdateSchemaVersion: public AlterSpaceOp void UpdateSchemaVersion::alter(struct alter_space *alter) { - (void)alter; - box_schema_version_bump(); + schema_version_bump(alter->old_space); } /** @@ -2185,7 +2185,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event) * AlterSpaceOps are registered in case of space * create. */ - box_schema_version_bump(); + schema_version_bump(space); /* * So may happen that until the DDL change record * is written to the WAL, the space is used for @@ -2279,7 +2279,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event) * deleting the space from the space_cache, since no * AlterSpaceOps are registered in case of space drop. */ - box_schema_version_bump(); + schema_version_bump(old_space); struct trigger *on_commit = txn_alter_trigger_new(on_drop_space_commit, old_space); if (on_commit == NULL) @@ -5002,7 +5002,12 @@ on_replace_dd_trigger(struct trigger * /* trigger */, void *event) txn_stmt_on_rollback(stmt, on_rollback); txn_stmt_on_commit(stmt, on_commit); - box_schema_version_bump(); + /* + * The triggers are not supported for temporary spaces, + * so we always bump box schema version. + */ + box_bump_schema_version(); + stmt_cache_bump_schema_version(); return 0; } diff --git a/src/box/execute.c b/src/box/execute.c index ff544132f0..9e221bc598 100644 --- a/src/box/execute.c +++ b/src/box/execute.c @@ -85,7 +85,7 @@ sql_row_to_port(struct sql_stmt *stmt, struct region *region, struct port *port) static bool sql_stmt_schema_version_is_valid(struct sql_stmt *stmt) { - return sql_stmt_schema_version(stmt) == box_schema_version(); + return sql_stmt_schema_version(stmt) == stmt_cache_schema_version(); } /** diff --git a/src/box/iproto.cc b/src/box/iproto.cc index c9c88da2f8..7a6cde0a38 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -58,7 +58,7 @@ #include "tuple_convert.h" #include "session.h" #include "xrow.h" -#include "schema.h" /* schema_version */ +#include "schema.h" #include "replication.h" /* instance_uuid */ #include "iproto_constants.h" #include "iproto_features.h" @@ -1277,7 +1277,7 @@ iproto_connection_resume(struct iproto_connection *con) if (iproto_enqueue_batch(con, con->p_ibuf) != 0) { struct error *e = box_error_last(); error_log(e); - iproto_write_error(&con->io, e, ::schema_version, 0); + iproto_write_error(&con->io, e, box_schema_version(), 0); iproto_connection_close(con); } } @@ -1366,7 +1366,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher, } catch (Exception *e) { e->log(); /* Best effort at sending the error message to the client. */ - iproto_write_error(io, e, ::schema_version, 0); + iproto_write_error(io, e, box_schema_version(), 0); iproto_connection_close(con); } } @@ -2020,9 +2020,10 @@ static int tx_check_msg(struct iproto_msg *msg) { uint64_t new_schema_version = msg->header.schema_version; - if (new_schema_version != 0 && new_schema_version != schema_version) { + if (new_schema_version != 0 && + new_schema_version != box_schema_version()) { diag_set(ClientError, ER_WRONG_SCHEMA_VERSION, - new_schema_version, schema_version); + new_schema_version, box_schema_version()); return -1; } enum iproto_type type = (enum iproto_type)msg->header.type; @@ -2058,7 +2059,7 @@ tx_reply_error(struct iproto_msg *msg) { struct obuf *out = msg->connection->tx.p_obuf; iproto_reply_error(out, diag_last_error(&fiber()->diag), - msg->header.sync, ::schema_version); + msg->header.sync, box_schema_version()); iproto_wpos_create(&msg->wpos, out); } @@ -2073,7 +2074,7 @@ tx_reply_iproto_error(struct cmsg *m) struct obuf *out = msg->connection->tx.p_obuf; struct obuf_svp header = obuf_create_svp(out); iproto_reply_error(out, diag_last_error(&msg->diag), - msg->header.sync, ::schema_version); + msg->header.sync, box_schema_version()); iproto_wpos_create(&msg->wpos, out); tx_end_msg(msg, &header); } @@ -2117,7 +2118,7 @@ tx_process_begin(struct cmsg *m) } out = msg->connection->tx.p_obuf; header = obuf_create_svp(out); - iproto_reply_ok(out, msg->header.sync, ::schema_version); + iproto_reply_ok(out, msg->header.sync, box_schema_version()); iproto_wpos_create(&msg->wpos, out); tx_end_msg(msg, &header); return; @@ -2143,7 +2144,7 @@ tx_process_commit(struct cmsg *m) out = msg->connection->tx.p_obuf; header = obuf_create_svp(out); - iproto_reply_ok(out, msg->header.sync, ::schema_version); + iproto_reply_ok(out, msg->header.sync, box_schema_version()); iproto_wpos_create(&msg->wpos, out); tx_end_msg(msg, &header); return; @@ -2169,7 +2170,7 @@ tx_process_rollback(struct cmsg *m) out = msg->connection->tx.p_obuf; header = obuf_create_svp(out); - iproto_reply_ok(out, msg->header.sync, ::schema_version); + iproto_reply_ok(out, msg->header.sync, box_schema_version()); iproto_wpos_create(&msg->wpos, out); tx_end_msg(msg, &header); return; @@ -2198,7 +2199,7 @@ tx_process1(struct cmsg *m) goto error; if (tuple && tuple_to_obuf(tuple, out)) goto error; - iproto_reply_select(out, &svp, msg->header.sync, ::schema_version, + iproto_reply_select(out, &svp, msg->header.sync, box_schema_version(), tuple != 0); iproto_wpos_create(&msg->wpos, out); tx_end_msg(msg, &svp); @@ -2268,13 +2269,14 @@ tx_process_select(struct cmsg *m) assert(packed_pos != NULL); if (iproto_reply_select_with_position(out, &svp, msg->header.sync, - ::schema_version, count, + box_schema_version(), + count, packed_pos, packed_pos_end) != 0) goto discard; } else { iproto_reply_select(out, &svp, msg->header.sync, - ::schema_version, count); + box_schema_version(), count); } region_truncate(&fiber()->gc, region_svp); iproto_wpos_create(&msg->wpos, out); @@ -2390,7 +2392,7 @@ tx_process_call(struct cmsg *m) } iproto_reply_select(out, &svp, msg->header.sync, - ::schema_version, count); + box_schema_version(), count); iproto_wpos_create(&msg->wpos, out); tx_end_msg(msg, &svp); return; @@ -2432,26 +2434,26 @@ tx_process_misc(struct cmsg *m) box_process_auth(&msg->auth, con->salt, IPROTO_SALT_SIZE); iproto_reply_ok_xc(out, msg->header.sync, - ::schema_version); + box_schema_version()); break; case IPROTO_PING: iproto_reply_ok_xc(out, msg->header.sync, - ::schema_version); + box_schema_version()); break; case IPROTO_ID: tx_process_id(con, &msg->id); iproto_reply_id_xc(out, box_auth_type, msg->header.sync, - ::schema_version); + box_schema_version()); break; case IPROTO_VOTE_DEPRECATED: iproto_reply_vclock_xc(out, &replicaset.vclock, msg->header.sync, - ::schema_version); + box_schema_version()); break; case IPROTO_VOTE: box_process_vote(&ballot); iproto_reply_vote_xc(out, &ballot, msg->header.sync, - ::schema_version); + box_schema_version()); break; case IPROTO_WATCH: session_watch(con->session, msg->header.sync, @@ -2558,7 +2560,8 @@ tx_process_sql(struct cmsg *m) struct obuf_svp header_svp; if (is_unprepare) { header_svp = obuf_create_svp(out); - if (iproto_reply_ok(out, msg->header.sync, schema_version) != 0) + if (iproto_reply_ok(out, msg->header.sync, + box_schema_version()) != 0) goto error; iproto_wpos_create(&msg->wpos, out); tx_end_msg(msg, &header_svp); @@ -2575,7 +2578,8 @@ tx_process_sql(struct cmsg *m) goto error; } port_destroy(&port); - iproto_reply_sql(out, &header_svp, msg->header.sync, schema_version); + iproto_reply_sql(out, &header_svp, msg->header.sync, + box_schema_version()); iproto_wpos_create(&msg->wpos, out); tx_end_msg(msg, &header_svp); return; @@ -2631,7 +2635,8 @@ tx_process_replication(struct cmsg *m) * written row. Do not push it on top. */ } catch (Exception *e) { - iproto_write_error(io, e, ::schema_version, msg->header.sync); + iproto_write_error(io, e, box_schema_version(), + msg->header.sync); } struct obuf_svp empty = obuf_create_svp(msg->connection->tx.p_obuf); tx_end_msg(msg, &empty); @@ -3065,7 +3070,7 @@ iproto_session_push(struct session *session, struct port *port) return -1; } iproto_reply_chunk(con->tx.p_obuf, &svp, iproto_session_sync(session), - ::schema_version); + box_schema_version()); tx_push(con, &svp); return 0; } diff --git a/src/box/lua/info.c b/src/box/lua/info.c index 7bb9f46b00..3b802fc3ff 100644 --- a/src/box/lua/info.c +++ b/src/box/lua/info.c @@ -694,6 +694,13 @@ lbox_schema_version(struct lua_State *L) return 1; } +static int +lbox_stmt_cache_version(struct lua_State *L) +{ + luaL_pushuint64(L, stmt_cache_schema_version()); + return 1; +} + static const struct luaL_Reg lbox_info_dynamic_meta[] = { {"id", lbox_info_id}, {"uuid", lbox_info_uuid}, @@ -716,6 +723,7 @@ static const struct luaL_Reg lbox_info_dynamic_meta[] = { {"election", lbox_info_election}, {"synchro", lbox_info_synchro}, {"schema_version", lbox_schema_version}, + {"statement_version", lbox_stmt_cache_version}, {NULL, NULL} }; diff --git a/src/box/schema.cc b/src/box/schema.cc index 0f57bab7d5..68c28caa4e 100644 --- a/src/box/schema.cc +++ b/src/box/schema.cc @@ -29,6 +29,7 @@ * SUCH DAMAGE. */ #include "schema.h" +#include "box.h" #include "sequence.h" #include "assoc.h" #include "alter.h" @@ -57,9 +58,18 @@ */ static struct mh_i32ptr_t *sequences; + +/** Schema version. */ +struct version { + /** Version of the schema used for iproto operations via netbox. */ + uint64_t box; + /** Version of the schema used for statement cache invalidation. */ + uint64_t stmt_cache; +}; + /** Public change counter. On its update clients need to fetch * new space data from the instance. */ -uint64_t schema_version = 0; +struct version schema_version = {0, 0}; /** Persistent version of the schema, stored in _schema["version"]. */ uint32_t dd_version_id = 0; @@ -77,7 +87,26 @@ struct entity_access entity_access; API_EXPORT uint64_t box_schema_version(void) { - return schema_version; + return schema_version.box; +} + +void +box_bump_schema_version(void) +{ + schema_version.box++; + box_broadcast_schema(); +} + +uint64_t +stmt_cache_schema_version(void) +{ + return schema_version.stmt_cache; +} + +void +stmt_cache_bump_schema_version(void) +{ + schema_version.stmt_cache++; } uint32_t diff --git a/src/box/schema.h b/src/box/schema.h index 64bf799ccf..0e5ebf8bde 100644 --- a/src/box/schema.h +++ b/src/box/schema.h @@ -46,7 +46,7 @@ struct func; /** * See `box_schema_version`. */ -extern uint64_t schema_version; +extern struct version schema_version; extern uint32_t dd_version_id; /** Triggers invoked after schema initialization. */ @@ -66,11 +66,33 @@ dd_check_is_disabled(void); /** * Returns the current version of the database schema, an unsigned number * that goes up when there is a major change in the schema, i.e., on DDL - * operations (\sa IPROTO_SCHEMA_VERSION). + * operations that should be propagated to the netbox clients via IPROTO + * (\sa IPROTO_SCHEMA_VERSION). */ API_EXPORT uint64_t box_schema_version(void); +/** + * Bump the schema version for the netbox clients. + */ +void +box_bump_schema_version(void); + +/** + * Returns the current version of the database schema, an unsigned number + * that goes up on every schema change. Current schema version is used to + * invalidate prepared statements in the statement cache if there was a + * schema change. + */ +uint64_t +stmt_cache_schema_version(void); + +/** + * Bump the schema version for the statement cache. + */ +void +stmt_cache_bump_schema_version(void); + /** \endcond public */ /** Return current persistent schema version. */ diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c index a6561a3792..746543f0c6 100644 --- a/src/box/sql/vdbe.c +++ b/src/box/sql/vdbe.c @@ -2379,7 +2379,7 @@ case OP_TTransaction: { */ case OP_IteratorOpen: { struct VdbeCursor *cur = p->apCsr[pOp->p1]; - if (box_schema_version() != p->schema_ver && + if (stmt_cache_schema_version() != p->schema_ver && (pOp->p5 & OPFLAG_SYSTEMSP) == 0) { p->expired = 1; diag_set(ClientError, ER_SQL_EXECUTE, "schema version has "\ diff --git a/src/box/sql/vdbeaux.c b/src/box/sql/vdbeaux.c index ccc98ea038..1bb9576814 100644 --- a/src/box/sql/vdbeaux.c +++ b/src/box/sql/vdbeaux.c @@ -67,7 +67,7 @@ sqlVdbeCreate(Parse * pParse) db->pVdbe = p; p->magic = VDBE_MAGIC_INIT; p->pParse = pParse; - p->schema_ver = box_schema_version(); + p->schema_ver = stmt_cache_schema_version(); assert(pParse->aLabel == 0); assert(pParse->nLabel == 0); assert(pParse->nOpAlloc == 0); diff --git a/test/box-luatest/fully-temporary_spaces_test.lua b/test/box-luatest/fully-temporary_spaces_test.lua index d1e1b1091d..3da6cb9c27 100644 --- a/test/box-luatest/fully-temporary_spaces_test.lua +++ b/test/box-luatest/fully-temporary_spaces_test.lua @@ -155,6 +155,41 @@ g.test_temporary_create = function() end) end +-- check that temporary spaces don't update box schema version +-- but still update the statement one. +g.test_schema_version = function() + g.server:exec(function() + local box_version = box.info.schema_version + local stmt_cache_version = box.info.statement_version + local op = 0 + local s = box.schema.space.create('temp', { type = 'temporary' }) + op = op + 1 + t.assert_equals(box.info.schema_version, box_version) + t.assert_equals(box.info.statement_version, stmt_cache_version + op) + + s:format {{'a', 'unsigned'}} + op = op + 1 + t.assert_equals(box.info.schema_version, box_version) + t.assert_equals(box.info.statement_version, stmt_cache_version + op) + + s:create_index('pk') + op = op + 1 + t.assert_equals(box.info.schema_version, box_version) + t.assert_equals(box.info.statement_version, stmt_cache_version + op) + + s:truncate() + -- truncate doesn't update statement version + t.assert_equals(box.info.schema_version, box_version) + t.assert_equals(box.info.statement_version, stmt_cache_version + op) + + s:drop() + -- drop the space with its primary key index + op = op + 2 + t.assert_equals(box.info.schema_version, box_version) + t.assert_equals(box.info.statement_version, stmt_cache_version + op) + end) +end + -- check which features aren't supported for temporary spaces g.test_temporary_dont_support = function() g.server:exec(function() diff --git a/test/box/info.result b/test/box/info.result index a7444cad39..0c7ad5bf4a 100644 --- a/test/box/info.result +++ b/test/box/info.result @@ -89,6 +89,7 @@ t - schema_version - signature - sql + - statement_version - status - synchro - uptime -- GitLab