diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c index 6287a7a10da14ee1b9a80be95113cf629fdfb6d0..329331a92a6b4fcdb4f3ec19d4cd8f7e563209da 100644 --- a/src/box/iproto_constants.c +++ b/src/box/iproto_constants.c @@ -96,7 +96,8 @@ const char *iproto_type_strs[] = "DELETE", "CALL", "AUTH", - "EVAL" + "EVAL", + "UPSERT" }; #define bit(c) (1ULL<<IPROTO_##c) diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 43bff6e0a2fa283791a4de111d63f40ca257a4b5..a4ea276a19608e748db2245524cc0a31635090b6 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -69,6 +69,7 @@ enum iproto_key { IPROTO_CLUSTER_UUID = 0x25, IPROTO_VCLOCK = 0x26, IPROTO_EXPR = 0x27, /* EVAL */ + IPROTO_DEF_TUPLE = 0x28, /* Leave a gap between request keys and response keys */ IPROTO_DATA = 0x30, IPROTO_ERROR = 0x31, @@ -114,7 +115,7 @@ extern const unsigned char iproto_key_type[IPROTO_KEY_MAX]; enum iproto_type { /* command is successful */ IPROTO_OK = 0, - /* dml command codes */ + /* dml command codes (see extra dml command codes) */ IPROTO_SELECT = 1, IPROTO_INSERT = 2, IPROTO_REPLACE = 3, @@ -124,7 +125,10 @@ enum iproto_type { IPROTO_CALL = 6, IPROTO_AUTH = 7, IPROTO_EVAL = 8, - IPROTO_TYPE_STAT_MAX = IPROTO_EVAL + 1, + IPROTO_UPSERT = 9, + IPROTO_TYPE_EXT_DML_MAX = IPROTO_UPSERT + 1, + IPROTO_TYPE_STAT_MAX = IPROTO_UPSERT + 1, + /* new range of dml command codes */ /* admin command codes */ IPROTO_PING = 64, IPROTO_JOIN = 65, @@ -157,7 +161,8 @@ iproto_type_is_select(uint32_t type) static inline bool iproto_type_is_dml(uint32_t type) { - return type >= IPROTO_SELECT && type < IPROTO_TYPE_DML_MAX; + return (type >= IPROTO_SELECT && type < IPROTO_TYPE_DML_MAX) || + (type == IPROTO_UPSERT && type < IPROTO_TYPE_EXT_DML_MAX); } static inline bool diff --git a/src/box/lua/call.cc b/src/box/lua/call.cc index f595efc92736868fe5ae2ef6f43817cad6bae156..648959f357c6fd5ae553b91864d6c55759d8395e 100644 --- a/src/box/lua/call.cc +++ b/src/box/lua/call.cc @@ -192,7 +192,7 @@ lbox_process(lua_State *L) void lbox_request_create(struct request *request, struct lua_State *L, enum iproto_type type, - int key, int tuple) + int key, int tuple, int def_tuple) { request_create(request, type); request->space_id = lua_tointeger(L, 1); @@ -210,6 +210,15 @@ lbox_request_create(struct request *request, request->tuple = obuf_join(&tuple_buf); request->tuple_end = request->tuple + obuf_size(&tuple_buf); } + if (def_tuple > 0) { + struct obuf tuple_buf; + obuf_create(&tuple_buf, &fiber()->gc, LUAMP_ALLOC_FACTOR); + luamp_encode_tuple(L, luaL_msgpack_default, &tuple_buf, + def_tuple); + request->extra_tuple = obuf_join(&tuple_buf); + request->extra_tuple_end = + request->extra_tuple + obuf_size(&tuple_buf); + } } static void @@ -300,7 +309,7 @@ lbox_insert(lua_State *L) struct request request; struct port_lua port; - lbox_request_create(&request, L, IPROTO_INSERT, -1, 2); + lbox_request_create(&request, L, IPROTO_INSERT, -1, 2, -1); port_lua_create(&port, L); box_process(&request, (struct port *) &port); return lua_gettop(L) - 2; @@ -314,7 +323,7 @@ lbox_replace(lua_State *L) struct request request; struct port_lua port; - lbox_request_create(&request, L, IPROTO_REPLACE, -1, 2); + lbox_request_create(&request, L, IPROTO_REPLACE, -1, 2, -1); port_lua_create(&port, L); box_process(&request, (struct port *) &port); return lua_gettop(L) - 2; @@ -323,12 +332,13 @@ lbox_replace(lua_State *L) static int lbox_update(lua_State *L) { - if (lua_gettop(L) != 4 || !lua_isnumber(L, 1) || !lua_isnumber(L, 2)) + if (lua_gettop(L) != 4 || !lua_isnumber(L, 1) || !lua_isnumber(L, 2) || + lua_type(L, 3) != LUA_TTABLE || lua_type(L, 4) != LUA_TTABLE) return luaL_error(L, "Usage space:update(key, ops)"); struct request request; struct port_lua port; - lbox_request_create(&request, L, IPROTO_UPDATE, 3, 4); + lbox_request_create(&request, L, IPROTO_UPDATE, 3, 4, -1); request.index_base = 1; /* field ids are one-indexed */ port_lua_create(&port, L); /* Ignore index_id for now */ @@ -336,6 +346,24 @@ lbox_update(lua_State *L) return lua_gettop(L) - 4; } +static int +lbox_upsert(lua_State *L) +{ + if (lua_gettop(L) != 5 || !lua_isnumber(L, 1) || !lua_isnumber(L, 2) || + lua_type(L, 3) != LUA_TTABLE || lua_type(L, 4) != LUA_TTABLE || + lua_type(L, 5) != LUA_TTABLE) + return luaL_error(L, "Usage space:upsert(key, ops, def_tuple)"); + + struct request request; + struct port_lua port; + lbox_request_create(&request, L, IPROTO_UPSERT, 3, 4, 5); + request.index_base = 1; /* field ids are one-indexed */ + port_lua_create(&port, L); + /* Ignore index_id for now */ + box_process(&request, (struct port *) &port); + return lua_gettop(L) - 5; +} + static int lbox_delete(lua_State *L) { @@ -344,7 +372,7 @@ lbox_delete(lua_State *L) struct request request; struct port_lua port; - lbox_request_create(&request, L, IPROTO_DELETE, 3, -1); + lbox_request_create(&request, L, IPROTO_DELETE, 3, -1, -1); port_lua_create(&port, L); /* Ignore index_id for now */ box_process(&request, (struct port *) &port); @@ -754,6 +782,7 @@ static const struct luaL_reg boxlib_internal[] = { {"replace", lbox_replace}, {"update", lbox_update}, {"delete", lbox_delete}, + {"upsert", lbox_upsert}, {NULL, NULL} }; diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua index d5eb948d49eed7b6e3b970878d9940b78ecaaeb5..a309112173226b02ef1d25977d7d6494adcaeb8b 100644 --- a/src/box/lua/schema.lua +++ b/src/box/lua/schema.lua @@ -714,6 +714,10 @@ function box.schema.space.bless(space) index_mt.update = function(index, key, ops) return internal.update(index.space_id, index.id, keify(key), ops); end + index_mt.upsert = function(index, key, ops, def_tuple) + return internal.upsert(index.space_id, index.id, keify(key), ops, + def_tuple); + end index_mt.delete = function(index, key) return internal.delete(index.space_id, index.id, keify(key)); end @@ -758,6 +762,10 @@ function box.schema.space.bless(space) check_index(space, 0) return space.index[0]:update(key, ops) end + space_mt.upsert = function(space, key, ops, def_tuple) + check_index(space, 0) + return space.index[0]:upsert(key, ops, def_tuple) + end space_mt.delete = function(space, key) check_index(space, 0) return space.index[0]:delete(key) diff --git a/src/box/request.cc b/src/box/request.cc index 719d76bdcdae272f5a7359a5d8e59cf4d139247d..3a1fa5ed5466d3b769e389891220fc3af17902ac 100644 --- a/src/box/request.cc +++ b/src/box/request.cc @@ -114,6 +114,55 @@ execute_update(struct request *request, struct port *port) port_add_tuple(port, new_tuple); } +static void +execute_upsert(struct request *request, struct port *port) +{ + struct space *space = space_cache_find(request->space_id); + struct txn *txn = txn_begin_stmt(request, space); + + access_check_space(space, PRIV_W); + Index *pk = index_find(space, 0); + /* Try to find the tuple by primary key. */ + const char *key = request->key; + uint32_t part_count = mp_decode_array(&key); + primary_key_validate(pk->key_def, key, part_count); + struct tuple *old_tuple = pk->findByKey(key, part_count); + + TupleGuardSafe old_guard(old_tuple); + + /* Update the tuple. */ + struct tuple *new_tuple = + tuple_upsert(space->format, region_alloc_cb, + &fiber()->gc, old_tuple, + request->extra_tuple, request->extra_tuple_end, + request->tuple, request->tuple_end, + request->index_base); + TupleGuard guard(new_tuple); + try { + space_validate_tuple(space, new_tuple); + if (old_tuple && + !engine_auto_check_update(space->handler->engine->flags)) + space_check_update(space, old_tuple, new_tuple); + txn_replace(txn, space, old_tuple, new_tuple, + old_tuple ? DUP_REPLACE : DUP_INSERT); + } catch (ClientError *e) { + say_error("The following error occured during UPSERT " + "operation:"); + e->log(); + txn_rollback_stmt(); + if (old_tuple) + port_add_tuple(port, old_tuple); + return; + } + txn_commit_stmt(txn); + /* + * Adding result to port must be after possible WAL write. + * The reason is that any yield between port_add_tuple and port_eof + * calls could lead to sending not finished response to iproto socket. + */ + port_add_tuple(port, new_tuple); +} + static void execute_delete(struct request *request, struct port *port) { @@ -202,7 +251,7 @@ process_rw(struct request *request, struct port *port) assert(iproto_type_is_dml(request->type)); static const request_execute_f execute_map[] = { NULL, execute_select, execute_replace, execute_replace, - execute_update, execute_delete + execute_update, execute_delete, 0, 0, 0, execute_upsert }; request_execute_f fun = execute_map[request->type]; assert(fun != NULL); @@ -269,6 +318,9 @@ request_decode(struct request *request, const char *data, uint32_t len) case IPROTO_EXPR: request->key = value; request->key_end = data; + case IPROTO_DEF_TUPLE: + request->extra_tuple = value; + request->extra_tuple_end = data; default: break; } diff --git a/src/box/request.h b/src/box/request.h index d358bcc01922f6292b740810acfd07fde488c6ed..ec0a83e237e6860909ae9ff5e523d12aba94f49c 100644 --- a/src/box/request.h +++ b/src/box/request.h @@ -58,6 +58,9 @@ struct request /** Insert/replace tuple or proc argument or update operations. */ const char *tuple; const char *tuple_end; + /** Additional tuple of request, currently used by UPSERT */ + const char *extra_tuple; + const char *extra_tuple_end; /** Base field offset for UPDATE, e.g. 0 for C and 1 for Lua. */ int index_base; }; diff --git a/src/box/tuple.cc b/src/box/tuple.cc index 7db02b37cc6ce475e53442de7637b6dd2a2b1d14..58dc03718e3647cd7a1cc2f896f9dd243d7248a8 100644 --- a/src/box/tuple.cc +++ b/src/box/tuple.cc @@ -424,6 +424,41 @@ tuple_update(struct tuple_format *format, return new_tuple; } +struct tuple * +tuple_upsert(struct tuple_format *format, + void *(*region_alloc)(void *, size_t), void *alloc_ctx, + const struct tuple *old_tuple, + const char *plan_b_tuple_data, const char *plan_b_tuple_data_end, + const char *expr, const char *expr_end, int field_base) +{ + uint32_t new_size = 0; + const char *old_data, *old_data_end; + if (old_tuple) { + old_data = old_tuple->data; + old_data_end = old_tuple->data + old_tuple->bsize; + } else { + old_data = plan_b_tuple_data; + old_data_end = plan_b_tuple_data_end; + } + const char *new_data = + tuple_upsert_execute(region_alloc, alloc_ctx, expr, expr_end, + old_data, old_data_end, + &new_size, field_base); + + /* Allocate a new tuple. */ + assert(mp_typeof(*new_data) == MP_ARRAY); + struct tuple *new_tuple = tuple_new(format, new_data, + new_data + new_size); + + try { + tuple_init_field_map(format, new_tuple, (uint32_t *)new_tuple); + } catch (Exception *e) { + tuple_delete(new_tuple); + throw; + } + return new_tuple; +} + struct tuple * tuple_new(struct tuple_format *format, const char *data, const char *end) { diff --git a/src/box/tuple.h b/src/box/tuple.h index 20f74204f8674ca5fe42eff58e8b1428ae387a6b..a9eeec8f159871403bf0d63118bbcbbfb283f3ce 100644 --- a/src/box/tuple.h +++ b/src/box/tuple.h @@ -231,12 +231,29 @@ tuple_unref(struct tuple *tuple) /** Make tuple references exception-friendly in absence of @finally. */ struct TupleGuard { struct tuple *tuple; - TupleGuard(struct tuple *arg) :tuple(arg) { tuple_ref(tuple); } + TupleGuard(struct tuple *arg) : tuple(arg) { tuple_ref(tuple); } ~TupleGuard() { tuple_unref(tuple); } TupleGuard(const TupleGuard&) = delete; void operator=(const TupleGuard&) = delete; }; +/** Same as TupleGuard, but accepts normally NULL pointers */ +struct TupleGuardSafe { + struct tuple *tuple; + TupleGuardSafe(struct tuple *arg) : tuple(arg) + { + if (tuple) + tuple_ref(tuple); + } + ~TupleGuardSafe() + { + if (tuple) + tuple_unref(tuple); + } + TupleGuardSafe(const TupleGuardSafe&) = delete; + void operator=(const TupleGuardSafe&) = delete; +}; + /** * @brief Return a tuple format instance * @param tuple tuple @@ -461,6 +478,13 @@ tuple_update(struct tuple_format *new_format, const struct tuple *old_tuple, const char *expr, const char *expr_end, int field_base); +struct tuple * +tuple_upsert(struct tuple_format *new_format, + void *(*region_alloc)(void *, size_t), void *alloc_ctx, + const struct tuple *old_tuple, + const char *plan_b_tuple_data, const char *plan_b_tuple_data_end, + const char *expr, const char *expr_end, int field_base); + /** * @brief Compare two tuple fields using using field type definition * @param field_a field diff --git a/src/box/tuple_update.cc b/src/box/tuple_update.cc index 6020fabeff4e46adb26d493fa24c3e7f68aecc86..d50aa2c15f830d60447edb6475ed1bfcafa346f1 100644 --- a/src/box/tuple_update.cc +++ b/src/box/tuple_update.cc @@ -99,12 +99,17 @@ struct tuple_update int index_base; /* 0 for C and 1 for Lua */ }; -/** Argument of SET operation. */ +/** Argument of SET (and INSERT) operation. */ struct op_set_arg { uint32_t length; const char *value; }; +/** Argument of DELETE operation. */ +struct op_del_arg { + uint32_t count; +}; + /** * MsgPack format code of an arithmetic argument or result. * MsgPack codes are not used to simplify type calculation. @@ -165,6 +170,7 @@ struct op_splice_arg { union update_op_arg { struct op_set_arg set; + struct op_del_arg del; struct op_arith_arg arith; struct op_bit_arg bit; struct op_splice_arg splice; @@ -173,12 +179,14 @@ union update_op_arg { struct update_field; struct update_op; -typedef void (*do_op_func)(struct tuple_update *update, struct update_op *op, - const char **expr); +typedef void (*do_op_func)(struct tuple_update *update, struct update_op *op); +typedef void (*read_arg_func)(struct tuple_update *update, struct update_op *op, + const char **expr); typedef void (*store_op_func)(union update_op_arg *arg, const char *in, char *out); /** A set of functions and properties to initialize and do an op. */ struct update_op_meta { + read_arg_func read_arg; do_op_func do_op; store_op_func store; /* Argument count */ @@ -224,6 +232,8 @@ update_field_init(struct update_field *field, field->tail_len = tail_len; } +/* {{{ read_arg helpers */ + /** Read a field index or any other integer field. */ static inline int64_t mp_read_int(struct tuple_update *update, struct update_op *op, @@ -281,6 +291,92 @@ mp_read_arith_arg(struct tuple_update *update, struct update_op *op, return result; } +static inline const char * +mp_read_str(struct tuple_update *update, struct update_op *op, + const char **expr, uint32_t *len) +{ + if (mp_typeof(**expr) != MP_STR) { + tnt_raise(ClientError, ER_ARG_TYPE, (char) op->opcode, + update->index_base + op->field_no, "STR"); + } + return mp_decode_str(expr, len); /* value */ +} + +/* }}} read_arg helpers */ + +/* {{{ read_arg */ + +static void +read_arg_set(struct tuple_update *update, struct update_op *op, + const char **expr) +{ + (void)update; + op->arg.set.value = *expr; + mp_next(expr); + op->arg.set.length = (uint32_t) (*expr - op->arg.set.value); +} + +static void +read_arg_insert(struct tuple_update *update, struct update_op *op, + const char **expr) +{ + read_arg_set(update, op, expr); +} + +static void +read_arg_delete(struct tuple_update *update, struct update_op *op, + const char **expr) +{ + op->arg.del.count = (uint32_t) mp_read_int(update, op, expr); +} + +static void +read_arg_arith(struct tuple_update *update, struct update_op *op, + const char **expr) +{ + op->arg.arith = mp_read_arith_arg(update, op, expr); +} + +static void +read_arg_bit(struct tuple_update *update, struct update_op *op, + const char **expr) +{ + struct op_bit_arg *arg = &op->arg.bit; + arg->val = mp_read_uint(update, op, expr); +} + +static void +read_arg_splice(struct tuple_update *update, struct update_op *op, + const char **expr) +{ + struct op_splice_arg *arg = &op->arg.splice; + arg->offset = mp_read_int(update, op, expr); + arg->cut_length = mp_read_int(update, op, expr); /* cut length */ + arg->paste = mp_read_str(update, op, expr, &arg->paste_length); /* value */ +} + +/* }}} read_arg */ + +/* {{{ do_op helpers */ + +static inline void +op_adjust_field_no(struct tuple_update *update, struct update_op *op, + int32_t field_max) +{ + if (op->field_no >= 0) { + if (op->field_no < field_max) + return; + tnt_raise(ClientError, ER_NO_SUCH_FIELD, update->index_base + + op->field_no); + } else { + if (op->field_no + field_max >= 0) { + op->field_no += field_max; + return; + } + tnt_raise(ClientError, ER_NO_SUCH_FIELD, op->field_no); + } +} + static inline double cast_arith_arg_to_double(struct op_arith_arg arg) { @@ -319,92 +415,6 @@ mp_sizeof_op_arith_arg(struct op_arith_arg arg) } } -static inline const char * -mp_read_str(struct tuple_update *update, struct update_op *op, - const char **expr, uint32_t *len) -{ - if (mp_typeof(**expr) != MP_STR) { - tnt_raise(ClientError, ER_ARG_TYPE, (char) op->opcode, - update->index_base + op->field_no, "STR"); - } - return mp_decode_str(expr, len); /* value */ -} - -static inline void -op_adjust_field_no(struct tuple_update *update, struct update_op *op, - int32_t field_max) -{ - if (op->field_no >= 0) { - if (op->field_no < field_max) - return; - tnt_raise(ClientError, ER_NO_SUCH_FIELD, update->index_base + - op->field_no); - } else { - if (op->field_no + field_max >= 0) { - op->field_no += field_max; - return; - } - tnt_raise(ClientError, ER_NO_SUCH_FIELD, op->field_no); - } -} - -static inline void -op_set_read(struct update_op *op, const char **expr) -{ - op->arg.set.value = *expr; - mp_next(expr); - op->arg.set.length = *expr - op->arg.set.value; -} - -/* {{{ do_op */ - -static void -do_op_insert(struct tuple_update *update, struct update_op *op, - const char **expr) -{ - op_adjust_field_no(update, op, rope_size(update->rope) + 1); - op_set_read(op, expr); - struct update_field *field = (struct update_field *) - update->alloc(update->alloc_ctx, sizeof(*field)); - update_field_init(field, op->arg.set.value, op->arg.set.length, 0); - rope_insert(update->rope, op->field_no, field, 1); -} - -static void -do_op_set(struct tuple_update *update, struct update_op *op, - const char **expr) -{ - /* intepret '=' for n +1 field as insert */ - if (op->field_no == rope_size(update->rope)) - return do_op_insert(update, op, expr); - op_adjust_field_no(update, op, rope_size(update->rope)); - struct update_field *field = (struct update_field *) - rope_extract(update->rope, op->field_no); - /* Ignore the previous op, if any. */ - field->op = op; - op_set_read(op, expr); - op->new_field_len = op->arg.set.length; -} - -static void -do_op_delete(struct tuple_update *update, struct update_op *op, - const char **expr) -{ - op_adjust_field_no(update, op, rope_size(update->rope)); - uint32_t delete_count = mp_read_int(update, op, expr); - - if ((uint64_t) op->field_no + delete_count > rope_size(update->rope)) - delete_count = rope_size(update->rope) - op->field_no; - - if (delete_count == 0) { - tnt_raise(ClientError, ER_UPDATE_FIELD, update->index_base + - op->field_no, "cannot delete 0 fields"); - } - - for (uint32_t u = 0; u < delete_count; u++) - rope_erase(update->rope, op->field_no); -} - static inline struct op_arith_arg make_arith_operation(struct op_arith_arg arg1, struct op_arith_arg arg2, char opcode, uint32_t err_fieldno) @@ -446,7 +456,7 @@ make_arith_operation(struct op_arith_arg arg1, struct op_arith_arg arg2, case '-': c = a - b; break; default: tnt_raise(ClientError, ER_ARG_TYPE, (char ) opcode, - err_fieldno, "positive integer"); + err_fieldno, "positive integer"); break; } if (lowest_type == AT_DOUBLE) { @@ -463,51 +473,88 @@ make_arith_operation(struct op_arith_arg arg1, struct op_arith_arg arg2, return result; } +/* }}} do_op helpers */ + +/* {{{ do_op */ + static void -prepare_op_arith(struct tuple_update *update, struct update_op *op, - const char **expr, struct op_arith_arg *left) +do_op_insert(struct tuple_update *update, struct update_op *op) { - op_adjust_field_no(update, op, rope_size(update->rope)); + op_adjust_field_no(update, op, rope_size(update->rope) + 1); + struct update_field *field = (struct update_field *) + update->alloc(update->alloc_ctx, sizeof(*field)); + update_field_init(field, op->arg.set.value, op->arg.set.length, 0); + rope_insert(update->rope, op->field_no, field, 1); +} +static void +do_op_set(struct tuple_update *update, struct update_op *op) +{ + /* intepret '=' for n +1 field as insert */ + if (op->field_no == rope_size(update->rope)) + return do_op_insert(update, op); + op_adjust_field_no(update, op, rope_size(update->rope)); struct update_field *field = (struct update_field *) - rope_extract(update->rope, op->field_no); - if (field->op) { - tnt_raise(ClientError, ER_UPDATE_FIELD, update->index_base + - op->field_no, "double update of the same field"); - } + rope_extract(update->rope, op->field_no); + /* Ignore the previous op, if any. */ field->op = op; - const char *old = field->old; - *left = mp_read_arith_arg(update, op, &old); - op->arg.arith = mp_read_arith_arg(update, op, expr); + op->new_field_len = op->arg.set.length; } static void -do_op_arith(struct tuple_update *update, struct update_op *op, - const char **expr) +do_op_delete(struct tuple_update *update, struct update_op *op) { - struct op_arith_arg left; - prepare_op_arith(update, op, expr, &left); - op->arg.arith = make_arith_operation(left, op->arg.arith, op->opcode, + op_adjust_field_no(update, op, rope_size(update->rope)); + uint32_t delete_count = op->arg.del.count; + + if ((uint64_t) op->field_no + delete_count > rope_size(update->rope)) + delete_count = rope_size(update->rope) - op->field_no; + + if (delete_count == 0) { + tnt_raise(ClientError, ER_UPDATE_FIELD, + update->index_base + op->field_no, + "cannot delete 0 fields"); + } + + for (uint32_t u = 0; u < delete_count; u++) + rope_erase(update->rope, op->field_no); +} + +static void +do_op_arith(struct tuple_update *update, struct update_op *op) +{ + op_adjust_field_no(update, op, rope_size(update->rope)); + + struct update_field *field = (struct update_field *) + rope_extract(update->rope, op->field_no); + if (field->op) { + tnt_raise(ClientError, ER_UPDATE_FIELD, + update->index_base + op->field_no, + "double update of the same field"); + } + const char *old = field->old; + struct op_arith_arg left_arg = mp_read_arith_arg(update, op, &old); + + struct op_arith_arg right_arg = op->arg.arith; + op->arg.arith = make_arith_operation(left_arg, right_arg, op->opcode, update->index_base + op->field_no); + field->op = op; op->new_field_len = mp_sizeof_op_arith_arg(op->arg.arith); } static void -do_op_bit(struct tuple_update *update, struct update_op *op, - const char **expr) +do_op_bit(struct tuple_update *update, struct update_op *op) { op_adjust_field_no(update, op, rope_size(update->rope)); struct update_field *field = (struct update_field *) - rope_extract(update->rope, op->field_no); + rope_extract(update->rope, op->field_no); struct op_bit_arg *arg = &op->arg.bit; - arg->val = mp_read_uint(update, op, expr); if (field->op) { tnt_raise(ClientError, ER_UPDATE_FIELD, - update->index_base + op->field_no, - "double update of the same field"); + update->index_base + op->field_no, + "double update of the same field"); } - field->op = op; const char *old = field->old; uint64_t val = mp_read_uint(update, op, &old); switch (op->opcode) { @@ -523,16 +570,16 @@ do_op_bit(struct tuple_update *update, struct update_op *op, default: assert(false); /* checked by update_read_ops */ } + field->op = op; op->new_field_len = mp_sizeof_uint(arg->val); } static void -do_op_splice(struct tuple_update *update, struct update_op *op, - const char **expr) +do_op_splice(struct tuple_update *update, struct update_op *op) { op_adjust_field_no(update, op, rope_size(update->rope)); struct update_field *field = (struct update_field *) - rope_extract(update->rope, op->field_no); + rope_extract(update->rope, op->field_no); if (field->op) { tnt_raise(ClientError, ER_UPDATE_FIELD, update->index_base + op->field_no, @@ -541,19 +588,15 @@ do_op_splice(struct tuple_update *update, struct update_op *op, struct op_splice_arg *arg = &op->arg.splice; - arg->offset = mp_read_int(update, op, expr); - arg->cut_length = mp_read_int(update, op, expr); /* cut length */ - - arg->paste = mp_read_str(update, op, expr, &arg->paste_length); /* value */ - const char *in = field->old; uint32_t str_len; in = mp_read_str(update, op, &in, &str_len); if (arg->offset < 0) { if (-arg->offset > str_len + 1) { - tnt_raise(ClientError, ER_SPLICE, update->index_base + - op->field_no, "offset is out of bound"); + tnt_raise(ClientError, ER_SPLICE, + update->index_base + op->field_no, + "offset is out of bound"); } arg->offset = arg->offset + str_len + 1; } else if (arg->offset - update->index_base >= 0) { @@ -561,8 +604,9 @@ do_op_splice(struct tuple_update *update, struct update_op *op, if (arg->offset > str_len) arg->offset = str_len; } else /* (offset <= 0) */ { - tnt_raise(ClientError, ER_SPLICE, update->index_base + - op->field_no, "offset is out of bound"); + tnt_raise(ClientError, ER_SPLICE, + update->index_base + op->field_no, + "offset is out of bound"); } assert(arg->offset >= 0 && arg->offset <= str_len); @@ -583,27 +627,34 @@ do_op_splice(struct tuple_update *update, struct update_op *op, arg->tail_length = str_len - arg->tail_offset; + field->op = op; /* Record the new field length (maximal). */ op->new_field_len = mp_sizeof_str(arg->offset + arg->paste_length + arg->tail_length); - field->op = op; } -/* }}} */ +/* }}} do_op */ /* {{{ store_op */ static void -store_op_set(struct op_set_arg *arg, const char *in __attribute__((unused)), - char *out) +store_op_set(struct op_set_arg *arg, const char *in, char *out) { + (void)in; memcpy(out, arg->value, arg->length); } static void -store_op_arith(struct op_arith_arg *arg, - const char *in __attribute__((unused)), char *out) +store_op_insert(struct op_set_arg *arg, const char *in, char *out) { + (void)in; + memcpy(out, arg->value, arg->length); +} + +static void +store_op_arith(struct op_arith_arg *arg, const char *in, char *out) +{ + (void)in; if (arg->type == AT_INT) { if (int96_is_uint64(&arg->int96)) { mp_encode_uint(out, int96_extract_uint64(&arg->int96)); @@ -620,8 +671,9 @@ store_op_arith(struct op_arith_arg *arg, } static void -store_op_bit(struct op_bit_arg *arg, const char *in __attribute__((unused)), char *out) +store_op_bit(struct op_bit_arg *arg, const char *in, char *out) { + (void)in; mp_encode_uint(out, arg->val); } @@ -641,36 +693,28 @@ store_op_splice(struct op_splice_arg *arg, const char *in, char *out) memcpy(out, in + arg->tail_offset, arg->tail_length); /* copy tail */ } -static void -store_op_insert(struct op_set_arg *arg, - const char *in __attribute__((unused)), - char *out) -{ - memcpy(out, arg->value, arg->length); -} +/* }}} store_op */ static const struct update_op_meta op_set = - { do_op_set, (store_op_func) store_op_set, 3 }; + { read_arg_set, do_op_set, (store_op_func) store_op_set, 3 }; static const struct update_op_meta op_insert = - { do_op_insert, (store_op_func) store_op_insert, 3 }; + { read_arg_insert, do_op_insert, (store_op_func) store_op_insert, 3 }; static const struct update_op_meta op_arith = - { do_op_arith, (store_op_func) store_op_arith, 3 }; + { read_arg_arith, do_op_arith, (store_op_func) store_op_arith, 3 }; static const struct update_op_meta op_bit = - { do_op_bit, (store_op_func) store_op_bit, 3 }; + { read_arg_bit, do_op_bit, (store_op_func) store_op_bit, 3 }; static const struct update_op_meta op_splice = - { do_op_splice, (store_op_func) store_op_splice, 5 }; + { read_arg_splice, do_op_splice, (store_op_func) store_op_splice, 5 }; static const struct update_op_meta op_delete = - { do_op_delete, (store_op_func) NULL, 3 }; - -/* }}} */ + { read_arg_delete, do_op_delete, (store_op_func) NULL, 3 }; /** Split a range of fields in two, allocating update_field * context for the new range. */ static void * -update_field_split(void *split_ctx, void *data, size_t size __attribute__((unused)), - size_t offset) +update_field_split(void *split_ctx, void *data, size_t size, size_t offset) { + (void)size; struct tuple_update *update = (struct tuple_update *) split_ctx; struct update_field *prev = (struct update_field *) data; @@ -815,9 +859,11 @@ update_read_ops(struct tuple_update *update, const char *expr, uint32_t args, len; args = mp_decode_array(&expr); if (args < 1) - tnt_raise(ClientError, ER_INVALID_MSGPACK, "expected an update operation (array)"); + tnt_raise(ClientError, ER_INVALID_MSGPACK, + "expected an update operation (array)"); if (mp_typeof(*expr) != MP_STR) - tnt_raise(ClientError, ER_INVALID_MSGPACK, "expected an update operation name (string)"); + tnt_raise(ClientError, ER_INVALID_MSGPACK, + "expected an update operation name (string)"); op->opcode = *mp_decode_str(&expr, &len); @@ -856,7 +902,7 @@ update_read_ops(struct tuple_update *update, const char *expr, } else { tnt_raise(ClientError, ER_NO_SUCH_FIELD, field_no); } - op->meta->do_op(update, op, &expr); + op->meta->read_arg(update, op, &expr); } /* Check the remainder length, the request must be fully read. */ @@ -864,15 +910,38 @@ update_read_ops(struct tuple_update *update, const char *expr, tnt_raise(IllegalParams, "can't unpack update operations"); } -const char * -tuple_update_execute(region_alloc_func alloc, void *alloc_ctx, - const char *expr,const char *expr_end, - const char *old_data, const char *old_data_end, - uint32_t *p_tuple_len, int index_base) +static void +update_do_ops(struct tuple_update *update) +{ + struct update_op *op = update->ops; + struct update_op *ops_end = op + update->op_count; + for (; op < ops_end; op++) { + op->meta->do_op(update, op); + } +} + +static void +upsert_do_ops(struct tuple_update *update) +{ + struct update_op *op = update->ops; + struct update_op *ops_end = op + update->op_count; + for (; op < ops_end; op++) { + try { + op->meta->do_op(update, op); + } catch(ClientError *e) { + say_error("The following error occured during UPSERT " + "operation:"); + e->log(); + } + } +} + +static void +update_init(struct tuple_update *update, + region_alloc_func alloc, void *alloc_ctx, + const char *old_data, const char *old_data_end, + int index_base) { - struct tuple_update *update = (struct tuple_update *) - alloc(alloc_ctx, sizeof(*update)); - assert(update != NULL); memset(update, 0, sizeof(*update)); update->alloc = alloc; update->alloc_ctx = alloc_ctx; @@ -883,12 +952,46 @@ tuple_update_execute(region_alloc_func alloc, void *alloc_ctx, update->index_base = index_base; update_create_rope(update, old_data, old_data_end); - update_read_ops(update, expr, expr_end); +} + +const char * +update_finish(struct tuple_update *update, uint32_t *p_tuple_len) +{ uint32_t tuple_len = update_calc_tuple_length(update); + char *buffer = (char *) update->alloc(update->alloc_ctx, tuple_len); + *p_tuple_len = update_write_tuple(update, buffer, buffer + tuple_len); + return buffer; +} - char *buffer = (char *) alloc(alloc_ctx, tuple_len); +const char * +tuple_update_execute(region_alloc_func alloc, void *alloc_ctx, + const char *expr,const char *expr_end, + const char *old_data, const char *old_data_end, + uint32_t *p_tuple_len, int index_base) +{ + struct tuple_update update; + update_init(&update, alloc, alloc_ctx, old_data, old_data_end, + index_base); - *p_tuple_len = update_write_tuple(update, buffer, buffer + tuple_len); + update_read_ops(&update, expr, expr_end); + update_do_ops(&update); - return buffer; + return update_finish(&update, p_tuple_len); +} + +const char * +tuple_upsert_execute(region_alloc_func alloc, void *alloc_ctx, + const char *expr,const char *expr_end, + const char *old_data, const char *old_data_end, + uint32_t *p_tuple_len, int index_base) +{ + struct tuple_update update; + update_init(&update, alloc, alloc_ctx, old_data, old_data_end, + index_base); + + update_read_ops(&update, expr, expr_end); + upsert_do_ops(&update); + + return update_finish(&update, p_tuple_len); } + diff --git a/src/box/tuple_update.h b/src/box/tuple_update.h index c6873f1f2adbad31acfdc595b3003fbe0811af5a..e80d2b00a4d56d3852d63c11c1d2519a85c42514 100644 --- a/src/box/tuple_update.h +++ b/src/box/tuple_update.h @@ -45,4 +45,11 @@ tuple_update_execute(region_alloc_func alloc, void *alloc_ctx, const char *old_data, const char *old_data_end, uint32_t *p_new_size, int index_base); +const char * +tuple_upsert_execute(region_alloc_func alloc, void *alloc_ctx, + const char *expr,const char *expr_end, + const char *old_data, const char *old_data_end, + uint32_t *p_new_size, int index_base); + + #endif /* TARANTOOL_BOX_TUPLE_UPDATE_H_INCLUDED */ diff --git a/test/box/misc.result b/test/box/misc.result index 2dd1329d52d3bdf025a153863ce47d15b26837d7..f1189711d0fb2c428b15b7157c696557391ee042 100644 --- a/test/box/misc.result +++ b/test/box/misc.result @@ -136,12 +136,13 @@ end; t; --- - - DELETE - - EVAL - SELECT - - REPLACE - INSERT - - AUTH + - EVAL - CALL + - REPLACE + - UPSERT + - AUTH - UPDATE - total - rps diff --git a/test/box/update.result b/test/box/update.result index 934c3c9ae4f7e3502b1ccc6668396390485e4d20..576f6f57e9e7a09ed603868988a9a6b7d894509f 100644 --- a/test/box/update.result +++ b/test/box/update.result @@ -454,7 +454,7 @@ s:insert{1, 2, 3} ... s:update({1}) --- -- error: Tuple/Key must be MsgPack array +- error: Usage space:update(key, ops) ... s:update({1}, {'=', 1, 1}) --- @@ -760,6 +760,111 @@ s:update({0}, {{'+', 2, -0x4000000000000001ll}}) -- overflow --- - error: Integer overflow when performing '+' operation on field 2 ... +--UPSERT https://github.com/tarantool/tarantool/issues/905 +s:delete{0} +--- +- [0, -4611686018427387904] +... +s:upsert({0}, {{'+', 2, 2}}) -- wrong usage +--- +- error: Usage space:upsert(key, ops, def_tuple) +... +s:upsert({0}, {{'+', 2, 2}}, {0, 0}) +--- +- [0, 2] +... +s:delete{0} +--- +- [0, 2] +... +s:upsert({0}, {{'+', 2, 2}}, {0, 0, 0}) +--- +- [0, 2, 0] +... +s:delete{0} +--- +- [0, 2, 0] +... +s:upsert({0}, {{'+', 2, 2}}, {0}) +--- +- [0] +... +s:replace{0, 1, 2, 4} +--- +- [0, 1, 2, 4] +... +s:upsert({0}, {{'+', 2, 2}}, {0, 0, "you will not see it"}) +--- +- [0, 3, 2, 4] +... +s:replace{0, -0x4000000000000000ll} +--- +- [0, -4611686018427387904] +... +s:upsert({0}, {{'+', 2, -0x4000000000000001ll}}, {0}) -- overflow +--- +- [0, -4611686018427387904] +... +s:replace{0, "thing"} +--- +- [0, 'thing'] +... +s:upsert({0}, {{'+', 2, 2}}, {0, "nothing"}) +--- +- [0, 'thing'] +... +s:delete{0} +--- +- [0, 'thing'] +... +s:upsert({0}, {{'+', 2, 2}}, {0, "thing"}) +--- +- [0, 'thing'] +... +s:replace{0, 1, 2} +--- +- [0, 1, 2] +... +s:upsert({0}, {{'!', 42, 42}}, {0}) +--- +- [0, 1, 2] +... +s:upsert({0}, {{'#', 42, 42}}, {0}) +--- +- [0, 1, 2] +... +s:upsert({0}, {{'=', 42, 42}}, {0}) +--- +- [0, 1, 2] +... +s:replace{0, 1.5} +--- +- [0, 1.5] +... +s:upsert({0}, {{'|', 1, 255}}, {0}) +--- +- [0, 1.5] +... +s:replace{0, 1.5} +--- +- [0, 1.5] +... +s:replace{0, 'something to splice'} +--- +- [0, 'something to splice'] +... +s:upsert({0}, {{':', 2, 1, 4, 'no'}}, {0}) +--- +- [0, 'nothing to splice'] +... +s:upsert({0}, {{':', 2, 1, 2, 'every'}}, {0}) +--- +- [0, 'everything to splice'] +... +s:upsert({0}, {{':', 2, -100, 2, 'every'}}, {0}) +--- +- [0, 'everything to splice'] +... s:drop() --- ... diff --git a/test/box/update.test.lua b/test/box/update.test.lua index 1f6c545334a79b718be0e162b7e40b83f9d4e83e..e6373bb02c5f6f4229d2f9c082fe54190d95dbf6 100644 --- a/test/box/update.test.lua +++ b/test/box/update.test.lua @@ -233,5 +233,33 @@ s:update({0}, {{'+', 2, -0x4000000000000000ll}}) -- ok s:replace{0, -0x4000000000000000ll} s:update({0}, {{'+', 2, -0x4000000000000001ll}}) -- overflow +--UPSERT https://github.com/tarantool/tarantool/issues/905 +s:delete{0} +s:upsert({0}, {{'+', 2, 2}}) -- wrong usage +s:upsert({0}, {{'+', 2, 2}}, {0, 0}) +s:delete{0} +s:upsert({0}, {{'+', 2, 2}}, {0, 0, 0}) +s:delete{0} +s:upsert({0}, {{'+', 2, 2}}, {0}) +s:replace{0, 1, 2, 4} +s:upsert({0}, {{'+', 2, 2}}, {0, 0, "you will not see it"}) +s:replace{0, -0x4000000000000000ll} +s:upsert({0}, {{'+', 2, -0x4000000000000001ll}}, {0}) -- overflow +s:replace{0, "thing"} +s:upsert({0}, {{'+', 2, 2}}, {0, "nothing"}) +s:delete{0} +s:upsert({0}, {{'+', 2, 2}}, {0, "thing"}) +s:replace{0, 1, 2} +s:upsert({0}, {{'!', 42, 42}}, {0}) +s:upsert({0}, {{'#', 42, 42}}, {0}) +s:upsert({0}, {{'=', 42, 42}}, {0}) +s:replace{0, 1.5} +s:upsert({0}, {{'|', 1, 255}}, {0}) +s:replace{0, 1.5} +s:replace{0, 'something to splice'} +s:upsert({0}, {{':', 2, 1, 4, 'no'}}, {0}) +s:upsert({0}, {{':', 2, 1, 2, 'every'}}, {0}) +s:upsert({0}, {{':', 2, -100, 2, 'every'}}, {0}) + s:drop()