diff --git a/src/box/box.cc b/src/box/box.cc index 67042f80506a2a2569392992e1f50a554650c8c5..acda44c04dcb418dff4a8d25f0d0c7bbd5da147e 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -231,10 +231,22 @@ wal_stream_create(struct wal_stream *ctx, size_t rows_per_wal) static void apply_join_row(struct xstream *stream, struct xrow_header *row) { - /* TODO: a temporary workaround for broken sophia JOIN #1134 */ - row->lsn = vclock_get(&recovery->vclock, 0) + 1; - row->server_id = 0; - apply_row(stream, row); + if (row->type != IPROTO_INSERT) { + tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, + (uint32_t) row->type); + } + + (void) stream; + struct request *request; + request = region_alloc_object_xc(&fiber()->gc, struct request); + request_create(request, row->type); + assert(row->bodycnt == 1); /* always 1 for read */ + request_decode(request, (const char *) row->body[0].iov_base, + row->body[0].iov_len); + request->header = row; + struct space *space = space_cache_find(request->space_id); + /* no access checks here - applier always works with admin privs */ + space->handler->applySnapshotRow(space, request); } static void diff --git a/src/box/engine.cc b/src/box/engine.cc index 02504e8d8bb3f1133c1f2ad3e67aed435d7c74a0..de4ce16e424d4a3c70812f0bc52ba1ac6d777c85 100644 --- a/src/box/engine.cc +++ b/src/box/engine.cc @@ -157,6 +157,13 @@ Handler::Handler(Engine *f) { } +void +Handler::applySnapshotRow(struct space *, struct request *) +{ + tnt_raise(ClientError, ER_UNSUPPORTED, engine->name, + "applySnapshotRow"); +} + struct tuple * Handler::executeReplace(struct txn *, struct space *, struct request *) diff --git a/src/box/engine.h b/src/box/engine.h index da578c3642147e8b805d653fde173e9d2cc78728..0e34d0b6369f8c69fe90649a4259299d9e6718ea 100644 --- a/src/box/engine.h +++ b/src/box/engine.h @@ -44,10 +44,6 @@ enum engine_flags { extern struct rlist engines; -typedef void -(*engine_replace_f)(struct txn *txn, struct space *, - struct tuple *, struct tuple *, enum dup_replace_mode); - class Handler; /** Engine instance */ @@ -181,6 +177,8 @@ class Handler { Handler(const Handler &) = delete; Handler& operator=(const Handler&) = delete; + virtual void + applySnapshotRow(struct space *space, struct request *); virtual struct tuple * executeReplace(struct txn *, struct space *, struct request *); diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc index 474ca6b663025053d75759a3c3c728641b843411..0dd8bc141b0d6f6fd8c3b447e1aa1b8b72f131a9 100644 --- a/src/box/memtx_engine.cc +++ b/src/box/memtx_engine.cc @@ -92,8 +92,8 @@ enum { * A version of space_replace for a space which has * no indexes (is not yet fully built). */ -static void -memtx_replace_no_keys(struct txn * /* txn */, struct space *space, +static struct tuple * +memtx_replace_no_keys(struct space *space, struct tuple * /* old_tuple */, struct tuple * /* new_tuple */, enum dup_replace_mode /* mode */) @@ -101,8 +101,14 @@ memtx_replace_no_keys(struct txn * /* txn */, struct space *space, Index *index = index_find(space, 0); assert(index == NULL); /* not reached. */ (void) index; + return NULL; } +typedef struct tuple * +(*engine_replace_f)(struct space *, struct tuple *, struct tuple *, + enum dup_replace_mode); + + struct MemtxSpace: public Handler { MemtxSpace(Engine *e) : Handler(e) @@ -114,6 +120,8 @@ struct MemtxSpace: public Handler { /* do nothing */ /* engine->close(this); */ } + virtual void + applySnapshotRow(struct space *space, struct request *request); virtual struct tuple * executeReplace(struct txn *txn, struct space *space, struct request *request); @@ -227,6 +235,57 @@ dup_replace_mode(uint32_t op) return op == IPROTO_INSERT ? DUP_INSERT : DUP_REPLACE_OR_INSERT; } +/** + * Do the plumbing necessary for correct statement-level + * and transaction rollback. + */ +static inline void +memtx_txn_add_undo(struct txn *txn, struct tuple *old_tuple, + struct tuple *new_tuple) +{ + /* + * Remember the old tuple only if we replaced it + * successfully, to not remove a tuple inserted by + * another transaction in rollback(). + */ + struct txn_stmt *stmt = txn_current_stmt(txn); + assert(stmt->space); + stmt->old_tuple = old_tuple; + stmt->new_tuple = new_tuple; +} + +void +MemtxSpace::applySnapshotRow(struct space *space, struct request *request) +{ + assert(request->type == IPROTO_INSERT); + struct tuple *new_tuple = tuple_new(space->format, request->tuple, + request->tuple_end); + /* GC the new tuple if there is an exception below. */ + TupleRef ref(new_tuple); + space_validate_tuple(space, new_tuple); + if (!rlist_empty(&space->on_replace)) { + /* + * Emulate transactions for system spaces with triggers + */ + assert(in_txn() == NULL); + request->header->server_id = 0; + struct txn *txn = txn_begin_stmt(space); + try { + struct tuple *old_tuple = this->replace(space, NULL, + new_tuple, DUP_INSERT); + memtx_txn_add_undo(txn, old_tuple, new_tuple); + txn_commit_stmt(txn, request); + } catch (Exception *e) { + say_error("rollback: %s", e->errmsg); + txn_rollback_stmt(); + throw; + } + return; + } + this->replace(space, NULL, new_tuple, DUP_INSERT); + /** The new tuple is referenced by the primary key. */ +} + struct tuple * MemtxSpace::executeReplace(struct txn *txn, struct space *space, struct request *request) @@ -237,7 +296,8 @@ MemtxSpace::executeReplace(struct txn *txn, struct space *space, TupleRef ref(new_tuple); space_validate_tuple(space, new_tuple); enum dup_replace_mode mode = dup_replace_mode(request->type); - this->replace(txn, space, NULL, new_tuple, mode); + struct tuple *old_tuple = this->replace(space, NULL, new_tuple, mode); + memtx_txn_add_undo(txn, old_tuple, new_tuple); /** The new tuple is referenced by the primary key. */ return new_tuple; } @@ -255,7 +315,8 @@ MemtxSpace::executeDelete(struct txn *txn, struct space *space, if (old_tuple == NULL) return NULL; - this->replace(txn, space, old_tuple, NULL, DUP_REPLACE_OR_INSERT); + this->replace(space, old_tuple, NULL, DUP_REPLACE_OR_INSERT); + memtx_txn_add_undo(txn, old_tuple, NULL); return old_tuple; } @@ -282,7 +343,8 @@ MemtxSpace::executeUpdate(struct txn *txn, struct space *space, request->index_base); TupleRef ref(new_tuple); space_validate_tuple(space, new_tuple); - this->replace(txn, space, old_tuple, new_tuple, DUP_REPLACE); + this->replace(space, old_tuple, new_tuple, DUP_REPLACE); + memtx_txn_add_undo(txn, old_tuple, new_tuple); return new_tuple; } @@ -334,7 +396,8 @@ MemtxSpace::executeUpsert(struct txn *txn, struct space *space, request->tuple, request->tuple_end); TupleRef ref(new_tuple); /* useless, for unified approach */ - replace(txn, space, NULL, new_tuple, DUP_INSERT); + old_tuple = replace(space, NULL, new_tuple, DUP_INSERT); + memtx_txn_add_undo(txn, old_tuple, new_tuple); } else { /** * Update the tuple. @@ -354,7 +417,8 @@ MemtxSpace::executeUpsert(struct txn *txn, struct space *space, */ try { space_validate_tuple(space, new_tuple); - replace(txn, space, old_tuple, new_tuple, DUP_REPLACE); + replace(space, old_tuple, new_tuple, DUP_REPLACE); + memtx_txn_add_undo(txn, old_tuple, new_tuple); } catch (ClientError *e) { say_error("UPSERT failed:"); e->log(); @@ -410,31 +474,12 @@ txn_on_yield_or_stop(struct trigger * /* trigger */, void * /* event */) txn_rollback(); /* doesn't throw */ } -/** - * Do the plumbing necessary for correct statement-level - * and transaction rollback. - */ -static void -memtx_txn_add_undo(struct txn *txn, struct tuple *old_tuple, - struct tuple *new_tuple) -{ - /* - * Remember the old tuple only if we replaced it - * successfully, to not remove a tuple inserted by - * another transaction in rollback(). - */ - struct txn_stmt *stmt = txn_current_stmt(txn); - assert(stmt->space); - stmt->old_tuple = old_tuple; - stmt->new_tuple = new_tuple; -} - /** * A short-cut version of replace() used during bulk load * from snapshot. */ -void -memtx_replace_build_next(struct txn * /* txn */, struct space *space, +static struct tuple * +memtx_replace_build_next(struct space *space, struct tuple *old_tuple, struct tuple *new_tuple, enum dup_replace_mode mode) { @@ -452,27 +497,26 @@ memtx_replace_build_next(struct txn * /* txn */, struct space *space, } ((MemtxIndex *) space->index[0])->buildNext(new_tuple); tuple_ref(new_tuple); + return NULL; } /** * A short-cut version of replace() used when loading * data from XLOG files. */ -void -memtx_replace_primary_key(struct txn *txn, struct space *space, - struct tuple *old_tuple, struct tuple *new_tuple, - enum dup_replace_mode mode) +static struct tuple * +memtx_replace_primary_key(struct space *space, struct tuple *old_tuple, + struct tuple *new_tuple, enum dup_replace_mode mode) { old_tuple = space->index[0]->replace(old_tuple, new_tuple, mode); if (new_tuple) tuple_ref(new_tuple); - memtx_txn_add_undo(txn, old_tuple, new_tuple); + return old_tuple; } -static void -memtx_replace_all_keys(struct txn *txn, struct space *space, - struct tuple *old_tuple, struct tuple *new_tuple, - enum dup_replace_mode mode) +static struct tuple * +memtx_replace_all_keys(struct space *space, struct tuple *old_tuple, + struct tuple *new_tuple, enum dup_replace_mode mode) { /* * Ensure we have enough slack memory to guarantee @@ -509,7 +553,7 @@ memtx_replace_all_keys(struct txn *txn, struct space *space, } if (new_tuple) tuple_ref(new_tuple); - memtx_txn_add_undo(txn, old_tuple, new_tuple); + return old_tuple; } static void @@ -610,14 +654,8 @@ MemtxEngine::recoverSnapshotRow(struct xrow_header *row) /* memtx snapshot must contain only memtx spaces */ if (space->handler->engine != this) tnt_raise(ClientError, ER_CROSS_ENGINE_TRANSACTION); - struct txn *txn = txn_begin_stmt(space); - try { - space->handler->executeReplace(txn, space, request); - txn_commit_stmt(txn, request); - } catch (Exception *e) { - txn_rollback_stmt(); - throw; - } + /* no access checks here - applier always works with admin privs */ + space->handler->applySnapshotRow(space, request); } /** Called at start to tell memtx to recover to a given LSN. */ diff --git a/src/box/sophia_engine.cc b/src/box/sophia_engine.cc index 194f08e8813ce328f9a827a1c3e21f4bf0202f2a..709798ccb928e9e154d5fa64db9f1906ee27832f 100644 --- a/src/box/sophia_engine.cc +++ b/src/box/sophia_engine.cc @@ -187,6 +187,8 @@ sophia_read(void *dest, void *key) struct SophiaSpace: public Handler { SophiaSpace(Engine*); + virtual void + applySnapshotRow(struct space *space, struct request *request); virtual struct tuple * executeReplace(struct txn*, struct space *space, struct request *request); @@ -201,6 +203,58 @@ struct SophiaSpace: public Handler { struct request *request); }; +void +SophiaSpace::applySnapshotRow(struct space *space, struct request *request) +{ + assert(request->type == IPROTO_INSERT); + SophiaIndex *index = (SophiaIndex *)index_find(space, 0); + + space_validate_tuple_raw(space, request->tuple); + int size = request->tuple_end - request->tuple; + const char *key = tuple_field_raw(request->tuple, size, + index->key_def->parts[0].fieldno); + primary_key_validate(index->key_def, key, index->key_def->part_count); + + const char *value; + void *obj = index->createDocument(key, &value); + size_t valuesize = size - (value - request->tuple); + if (valuesize > 0) + sp_setstring(obj, "value", value, valuesize); + + assert(request->header != NULL); + + void *tx = sp_begin(index->env); + if (tx == NULL) { + sp_destroy(obj); + sophia_error(index->env); + } + +#if 0 + int64_t signature = request->header->lsn; + sp_setint(tx, "lsn", signature); +#endif + if (sp_set(tx, obj) != 0) + sophia_error(index->env); /* obj destroyed by sp_set() */ + + int rc = sp_commit(tx); + switch (rc) { + case 0: + return; + case 1: /* rollback */ + return; + case 2: /* lock */ + sp_destroy(tx); + /* must never happen during JOIN */ + tnt_raise(ClientError, ER_TRANSACTION_CONFLICT); + return; + case -1: + sophia_error(index->env); + return; + default: + assert(0); + } +} + struct tuple * SophiaSpace::executeReplace(struct txn *txn, struct space *space, struct request *request) diff --git a/src/box/sophia_index.h b/src/box/sophia_index.h index d8988166c72dec59d3cb8f80ec2c1c6fbd887b45..4c798f0a28674d747533621526bb7b960264f414 100644 --- a/src/box/sophia_index.h +++ b/src/box/sophia_index.h @@ -69,8 +69,8 @@ class SophiaIndex: public Index { void *env; void *db; -private: void *createDocument(const char *key, const char **keyend); +private: struct tuple_format *format; };