Skip to content
Snippets Groups Projects
Commit ac9d9b6b authored by Konstantin Osipov's avatar Konstantin Osipov
Browse files

A fix and a test case for Bug#730593

A fix and a test case for
https://bugs.launchpad.net/tarantool/+bug/730593
"Bad data if incomplete tuple"

A partial tuple could get accepted by INSERT
command, and later on show up in all search results
on the index for which this partial tuple was missing
keys.

When inserting into a TREE index Tarantool/Box did not
validate that the tuple which is being inserted actually
has fields used in the index, and would insert instead
an "ASTERISK" (wildcard) value into the index.

Additionally, when such validation was added, it turned
out that box_raise leaves a corrupt tuple in place
since txn_abort() doesn't remove the unless it's marked
as "GHOST", and tuples were marked as "GHOST" after
index insert, not before.
parent f7aef6a1
No related branches found
No related tags found
No related merge requests found
...@@ -268,17 +268,25 @@ prepare_replace(struct box_txn *txn, size_t cardinality, struct tbuf *data) ...@@ -268,17 +268,25 @@ prepare_replace(struct box_txn *txn, size_t cardinality, struct tbuf *data)
#endif #endif
lock_tuple(txn, txn->old_tuple); lock_tuple(txn, txn->old_tuple);
} else { } else {
lock_tuple(txn, txn->tuple);
/* /*
* if tuple doesn't exist insert GHOST tuple in indeces * Mark the tuple as ghost before attempting an
* in order to avoid race condition * index replace: if it fails, txn_abort() will
* ref count will be incr in commit * look at the flag and remove the tuple.
*/
txn->tuple->flags |= GHOST;
/*
* If the tuple doesn't exist, insert a GHOST
* tuple in all indices in order to avoid a race
* condition when another INSERT comes along:
* a concurrent INSERT, UPDATE, or DELETE, returns
* an error when meets a ghost tuple.
*
* Tuple reference counter will be incremented in
* txn_commit().
*/ */
foreach_index(txn->n, index) foreach_index(txn->n, index)
index->replace(index, NULL, txn->tuple); index->replace(index, NULL, txn->tuple);
lock_tuple(txn, txn->tuple);
txn->tuple->flags |= GHOST;
} }
return -1; return -1;
......
...@@ -101,7 +101,7 @@ field_compare(struct field *f1, struct field *f2, enum field_data_type type) ...@@ -101,7 +101,7 @@ field_compare(struct field *f1, struct field *f2, enum field_data_type type)
return -1; return -1;
} }
panic("imposible happend"); panic("impossible happened");
} }
...@@ -347,6 +347,9 @@ index_replace_hash_num(struct index *self, struct box_tuple *old_tuple, struct b ...@@ -347,6 +347,9 @@ index_replace_hash_num(struct index *self, struct box_tuple *old_tuple, struct b
u32 key_size = load_varint32(&key); u32 key_size = load_varint32(&key);
u32 num = *(u32 *)key; u32 num = *(u32 *)key;
if (key_size != 4)
box_raise(ERR_CODE_ILLEGAL_PARAMS, "key is not u32");
if (old_tuple != NULL) { if (old_tuple != NULL) {
void *old_key = tuple_field(old_tuple, self->key_field->fieldno); void *old_key = tuple_field(old_tuple, self->key_field->fieldno);
load_varint32(&old_key); load_varint32(&old_key);
...@@ -354,8 +357,6 @@ index_replace_hash_num(struct index *self, struct box_tuple *old_tuple, struct b ...@@ -354,8 +357,6 @@ index_replace_hash_num(struct index *self, struct box_tuple *old_tuple, struct b
assoc_delete(int_ptr_map, self->idx.int_hash, old_num); assoc_delete(int_ptr_map, self->idx.int_hash, old_num);
} }
if (key_size != 4)
box_raise(ERR_CODE_ILLEGAL_PARAMS, "key is not u32");
assoc_replace(int_ptr_map, self->idx.int_hash, num, tuple); assoc_replace(int_ptr_map, self->idx.int_hash, num, tuple);
#ifdef DEBUG #ifdef DEBUG
say_debug("index_replace_hash_num(self:%p, old_tuple:%p, tuple:%p) key:%i", self, old_tuple, say_debug("index_replace_hash_num(self:%p, old_tuple:%p, tuple:%p) key:%i", self, old_tuple,
...@@ -370,6 +371,9 @@ index_replace_hash_num64(struct index *self, struct box_tuple *old_tuple, struct ...@@ -370,6 +371,9 @@ index_replace_hash_num64(struct index *self, struct box_tuple *old_tuple, struct
u32 key_size = load_varint32(&key); u32 key_size = load_varint32(&key);
u64 num = *(u64 *)key; u64 num = *(u64 *)key;
if (key_size != 8)
box_raise(ERR_CODE_ILLEGAL_PARAMS, "key is not u64");
if (old_tuple != NULL) { if (old_tuple != NULL) {
void *old_key = tuple_field(old_tuple, self->key_field->fieldno); void *old_key = tuple_field(old_tuple, self->key_field->fieldno);
load_varint32(&old_key); load_varint32(&old_key);
...@@ -377,8 +381,6 @@ index_replace_hash_num64(struct index *self, struct box_tuple *old_tuple, struct ...@@ -377,8 +381,6 @@ index_replace_hash_num64(struct index *self, struct box_tuple *old_tuple, struct
assoc_delete(int64_ptr_map, self->idx.int64_hash, old_num); assoc_delete(int64_ptr_map, self->idx.int64_hash, old_num);
} }
if (key_size != 8)
box_raise(ERR_CODE_ILLEGAL_PARAMS, "key is not u64");
assoc_replace(int64_ptr_map, self->idx.int64_hash, num, tuple); assoc_replace(int64_ptr_map, self->idx.int64_hash, num, tuple);
#ifdef DEBUG #ifdef DEBUG
say_debug("index_replace_hash_num(self:%p, old_tuple:%p, tuple:%p) key:%"PRIu64, self, old_tuple, say_debug("index_replace_hash_num(self:%p, old_tuple:%p, tuple:%p) key:%"PRIu64, self, old_tuple,
...@@ -391,6 +393,9 @@ index_replace_hash_str(struct index *self, struct box_tuple *old_tuple, struct b ...@@ -391,6 +393,9 @@ index_replace_hash_str(struct index *self, struct box_tuple *old_tuple, struct b
{ {
void *key = tuple_field(tuple, self->key_field->fieldno); void *key = tuple_field(tuple, self->key_field->fieldno);
if (key == NULL)
box_raise(ERR_CODE_ILLEGAL_PARAMS, "Supplied tuple misses a field which is part of an index");
if (old_tuple != NULL) { if (old_tuple != NULL) {
void *old_key = tuple_field(old_tuple, self->key_field->fieldno); void *old_key = tuple_field(old_tuple, self->key_field->fieldno);
assoc_delete(lstr_ptr_map, self->idx.str_hash, old_key); assoc_delete(lstr_ptr_map, self->idx.str_hash, old_key);
...@@ -407,6 +412,9 @@ index_replace_hash_str(struct index *self, struct box_tuple *old_tuple, struct b ...@@ -407,6 +412,9 @@ index_replace_hash_str(struct index *self, struct box_tuple *old_tuple, struct b
static void static void
index_replace_tree_str(struct index *self, struct box_tuple *old_tuple, struct box_tuple *tuple) index_replace_tree_str(struct index *self, struct box_tuple *old_tuple, struct box_tuple *tuple)
{ {
if (tuple->cardinality < self->field_cmp_order_cnt)
box_raise(ERR_CODE_ILLEGAL_PARAMS, "Supplied tuple misses a field which is part of an index");
struct tree_index_member *member = tuple2tree_index_member(self, tuple, NULL); struct tree_index_member *member = tuple2tree_index_member(self, tuple, NULL);
if (old_tuple) if (old_tuple)
......
#
A test case for Bug#729758 # A test case for Bug#729758
"SELECT fails with a disjunct and small LIMIT" # "SELECT fails with a disjunct and small LIMIT"
https://bugs.launchpad.net/tarantool/+bug/729758 # https://bugs.launchpad.net/tarantool/+bug/729758
#
insert into t0 values ('Doe', 'Richard') insert into t0 values ('Doe', 'Richard')
Insert OK, 1 row affected Insert OK, 1 row affected
insert into t0 values ('Roe', 'Richard') insert into t0 values ('Roe', 'Richard')
...@@ -26,11 +26,11 @@ Found 5 tuples: ...@@ -26,11 +26,11 @@ Found 5 tuples:
['Woe', 'Richard'] ['Woe', 'Richard']
['Major', 'Tomas'] ['Major', 'Tomas']
['Kytes', 'Tomas'] ['Kytes', 'Tomas']
#
A test case for Bug#729879 # A test case for Bug#729879
"Zero limit is treated the same as no limit" # "Zero limit is treated the same as no limit"
https://bugs.launchpad.net/tarantool/+bug/729879 # https://bugs.launchpad.net/tarantool/+bug/729879
#
select * from t0 where k1='Richard' or k1='Tomas' limit 0 select * from t0 where k1='Richard' or k1='Tomas' limit 0
No match No match
delete from t0 where k0='Doe' delete from t0 where k0='Doe'
...@@ -49,3 +49,31 @@ delete from t0 where k0='Wales' ...@@ -49,3 +49,31 @@ delete from t0 where k0='Wales'
Delete OK, 1 row affected Delete OK, 1 row affected
delete from t0 where k0='Callaghan' delete from t0 where k0='Callaghan'
Delete OK, 1 row affected Delete OK, 1 row affected
#
# A test case for Bug#730593
# "Bad data if incomplete tuple"
# https://bugs.launchpad.net/tarantool/+bug/730593
# Verify that if there is an index on, say, field 2,
# we can't insert tuples with cardinality 1 and
# get away with it.
#
insert into t0 values ('Britney')
An error occurred: ERR_CODE_ILLEGAL_PARAMS, 'Illegal parameters'
select * from t0 where k1='Anything'
No match
insert into t0 values ('Stephanie')
An error occurred: ERR_CODE_ILLEGAL_PARAMS, 'Illegal parameters'
select * from t0 where k1='Anything'
No match
insert into t0 values ('Spears', 'Britney')
Insert OK, 1 row affected
select * from t0 where k0='Spears'
Found 1 tuple:
['Spears', 'Britney']
select * from t0 where k1='Anything'
No match
select * from t0 where k1='Britney'
Found 1 tuple:
['Spears', 'Britney']
delete from t0 where k0='Spears'
Delete OK, 1 row affected
# encoding: tarantool # encoding: tarantool
# #
print """ print """#
A test case for Bug#729758 # A test case for Bug#729758
"SELECT fails with a disjunct and small LIMIT" # "SELECT fails with a disjunct and small LIMIT"
https://bugs.launchpad.net/tarantool/+bug/729758 # https://bugs.launchpad.net/tarantool/+bug/729758
""" #"""
exec sql "insert into t0 values ('Doe', 'Richard')" exec sql "insert into t0 values ('Doe', 'Richard')"
exec sql "insert into t0 values ('Roe', 'Richard')" exec sql "insert into t0 values ('Roe', 'Richard')"
...@@ -16,11 +16,11 @@ exec sql "insert into t0 values ('Wales', 'Tomas')" ...@@ -16,11 +16,11 @@ exec sql "insert into t0 values ('Wales', 'Tomas')"
exec sql "insert into t0 values ('Callaghan', 'Tomas')" exec sql "insert into t0 values ('Callaghan', 'Tomas')"
exec sql "select * from t0 where k1='Richard' or k1='Tomas' or k1='Tomas' limit 5" exec sql "select * from t0 where k1='Richard' or k1='Tomas' or k1='Tomas' limit 5"
print """ print """#
A test case for Bug#729879 # A test case for Bug#729879
"Zero limit is treated the same as no limit" # "Zero limit is treated the same as no limit"
https://bugs.launchpad.net/tarantool/+bug/729879 # https://bugs.launchpad.net/tarantool/+bug/729879
""" #"""
exec sql "select * from t0 where k1='Richard' or k1='Tomas' limit 0" exec sql "select * from t0 where k1='Richard' or k1='Tomas' limit 0"
# Cleanup # Cleanup
...@@ -32,4 +32,22 @@ exec sql "delete from t0 where k0='Kytes'" ...@@ -32,4 +32,22 @@ exec sql "delete from t0 where k0='Kytes'"
exec sql "delete from t0 where k0='Stiles'" exec sql "delete from t0 where k0='Stiles'"
exec sql "delete from t0 where k0='Wales'" exec sql "delete from t0 where k0='Wales'"
exec sql "delete from t0 where k0='Callaghan'" exec sql "delete from t0 where k0='Callaghan'"
print """#
# A test case for Bug#730593
# "Bad data if incomplete tuple"
# https://bugs.launchpad.net/tarantool/+bug/730593
# Verify that if there is an index on, say, field 2,
# we can't insert tuples with cardinality 1 and
# get away with it.
#"""
exec sql "insert into t0 values ('Britney')"
exec sql "select * from t0 where k1='Anything'"
exec sql "insert into t0 values ('Stephanie')"
exec sql "select * from t0 where k1='Anything'"
exec sql "insert into t0 values ('Spears', 'Britney')"
exec sql "select * from t0 where k0='Spears'"
exec sql "select * from t0 where k1='Anything'"
exec sql "select * from t0 where k1='Britney'"
exec sql "delete from t0 where k0='Spears'"
# vim: syntax=python # vim: syntax=python
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment