diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index b2a1fcfe6ae877ed59279e1675a830fbd0d3f24d..04a3e98f8f67944d7e3c0f30ecd15dd549d3a51b 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -44,6 +44,7 @@ add_library(box STATIC sysview_engine.cc sysview_index.cc sophia_engine.cc + sophia_space.cc sophia_index.cc space.cc func.cc diff --git a/src/box/sophia_engine.cc b/src/box/sophia_engine.cc index 8f732255ba78f2ab387d61903d4d60a13c0b142c..3705695e5609a6f53c3f0a999c6980ffc96315a7 100644 --- a/src/box/sophia_engine.cc +++ b/src/box/sophia_engine.cc @@ -28,11 +28,9 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ -#include "sophia_index.h" - -#include <sys/uio.h> /* struct iovec */ - #include "sophia_engine.h" +#include "sophia_index.h" +#include "sophia_space.h" #include "coeio.h" #include "coio.h" #include "cfg.h" @@ -47,17 +45,11 @@ #include "port.h" #include "request.h" #include "iproto_constants.h" -#include "small/rlist.h" #include "small/pmatomic.h" -#include <errinj.h> #include <sophia.h> #include <stdlib.h> #include <stdio.h> #include <string.h> -#include <sys/stat.h> -#include <sys/types.h> -#include <dirent.h> -#include <errno.h> struct cord *worker_pool; static int worker_pool_size; @@ -71,7 +63,7 @@ sophia_get_parts(struct key_def *key_def, void *obj, void *value, int valuesize, int size = 0; assert(key_def->part_count <= 8); static const char *PARTNAMES[] = { - "key", "key_1", "key_2", "key_3", + "key_0", "key_1", "key_2", "key_3", "key_4", "key_5", "key_6", "key_7" }; for (uint32_t i = 0; i < key_def->part_count; i++) { @@ -295,186 +287,9 @@ sophia_read(void *dest, void *key) return result; } -struct SophiaSpace: public Handler { - SophiaSpace(Engine*); - virtual void - applySnapshotRow(struct space *space, struct request *request); - virtual struct tuple * - executeReplace(struct txn*, struct space *space, - struct request *request); - virtual struct tuple * - executeDelete(struct txn*, struct space *space, - struct request *request); - virtual struct tuple * - executeUpdate(struct txn*, struct space *space, - struct request *request); - virtual void - executeUpsert(struct txn*, struct space *space, - struct request *request); -}; - -void -SophiaSpace::applySnapshotRow(struct space *space, struct request *request) -{ - assert(request->type == IPROTO_INSERT); - SophiaIndex *index = (SophiaIndex *)index_find(space, 0); - - space_validate_tuple_raw(space, request->tuple); - int size = request->tuple_end - request->tuple; - const char *key = tuple_field_raw(request->tuple, size, - index->key_def->parts[0].fieldno); - primary_key_validate(index->key_def, key, index->key_def->part_count); - - const char *value; - void *obj = index->createDocument(key, &value); - size_t valuesize = size - (value - request->tuple); - if (valuesize > 0) - sp_setstring(obj, "value", value, valuesize); - - assert(request->header != NULL); - - void *tx = sp_begin(index->env); - if (tx == NULL) { - sp_destroy(obj); - sophia_error(index->env); - } - - int64_t signature = request->header->lsn; - sp_setint(tx, "lsn", signature); - - if (sp_set(tx, obj) != 0) - sophia_error(index->env); /* obj destroyed by sp_set() */ - - int rc = sp_commit(tx); - switch (rc) { - case 0: - return; - case 1: /* rollback */ - return; - case 2: /* lock */ - sp_destroy(tx); - /* must never happen during JOIN */ - tnt_raise(ClientError, ER_TRANSACTION_CONFLICT); - return; - case -1: - sophia_error(index->env); - return; - default: - assert(0); - } -} - -struct tuple * -SophiaSpace::executeReplace(struct txn *txn, struct space *space, - struct request *request) -{ - (void) txn; - - SophiaIndex *index = (SophiaIndex *)index_find(space, 0); - - space_validate_tuple_raw(space, request->tuple); - - int size = request->tuple_end - request->tuple; - const char *key = - tuple_field_raw(request->tuple, size, - index->key_def->parts[0].fieldno); - primary_key_validate(index->key_def, key, index->key_def->part_count); - - /* Switch from INSERT to REPLACE during recovery. - * - * Database might hold newer key version than currenly - * recovered log record. - */ - enum dup_replace_mode mode = DUP_REPLACE_OR_INSERT; - if (request->type == IPROTO_INSERT) { - SophiaEngine *engine = (SophiaEngine *)space->handler->engine; - if (engine->recovery_complete) - mode = DUP_INSERT; - } - index->replace_or_insert(request->tuple, request->tuple_end, mode); - return NULL; -} - -struct tuple * -SophiaSpace::executeDelete(struct txn *txn, struct space *space, - struct request *request) -{ - (void) txn; - - SophiaIndex *index = (SophiaIndex *)index_find(space, request->index_id); - const char *key = request->key; - uint32_t part_count = mp_decode_array(&key); - primary_key_validate(index->key_def, key, part_count); - index->remove(key); - return NULL; -} - -struct tuple * -SophiaSpace::executeUpdate(struct txn *txn, struct space *space, - struct request *request) -{ - (void) txn; - - /* Try to find the tuple by unique key */ - SophiaIndex *index = (SophiaIndex *)index_find(space, request->index_id); - const char *key = request->key; - uint32_t part_count = mp_decode_array(&key); - primary_key_validate(index->key_def, key, part_count); - struct tuple *old_tuple = index->findByKey(key, part_count); - - if (old_tuple == NULL) - return NULL; - /* Sophia always yields a zero-ref tuple, GC it here. */ - TupleRef old_ref(old_tuple); - - /* Do tuple update */ - struct tuple *new_tuple = - tuple_update(space->format, - region_aligned_alloc_xc_cb, - &fiber()->gc, - old_tuple, request->tuple, - request->tuple_end, - request->index_base); - TupleRef ref(new_tuple); - - space_validate_tuple(space, new_tuple); - space_check_update(space, old_tuple, new_tuple); - - index->replace_or_insert(new_tuple->data, - new_tuple->data + new_tuple->bsize, - DUP_REPLACE); - return NULL; -} - -void -SophiaSpace::executeUpsert(struct txn *txn, struct space *space, - struct request *request) -{ - (void) txn; - SophiaIndex *index = (SophiaIndex *)index_find(space, request->index_id); - - /* Check field count in tuple */ - space_validate_tuple_raw(space, request->tuple); - /* Check tuple fields */ - tuple_validate_raw(space->format, request->tuple); - - index->upsert(request->ops, - request->ops_end, - request->tuple, - request->tuple_end, - request->index_base); -} - -SophiaSpace::SophiaSpace(Engine *e) - :Handler(e) -{ -} - SophiaEngine::SophiaEngine() :Engine("sophia") ,m_prev_commit_lsn(-1) - ,m_prev_checkpoint_lsn(-1) - ,m_checkpoint_lsn(-1) ,recovery_complete(0) { flags = 0; @@ -574,16 +389,13 @@ sophia_join_key_def(void *env, void *db) unsigned i = 0; while (i < count) { char path[64]; - int len = snprintf(path, sizeof(path), "db.%d.index.key", id); - if (i > 0) { - snprintf(path + len, sizeof(path) - len, "_%d", i); - } + snprintf(path, sizeof(path), "db.%d.scheme.key_%d", id, i); char *type = (char *)sp_getstring(env, path, NULL); assert(type != NULL); - if (strcmp(type, "string") == 0) + if (strncmp(type, "string", 6) == 0) key_def->parts[i].type = STRING; else - if (strcmp(type, "u64") == 0) + if (strncmp(type, "u64", 3) == 0) key_def->parts[i].type = NUM; free(type); key_def->parts[i].fieldno = i; @@ -683,51 +495,46 @@ SophiaEngine::dropIndex(Index *index) void SophiaEngine::keydefCheck(struct space *space, struct key_def *key_def) { - switch (key_def->type) { - case TREE: { - if (! key_def->opts.is_unique) { - tnt_raise(ClientError, ER_MODIFY_INDEX, + if (key_def->type != TREE) { + tnt_raise(ClientError, ER_INDEX_TYPE, + key_def->name, + space_name(space)); + } + if (! key_def->opts.is_unique) { + tnt_raise(ClientError, ER_MODIFY_INDEX, + key_def->name, + space_name(space), + "Sophia TREE index must be unique"); + } + if (key_def->iid != 0) { + tnt_raise(ClientError, ER_MODIFY_INDEX, + key_def->name, + space_name(space), + "Sophia TREE secondary indexes are not supported"); + } + const uint32_t keypart_limit = 8; + if (key_def->part_count > keypart_limit) { + tnt_raise(ClientError, ER_MODIFY_INDEX, key_def->name, space_name(space), - "Sophia TREE index must be unique"); - } - if (key_def->iid != 0) { + "Sophia TREE index too many key-parts (8 max)"); + } + unsigned i = 0; + while (i < key_def->part_count) { + struct key_part *part = &key_def->parts[i]; + if (part->type != NUM && part->type != STRING) { tnt_raise(ClientError, ER_MODIFY_INDEX, - key_def->name, - space_name(space), - "Sophia TREE secondary indexes are not supported"); + key_def->name, + space_name(space), + "Sophia TREE index field type must be STR or NUM"); } - const uint32_t keypart_limit = 8; - if (key_def->part_count > keypart_limit) { + if (part->fieldno != i) { tnt_raise(ClientError, ER_MODIFY_INDEX, - key_def->name, - space_name(space), - "Sophia TREE index too many key-parts (8 max)"); + key_def->name, + space_name(space), + "Sophia TREE key-parts must follow first and cannot be sparse"); } - unsigned i = 0; - while (i < key_def->part_count) { - struct key_part *part = &key_def->parts[i]; - if (part->type != NUM && part->type != STRING) { - tnt_raise(ClientError, ER_MODIFY_INDEX, - key_def->name, - space_name(space), - "Sophia TREE index field type must be STR or NUM"); - } - if (part->fieldno != i) { - tnt_raise(ClientError, ER_MODIFY_INDEX, - key_def->name, - space_name(space), - "Sophia TREE key-parts must follow first and cannot be sparse"); - } - i++; - } - break; - } - default: - tnt_raise(ClientError, ER_INDEX_TYPE, - key_def->name, - space_name(space)); - break; + i++; } } @@ -796,12 +603,6 @@ SophiaEngine::commit(struct txn *txn, int64_t signature) txn->engine_tx = NULL; } -void -SophiaEngine::rollbackStatement(struct txn_stmt* /* stmt */) -{ - say_info("SophiaEngine::rollbackStatement()"); -} - void SophiaEngine::rollback(struct txn *txn) { @@ -811,11 +612,6 @@ SophiaEngine::rollback(struct txn *txn) } } -void -SophiaEngine::beginJoin() -{ -} - void SophiaEngine::beginWalRecovery() { @@ -824,13 +620,6 @@ SophiaEngine::beginWalRecovery() sophia_error(env); } -void -SophiaEngine::recoverToCheckpoint(int64_t lsn) -{ - /* do nothing except saving the latest snapshot lsn */ - m_prev_checkpoint_lsn = lsn; -} - int SophiaEngine::beginCheckpoint() { @@ -846,10 +635,8 @@ SophiaEngine::beginCheckpoint() } int -SophiaEngine::waitCheckpoint(struct vclock *vclock) +SophiaEngine::waitCheckpoint(struct vclock*) { - (void)vclock; - if (! worker_pool_run) return 0; for (;;) { @@ -860,17 +647,3 @@ SophiaEngine::waitCheckpoint(struct vclock *vclock) } return 0; } - -void -SophiaEngine::commitCheckpoint() -{ - m_prev_checkpoint_lsn = m_checkpoint_lsn; - m_checkpoint_lsn = -1; -} - -void -SophiaEngine::abortCheckpoint() -{ - if (m_checkpoint_lsn >= 0) - m_checkpoint_lsn = -1; -} diff --git a/src/box/sophia_engine.h b/src/box/sophia_engine.h index 62013bbe035018c63bb57799ebc63f0fae7bef65..5beea79d1a4ca0fb2dfce6323d66b5f418a6cd01 100644 --- a/src/box/sophia_engine.h +++ b/src/box/sophia_engine.h @@ -46,29 +46,22 @@ struct SophiaEngine: public Engine { virtual void begin(struct txn *txn) override; virtual void prepare(struct txn *txn) override; virtual void commit(struct txn *txn, int64_t signature) override; - virtual void rollbackStatement(struct txn_stmt *stmt) override; virtual void rollback(struct txn *txn) override; - virtual void beginJoin() override; virtual void beginWalRecovery() override; - virtual void recoverToCheckpoint(int64_t) override; virtual void endRecovery() override; virtual void join(struct xstream *stream) override; virtual int beginCheckpoint() override; virtual int waitCheckpoint(struct vclock *vclock) override; - virtual void commitCheckpoint() override; - virtual void abortCheckpoint() override; - void *env; private: int64_t m_prev_commit_lsn; - int64_t m_prev_checkpoint_lsn; - int64_t m_checkpoint_lsn; public: + void *env; int recovery_complete; }; extern "C" { typedef void (*sophia_info_f)(const char*, const char*, void*); -int sophia_info(const char*, sophia_info_f, void*); +int sophia_info(const char*, sophia_info_f, void*); } void sophia_error(void*); void *sophia_read(void*, void*); @@ -76,6 +69,6 @@ void sophia_workers_start(void*); struct tuple * sophia_tuple_new(void *obj, struct key_def *key_def, - struct tuple_format *format); + struct tuple_format *format); #endif /* TARANTOOL_BOX_SOPHIA_ENGINE_H_INCLUDED */ diff --git a/src/box/sophia_index.cc b/src/box/sophia_index.cc index bb7af685794a30a2a95ee556e5be2985e0fbbd56..ba1575f6daf212033dab69acbb9502382bb8c6aa 100644 --- a/src/box/sophia_index.cc +++ b/src/box/sophia_index.cc @@ -28,20 +28,17 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ -#include "sophia_index.h" - #include "sophia_engine.h" +#include "sophia_space.h" +#include "sophia_index.h" #include "say.h" #include "tuple.h" #include "tuple_update.h" #include "scoped_guard.h" -#include "errinj.h" #include "schema.h" #include "space.h" #include "txn.h" #include "cfg.h" -#include <sys/stat.h> -#include <sys/types.h> #include <sophia.h> #include <stdio.h> #include <inttypes.h> @@ -62,9 +59,7 @@ SophiaIndex::createDocument(const char *key, const char **keyend) uint32_t i = 0; while (i < key_def->part_count) { char partname[32]; - int len = snprintf(partname, sizeof(partname), "key"); - if (i > 0) - snprintf(partname + len, sizeof(partname) - len, "_%d", i); + snprintf(partname, sizeof(partname), "key_%d", i); const char *part; uint32_t partsize; if (key_def->parts[i].type == STRING) { @@ -86,13 +81,6 @@ SophiaIndex::createDocument(const char *key, const char **keyend) return obj; } -static int -sophia_upsert_callback(char **result, - char **key, int *key_size, int key_count, - char *src, int src_size, - char *upsert, int upsert_size, - void *arg); - static inline void* sophia_configure(struct space *space, struct key_def *key_def) { @@ -111,37 +99,36 @@ sophia_configure(struct space *space, struct key_def *key_def) uint32_t i = 0; while (i < key_def->part_count) { - char *type; - if (key_def->parts[i].type == NUM) - type = (char *)"u64"; - else - type = (char *)"string"; + /* create key field */ char part[32]; - if (i == 0) { - snprintf(part, sizeof(part), "key"); - } else { - /* create key-part */ - snprintf(path, sizeof(path), "db.%" PRIu32 ".index", - key_def->space_id); - snprintf(part, sizeof(part), "key_%d", i); - sp_setstring(env, path, part, 0); - } - /* set key-part type */ - snprintf(path, sizeof(path), "db.%" PRIu32 ".index.%s", + snprintf(path, sizeof(path), "db.%" PRIu32 ".scheme", + key_def->space_id); + snprintf(part, sizeof(part), "key_%d", i); + sp_setstring(env, path, part, 0); + /* set fields type */ + char type[32]; + snprintf(type, sizeof(type), "%s,key(%d)", + (key_def->parts[i].type == NUM ? "u64" : "string"), + i); + snprintf(path, sizeof(path), "db.%" PRIu32 ".scheme.%s", key_def->space_id, part); sp_setstring(env, path, type, 0); i++; } + /* value field */ + snprintf(path, sizeof(path), "db.%" PRIu32 ".scheme", + key_def->space_id); + sp_setstring(env, path, "value", 0); + /* db.path */ if (key_def->opts.path[0] != '\0') { snprintf(path, sizeof(path), "db.%" PRIu32 ".path", key_def->space_id); sp_setstring(env, path, key_def->opts.path, 0); } /* db.upsert */ - snprintf(path, sizeof(path), "db.%" PRIu32 ".index.upsert", key_def->space_id); - sp_setstring(env, path, (const void *)(uintptr_t)sophia_upsert_callback, 0); - /* db.upsert_arg */ - snprintf(path, sizeof(path), "db.%" PRIu32 ".index.upsert_arg", key_def->space_id); + snprintf(path, sizeof(path), "db.%" PRIu32 ".upsert", key_def->space_id); + sp_setstring(env, path, (const void *)(uintptr_t)sophia_upsert_cb, 0); + snprintf(path, sizeof(path), "db.%" PRIu32 ".upsert_arg", key_def->space_id); sp_setstring(env, path, (const void *)key_def, 0); /* db.compression */ if (key_def->opts.compression[0] != '\0') { @@ -265,12 +252,17 @@ SophiaIndex::findByKey(const char *key, uint32_t part_count = 0) const /* try to read from cache first, if nothing is found * retry using disk */ sp_setint(obj, "cache_only", 1); - sp_setint(obj, "immutable", 1); + int rc; + rc = sp_open(obj); + if (rc == -1) { + sp_destroy(obj); + sophia_error(env); + } void *result = sp_get(transaction, obj); - sp_setint(obj, "immutable", 0); if (result == NULL) { sp_setint(obj, "cache_only", 0); result = sophia_read(transaction, obj); + sp_destroy(obj); if (result == NULL) return NULL; } else { @@ -286,255 +278,12 @@ SophiaIndex::replace(struct tuple*, struct tuple*, enum dup_replace_mode) { /* This method is unused by sophia index. * - * see ::replace_or_insert() */ + * see: sophia_space.cc + */ assert(0); return NULL; } -struct sophia_mempool { - void *chunks[128]; - int count; -}; - -static inline void -sophia_mempool_init(sophia_mempool *p) -{ - memset(p->chunks, 0, sizeof(p->chunks)); - p->count = 0; -} - -static inline void -sophia_mempool_free(sophia_mempool *p) -{ - int i = 0; - while (i < p->count) { - free(p->chunks[i]); - i++; - } -} - -static void * -sophia_update_alloc(void *arg, size_t size) -{ - /* simulate region allocator for use with - * tuple_upsert_execute() */ - struct sophia_mempool *p = (struct sophia_mempool*)arg; - assert(p->count < 128); - void *ptr = malloc(size); - p->chunks[p->count++] = ptr; - return ptr; -} - -static inline int -sophia_upsert_mp(char **tuple, int *tuple_size_key, struct key_def *key_def, - char **key, int *key_size, - char *src, int src_size) -{ - /* calculate msgpack size */ - uint32_t mp_keysize = 0; - uint32_t i = 0; - while (i < key_def->part_count) { - if (key_def->parts[i].type == STRING) - mp_keysize += mp_sizeof_str(key_size[i]); - else - mp_keysize += mp_sizeof_uint(load_u64(key[i])); - i++; - } - *tuple_size_key = mp_keysize; - - /* count fields */ - int count = key_def->part_count; - const char *p = src; - while (p < (src + src_size)) { - count++; - mp_next((const char **)&p); - } - - /* allocate and encode tuple */ - int mp_size = mp_sizeof_array(count) + - mp_keysize + src_size; - char *mp = (char *)malloc(mp_size); - char *mp_ptr = mp; - if (mp == NULL) - return -1; - mp_ptr = mp_encode_array(mp_ptr, count); - i = 0; - while (i < key_def->part_count) { - if (key_def->parts[i].type == STRING) - mp_ptr = mp_encode_str(mp_ptr, key[i], key_size[i]); - else - mp_ptr = mp_encode_uint(mp_ptr, load_u64(key[i])); - i++; - } - memcpy(mp_ptr, src, src_size); - - *tuple = mp; - return mp_size; -} - -static inline int -sophia_upsert(char **result, - char *tuple, int tuple_size, int tuple_size_key, - char *upsert, int upsert_size) -{ - char *p = upsert; - uint8_t index_base = *(uint8_t *)p; - p += sizeof(uint8_t); - uint32_t default_tuple_size = *(uint32_t *)p; - p += sizeof(uint32_t); - p += default_tuple_size; - char *expr = p; - char *expr_end = upsert + upsert_size; - const char *up; - uint32_t up_size; - /* emit upsert */ - struct sophia_mempool alloc; - sophia_mempool_init(&alloc); - try { - up = tuple_upsert_execute(sophia_update_alloc, &alloc, - expr, - expr_end, - tuple, - tuple + tuple_size, - &up_size, index_base); - } catch (Exception *e) { - sophia_mempool_free(&alloc); - return -1; - } - - /* skip array size and key */ - const char *ptr = up; - mp_decode_array(&ptr); - ptr += tuple_size_key; - - /* get new value */ - int size = (int)((up + up_size) - ptr); - *result = (char *)malloc(size); - if (! *result) { - sophia_mempool_free(&alloc); - return -1; - } - memcpy(*result, ptr, size); - sophia_mempool_free(&alloc); - return size; -} - -static int -sophia_upsert_callback(char **result, - char **key, int *key_size, int /* key_count */, - char *src, int src_size, - char *upsert, int upsert_size, - void *arg) -{ - /* use default tuple value */ - if (src == NULL) { - char *p = upsert; - p += sizeof(uint8_t); /* index base */ - uint32_t value_size = *(uint32_t *)p; - p += sizeof(uint32_t); - *result = (char *)malloc(value_size); - if (! *result) - return -1; - memcpy(*result, p, value_size); - return value_size; - } - struct key_def *key_def = (struct key_def *)arg; - /* convert to msgpack */ - char *tuple; - int tuple_size_key; - int tuple_size; - tuple_size = sophia_upsert_mp(&tuple, &tuple_size_key, - key_def, key, key_size, - src, src_size); - if (tuple_size == -1) - return -1; - /* execute upsert */ - int size; - size = sophia_upsert(result, - tuple, tuple_size, tuple_size_key, - upsert, - upsert_size); - free(tuple); - return size; -} - -void -SophiaIndex::upsert(const char *expr, - const char *expr_end, - const char *tuple, - const char *tuple_end, - uint8_t index_base) -{ - mp_decode_array(&tuple); - uint32_t expr_size = expr_end - expr; - uint32_t tuple_size = tuple_end - tuple; - uint32_t tuple_value_size; - const char *tuple_value; - void *obj = createDocument(tuple, &tuple_value); - tuple_value_size = tuple_size - (tuple_value - tuple); - uint32_t value_size = - sizeof(uint8_t) + sizeof(uint32_t) + tuple_value_size + expr_size; - char *value = (char *)malloc(value_size); - if (value == NULL) { - } - char *p = value; - memcpy(p, &index_base, sizeof(uint8_t)); - p += sizeof(uint8_t); - memcpy(p, &tuple_value_size, sizeof(uint32_t)); - p += sizeof(uint32_t); - memcpy(p, tuple_value, tuple_value_size); - p += tuple_value_size; - memcpy(p, expr, expr_size); - sp_setstring(obj, "value", value, value_size); - void *transaction = in_txn()->engine_tx; - int rc = sp_upsert(transaction, obj); - free(value); - if (rc == -1) - sophia_error(env); -} - -void -SophiaIndex::replace_or_insert(const char *tuple, - const char *tuple_end, - enum dup_replace_mode mode) -{ - uint32_t size = tuple_end - tuple; - const char *key = tuple_field_raw(tuple, size, key_def->parts[0].fieldno); - /* insert: ensure key does not exists */ - if (mode == DUP_INSERT) { - struct tuple *found = findByKey(key); - if (found) { - tuple_delete(found); - struct space *sp = space_cache_find(key_def->space_id); - tnt_raise(ClientError, ER_TUPLE_FOUND, - index_name(this), space_name(sp)); - } - } - - /* replace */ - void *transaction = in_txn()->engine_tx; - const char *value; - size_t valuesize; - void *obj = createDocument(key, &value); - valuesize = size - (value - tuple); - if (valuesize > 0) - sp_setstring(obj, "value", value, valuesize); - int rc; - rc = sp_set(transaction, obj); - if (rc == -1) - sophia_error(env); -} - -void -SophiaIndex::remove(const char *key) -{ - void *obj = createDocument(key, NULL); - void *transaction = in_txn()->engine_tx; - int rc = sp_delete(transaction, obj); - if (rc == -1) - sophia_error(env); -} - struct sophia_iterator { struct iterator base; const char *key; @@ -569,16 +318,6 @@ sophia_iterator_last(struct iterator *ptr __attribute__((unused))) return NULL; } -static inline void -sophia_iterator_mode(struct sophia_iterator *it, - bool cache_only, - bool immutable) -{ - void *obj = it->current; - sp_setint(obj, "cache_only", cache_only); - sp_setint(obj, "immutable", immutable); -} - struct tuple * sophia_iterator_next(struct iterator *ptr) { @@ -589,29 +328,29 @@ sophia_iterator_next(struct iterator *ptr) void *obj; obj = sp_get(it->cursor, it->current); if (likely(obj != NULL)) { - sp_setint(it->current, "immutable", 0); sp_destroy(it->current); it->current = obj; - sp_setint(it->current, "immutable", 1); return sophia_tuple_new(obj, it->key_def, it->space->format); } /* switch to asynchronous mode (read from disk) */ - sophia_iterator_mode(it, false, false); + sp_setint(it->current, "cache_only", 0); obj = sophia_read(it->cursor, it->current); if (obj == NULL) { ptr->next = sophia_iterator_last; - it->current = NULL; /* immediately close the cursor */ sp_destroy(it->cursor); + sp_destroy(it->current); + it->current = NULL; it->cursor = NULL; return NULL; } + sp_destroy(it->current); it->current = obj; /* switch back to synchronous mode */ - sophia_iterator_mode(it, true, true); + sp_setint(obj, "cache_only", 1); return sophia_tuple_new(obj, it->key_def, it->space->format); } @@ -655,8 +394,7 @@ SophiaIndex::initIterator(struct iterator *ptr, assert(it->cursor == NULL); if (part_count > 0) { if (part_count != key_def->part_count) { - tnt_raise(UnsupportedIndexFeature, this, - "partial keys"); + tnt_raise(UnsupportedIndexFeature, this, "partial keys"); } } else { key = NULL; @@ -706,6 +444,6 @@ SophiaIndex::initIterator(struct iterator *ptr, } it->current = obj; /* switch to sync mode (cache read) */ - sophia_iterator_mode(it, true, true); + sp_setint(obj, "cache_only", 1); ptr->next = sophia_iterator_first; } diff --git a/src/box/sophia_index.h b/src/box/sophia_index.h index 166235e496fa87acb07a987d2d5c9b3e74dbe346..3f0fc722f3b811927efd6b198006defa67c01782 100644 --- a/src/box/sophia_index.h +++ b/src/box/sophia_index.h @@ -57,15 +57,6 @@ class SophiaIndex: public Index { virtual size_t bsize() const override; public: - void replace_or_insert(const char *tuple, - const char *tuple_end, - enum dup_replace_mode mode); - void remove(const char *key); - void upsert(const char *ops, - const char *ops_end, - const char *tuple, - const char *tuple_end, - uint8_t index_base); void *env; void *db; diff --git a/src/box/sophia_space.cc b/src/box/sophia_space.cc new file mode 100644 index 0000000000000000000000000000000000000000..60e05c7286441bc4ee8f61e5633fe1f6d9d9d501 --- /dev/null +++ b/src/box/sophia_space.cc @@ -0,0 +1,443 @@ +/* + * Copyright 2010-2015, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "sophia_engine.h" +#include "sophia_index.h" +#include "sophia_space.h" +#include "xrow.h" +#include "tuple.h" +#include "scoped_guard.h" +#include "txn.h" +#include "index.h" +#include "space.h" +#include "schema.h" +#include "port.h" +#include "request.h" +#include "iproto_constants.h" +#include <sophia.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +SophiaSpace::SophiaSpace(Engine *e) + :Handler(e) +{ } + +void +SophiaSpace::applySnapshotRow(struct space *space, struct request *request) +{ + assert(request->type == IPROTO_INSERT); + SophiaIndex *index = (SophiaIndex *)index_find(space, 0); + + space_validate_tuple_raw(space, request->tuple); + int size = request->tuple_end - request->tuple; + const char *key = tuple_field_raw(request->tuple, size, + index->key_def->parts[0].fieldno); + primary_key_validate(index->key_def, key, index->key_def->part_count); + + const char *value = NULL; + void *obj = index->createDocument(key, &value); + size_t valuesize = size - (value - request->tuple); + if (valuesize > 0) + sp_setstring(obj, "value", value, valuesize); + + assert(request->header != NULL); + + void *tx = sp_begin(index->env); + if (tx == NULL) { + sp_destroy(obj); + sophia_error(index->env); + } + + int64_t signature = request->header->lsn; + sp_setint(tx, "lsn", signature); + + if (sp_set(tx, obj) != 0) + sophia_error(index->env); /* obj destroyed by sp_set() */ + + int rc = sp_commit(tx); + switch (rc) { + case 0: + return; + case 1: /* rollback */ + return; + case 2: /* lock */ + sp_destroy(tx); + /* must never happen during JOIN */ + tnt_raise(ClientError, ER_TRANSACTION_CONFLICT); + return; + case -1: + sophia_error(index->env); + return; + default: + assert(0); + } +} + +struct tuple * +SophiaSpace::executeReplace(struct txn*, + struct space *space, + struct request *request) +{ + SophiaIndex *index = (SophiaIndex *)index_find(space, 0); + + space_validate_tuple_raw(space, request->tuple); + + int size = request->tuple_end - request->tuple; + const char *key = + tuple_field_raw(request->tuple, size, + index->key_def->parts[0].fieldno); + primary_key_validate(index->key_def, key, index->key_def->part_count); + + /* unique constraint */ + if (request->type == IPROTO_INSERT) { + enum dup_replace_mode mode = DUP_REPLACE_OR_INSERT; + SophiaEngine *engine = + (SophiaEngine *)space->handler->engine; + if (engine->recovery_complete) + mode = DUP_INSERT; + if (mode == DUP_INSERT) { + struct tuple *found = index->findByKey(key, 0); + if (found) { + tuple_delete(found); + tnt_raise(ClientError, ER_TUPLE_FOUND, + index_name(index), space_name(space)); + } + } + } + + /* replace */ + void *transaction = in_txn()->engine_tx; + const char *value = NULL; + void *obj = index->createDocument(key, &value); + size_t valuesize = size - (value - request->tuple); + if (valuesize > 0) + sp_setstring(obj, "value", value, valuesize); + int rc; + rc = sp_set(transaction, obj); + if (rc == -1) + sophia_error(index->env); + + return NULL; +} + +struct tuple * +SophiaSpace::executeDelete(struct txn*, struct space *space, + struct request *request) +{ + SophiaIndex *index = (SophiaIndex *)index_find(space, request->index_id); + const char *key = request->key; + uint32_t part_count = mp_decode_array(&key); + primary_key_validate(index->key_def, key, part_count); + + /* remove */ + void *obj = index->createDocument(key, NULL); + void *transaction = in_txn()->engine_tx; + int rc = sp_delete(transaction, obj); + if (rc == -1) + sophia_error(index->env); + return NULL; +} + +struct tuple * +SophiaSpace::executeUpdate(struct txn*, struct space *space, + struct request *request) +{ + /* Try to find the tuple by unique key */ + SophiaIndex *index = (SophiaIndex *)index_find(space, request->index_id); + const char *key = request->key; + uint32_t part_count = mp_decode_array(&key); + primary_key_validate(index->key_def, key, part_count); + struct tuple *old_tuple = index->findByKey(key, part_count); + + if (old_tuple == NULL) + return NULL; + + /* Sophia always yields a zero-ref tuple, GC it here. */ + TupleRef old_ref(old_tuple); + + /* Do tuple update */ + struct tuple *new_tuple = + tuple_update(space->format, + region_aligned_alloc_xc_cb, + &fiber()->gc, + old_tuple, request->tuple, + request->tuple_end, + request->index_base); + TupleRef ref(new_tuple); + + space_validate_tuple(space, new_tuple); + space_check_update(space, old_tuple, new_tuple); + + /* replace */ + key = tuple_field_raw(new_tuple->data, new_tuple->bsize, + index->key_def->parts[0].fieldno); + void *transaction = in_txn()->engine_tx; + const char *value = NULL; + void *obj = index->createDocument(key, &value); + size_t valuesize = new_tuple->bsize - (value - new_tuple->data); + if (valuesize > 0) + sp_setstring(obj, "value", value, valuesize); + int rc; + rc = sp_set(transaction, obj); + if (rc == -1) + sophia_error(index->env); + return NULL; +} + +static inline int +sophia_upsert_prepare(char **src, uint32_t *src_size, + char **mp, uint32_t *mp_size, uint32_t *mp_size_key, + struct key_def *key_def) +{ + /* calculate msgpack size */ + uint32_t i = 0; + while (i < key_def->part_count) { + if (key_def->parts[i].type == STRING) + *mp_size_key += mp_sizeof_str(src_size[i]); + else + *mp_size_key += mp_sizeof_uint(load_u64(src[i])); + i++; + } + + /* count msgpack fields */ + uint32_t count = key_def->part_count; + uint32_t value_field = key_def->part_count; + uint32_t value_size = src_size[value_field]; + char *p = src[value_field]; + char *end = p + value_size; + while (p < end) { + count++; + mp_next((const char **)&p); + } + + /* allocate and encode tuple */ + *mp_size = mp_sizeof_array(count) + *mp_size_key + value_size; + *mp = (char *)malloc(*mp_size); + if (mp == NULL) + return -1; + p = *mp; + p = mp_encode_array(p, count); + i = 0; + while (i < key_def->part_count) { + if (key_def->parts[i].type == STRING) + p = mp_encode_str(p, src[i], src_size[i]); + else + p = mp_encode_uint(p, load_u64(src[i])); + i++; + } + memcpy(p, src[value_field], src_size[value_field]); + return 0; +} + +struct sophia_mempool { + void *chunks[128]; + int count; +}; + +static inline void +sophia_mempool_init(sophia_mempool *p) +{ + p->count = 0; +} + +static inline void +sophia_mempool_free(sophia_mempool *p) +{ + int i = 0; + while (i < p->count) { + free(p->chunks[i]); + i++; + } +} + +static void * +sophia_update_alloc(void *arg, size_t size) +{ + /* simulate region allocator for use with + * tuple_upsert_execute() */ + struct sophia_mempool *p = (struct sophia_mempool*)arg; + assert(p->count < 128); + void *ptr = malloc(size); + p->chunks[p->count++] = ptr; + return ptr; +} + +static inline int +sophia_upsert(char **result, uint32_t *result_size, + char *tuple, uint32_t tuple_size, uint32_t tuple_size_key, + char *upsert, int upsert_size) +{ + char *p = upsert; + uint8_t index_base = *(uint8_t *)p; + p += sizeof(uint8_t); + uint32_t default_tuple_size = *(uint32_t *)p; + p += sizeof(uint32_t); + p += default_tuple_size; + char *expr = p; + char *expr_end = upsert + upsert_size; + const char *up; + uint32_t up_size; + + /* emit upsert */ + struct sophia_mempool alloc; + sophia_mempool_init(&alloc); + try { + up = tuple_upsert_execute(sophia_update_alloc, &alloc, + expr, + expr_end, + tuple, + tuple + tuple_size, + &up_size, index_base); + } catch (Exception *e) { + sophia_mempool_free(&alloc); + return -1; + } + + /* skip array size and key */ + const char *ptr = up; + mp_decode_array(&ptr); + ptr += tuple_size_key; + + /* get new value */ + *result_size = (uint32_t)((up + up_size) - ptr); + *result = (char *)malloc(*result_size); + if (! *result) { + sophia_mempool_free(&alloc); + return -1; + } + memcpy(*result, ptr, *result_size); + sophia_mempool_free(&alloc); + return 0; +} + +int +sophia_upsert_cb(int count, + char **src, uint32_t *src_size, + char **upsert, uint32_t *upsert_size, + char **result, uint32_t *result_size, + void *arg) +{ + struct key_def *key_def = (struct key_def *)arg; + + uint32_t value_field; + value_field = key_def->part_count; + + /* use default tuple value */ + if (src == NULL) + { + /* result key fields are initialized to upsert + * fields by default */ + char *p = upsert[value_field]; + p += sizeof(uint8_t); /* index base */ + uint32_t value_size = *(uint32_t *)p; + p += sizeof(uint32_t); + void *value = (char *)malloc(value_size); + if (value == NULL) + return -1; + memcpy(value, p, value_size); + result[value_field] = (char*)value; + result_size[value_field] = value_size; + return 0; + } + + /* convert src to msgpack */ + char *tuple; + uint32_t tuple_size_key; + uint32_t tuple_size; + int rc; + rc = sophia_upsert_prepare(src, src_size, + &tuple, &tuple_size, &tuple_size_key, + key_def); + if (rc == -1) + return -1; + + /* execute upsert */ + rc = sophia_upsert(&result[value_field], + &result_size[value_field], + tuple, tuple_size, tuple_size_key, + upsert[value_field], + upsert_size[value_field]); + free(tuple); + + (void)count; + (void)upsert_size; + return rc; +} + +void +SophiaSpace::executeUpsert(struct txn*, struct space *space, + struct request *request) +{ + SophiaIndex *index = (SophiaIndex *)index_find(space, request->index_id); + + /* Check field count in tuple */ + space_validate_tuple_raw(space, request->tuple); + + /* Check tuple fields */ + tuple_validate_raw(space->format, request->tuple); + + const char *expr = request->ops; + const char *expr_end = request->ops_end; + const char *tuple = request->tuple; + const char *tuple_end = request->tuple_end; + uint8_t index_base = request->index_base; + + /* upsert */ + mp_decode_array(&tuple); + uint32_t expr_size = expr_end - expr; + uint32_t tuple_size = tuple_end - tuple; + uint32_t tuple_value_size; + const char *tuple_value; + void *obj = index->createDocument(tuple, &tuple_value); + tuple_value_size = tuple_size - (tuple_value - tuple); + uint32_t value_size = + sizeof(uint8_t) + sizeof(uint32_t) + tuple_value_size + expr_size; + char *value = (char *)malloc(value_size); + if (value == NULL) { + sp_destroy(obj); + tnt_raise(OutOfMemory, sizeof(value_size), "Sophia Space", + "executeUpsert"); + } + char *p = value; + memcpy(p, &index_base, sizeof(uint8_t)); + p += sizeof(uint8_t); + memcpy(p, &tuple_value_size, sizeof(uint32_t)); + p += sizeof(uint32_t); + memcpy(p, tuple_value, tuple_value_size); + p += tuple_value_size; + memcpy(p, expr, expr_size); + sp_setstring(obj, "value", value, value_size); + void *transaction = in_txn()->engine_tx; + int rc = sp_upsert(transaction, obj); + free(value); + if (rc == -1) + sophia_error(index->env); +} diff --git a/src/box/sophia_space.h b/src/box/sophia_space.h new file mode 100644 index 0000000000000000000000000000000000000000..7dc68ba374491e0e56d8b9aca8b7eb1cba86128b --- /dev/null +++ b/src/box/sophia_space.h @@ -0,0 +1,59 @@ +#ifndef TARANTOOL_BOX_SOPHIA_SPACE_H_INCLUDED +#define TARANTOOL_BOX_SOPHIA_SPACE_H_INCLUDED +/* + * Copyright 2010-2015, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +struct SophiaSpace: public Handler { + SophiaSpace(Engine*); + virtual void + applySnapshotRow(struct space *space, struct request *request); + virtual struct tuple * + executeReplace(struct txn*, struct space *space, + struct request *request); + virtual struct tuple * + executeDelete(struct txn*, struct space *space, + struct request *request); + virtual struct tuple * + executeUpdate(struct txn*, struct space *space, + struct request *request); + virtual void + executeUpsert(struct txn*, struct space *space, + struct request *request); +}; + +int +sophia_upsert_cb(int count, + char **src, uint32_t *src_size, + char **upsert, uint32_t *upsert_size, + char **result, uint32_t *result_size, + void *arg); + +#endif /* TARANTOOL_BOX_SOPHIA_SPACE_H_INCLUDED */ diff --git a/third_party/sophia b/third_party/sophia index 971f275d695e604aca33c1686dc9293795675fe1..8d25e8d4f22cb8d5fa583cf4384871c4ede1845e 160000 --- a/third_party/sophia +++ b/third_party/sophia @@ -1 +1 @@ -Subproject commit 971f275d695e604aca33c1686dc9293795675fe1 +Subproject commit 8d25e8d4f22cb8d5fa583cf4384871c4ede1845e