diff --git a/mod/box/tree.m b/mod/box/tree.m index ac0fe6d80ebbcfade42145fd841d0fa333c04128..743bf1328710c0f3ffd6bbb3cc7930b2dcb25beb 100644 --- a/mod/box/tree.m +++ b/mod/box/tree.m @@ -271,6 +271,25 @@ find_tree_type(struct space *space, struct key_def *key_def) } } +/** + * Check if key parts make a linear sequence of fields. + */ +static bool +key_is_linear(struct key_def *key_def) +{ + if (key_def->part_count > 1) { + int prev = key_def->parts[0].fieldno; + for (int i = 1; i < key_def->part_count; ++i) { + int next = key_def->parts[i].fieldno; + if (next != (prev + 1)) { + return false; + } + prev = next; + } + } + return true; +} + /** * Find field offsets/values for a sparse node. */ @@ -489,13 +508,10 @@ sparse_key_node_compare(struct key_def *key_def, * Compare a part for two dense keys. */ static int -dense_part_compare(enum field_data_type type, const u8 *data_a, - u32 offset_a, const u8 *data_b, u32 offset_b) +dense_part_compare(enum field_data_type type, + const u8 *ad, u32 al, + const u8 *bd, u32 bl) { - const u8 *ad = data_a + offset_a; - const u8 *bd = data_b + offset_b; - u32 al = load_varint32((void *) &ad); - u32 bl = load_varint32((void *) &bd); if (type == NUM) { u32 an, bn; assert(al == sizeof an && bl == sizeof bn); @@ -552,14 +568,44 @@ dense_node_compare(struct key_def *key_def, u32 first_field, /* Compare key parts. */ for (int part = 0; part < part_count; ++part) { int field = key_def->parts[part].fieldno; + u8 *ad = tuple_a->data + off_a[field - first_field]; + u8 *bd = tuple_b->data + off_b[field - first_field]; + u32 al = load_varint32((void *) &ad); + u32 bl = load_varint32((void *) &bd); + int r = dense_part_compare(key_def->parts[part].type, + ad, al, bd, bl); + if (r) { + return r; + } + } + return 0; +} + +/** + * Compare a part for two dense keys with parts in linear order. + */ +static int +linear_node_compare(struct key_def *key_def, u32 first_field, + struct box_tuple *tuple_a, u32 offset_a, + struct box_tuple *tuple_b, u32 offset_b) +{ + int part_count = key_def->part_count; + assert(first_field + part_count <= tuple_a->cardinality); + assert(first_field + part_count <= tuple_b->cardinality); + + /* Compare key parts. */ + u8 *ad = tuple_a->data + offset_a; + u8 *bd = tuple_b->data + offset_b; + for (int part = 0; part < part_count; ++part) { + u32 al = load_varint32((void**) &ad); + u32 bl = load_varint32((void**) &bd); int r = dense_part_compare(key_def->parts[part].type, - tuple_a->data, - off_a[field - first_field], - tuple_b->data, - off_b[field - first_field]); + ad, al, bd, bl); if (r) { return r; } + ad += al; + bd += bl; } return 0; } @@ -570,10 +616,8 @@ dense_node_compare(struct key_def *key_def, u32 first_field, static int dense_key_part_compare(enum field_data_type type, const u8 *data_a, union sparse_part part_a, - const u8 *data_b, u32 offset_b) + const u8 *bd, u32 bl) { - const u8 *bd = data_b + offset_b; - u32 bl = load_varint32((void *) &bd); if (type == NUM) { u32 an, bn; an = part_a.num32; @@ -637,14 +681,45 @@ dense_key_node_compare(struct key_def *key_def, part_count = key_data->part_count; for (int part = 0; part < part_count; ++part) { int field = key_def->parts[part].fieldno; + const u8 *bd = tuple->data + off[field - first_field]; + u32 bl = load_varint32((void *) &bd); + int r = dense_key_part_compare(key_def->parts[part].type, + key_data->data, + key_data->parts[part], + bd, bl); + if (r) { + return r; + } + } + return 0; +} + +/** + * Compare a key for a key search data and a dense node with parts in + * linear order. + */ +static int +linear_key_node_compare(struct key_def *key_def, + const struct key_data *key_data, + u32 first_field, struct box_tuple *tuple, u32 offset) +{ + int part_count = key_def->part_count; + assert(first_field + part_count <= tuple->cardinality); + + /* Compare key parts. */ + if (part_count > key_data->part_count) + part_count = key_data->part_count; + u8 *bd = tuple->data + offset; + for (int part = 0; part < part_count; ++part) { + u32 bl = load_varint32((void *) &bd); int r = dense_key_part_compare(key_def->parts[part].type, key_data->data, key_data->parts[part], - tuple->data, - off[field - first_field]); + bd, bl); if (r) { return r; } + bd += bl; } return 0; } @@ -1013,6 +1088,7 @@ sparse_key_node_cmp(const void *key, const void *node, void *arg) @interface DenseTreeIndex: TreeIndex { @public u32 first_field; + bool is_linear; } @end @@ -1050,12 +1126,47 @@ dense_key_node_cmp(const void *key, const void * node, void *arg) node_x->tuple, node_x->offset); } +static int +linear_dense_node_cmp(const void *node_a, const void *node_b, void *arg) +{ + DenseTreeIndex *index = (DenseTreeIndex *) arg; + const struct dense_node *node_xa = node_a; + const struct dense_node *node_xb = node_b; + return linear_node_compare(index->key_def, index->first_field, + node_xa->tuple, node_xa->offset, + node_xb->tuple, node_xb->offset); +} + +static int +linear_dense_dup_node_cmp(const void *node_a, const void *node_b, void *arg) +{ + int r = linear_dense_node_cmp(node_a, node_b, arg); + if (r == 0) { + const struct dense_node *node_xa = node_a; + const struct dense_node *node_xb = node_b; + r = ta_cmp(node_xa->tuple, node_xb->tuple); + } + return r; +} + +static int +linear_dense_key_node_cmp(const void *key, const void * node, void *arg) +{ + DenseTreeIndex *index = (DenseTreeIndex *) arg; + const struct key_data *key_data = key; + const struct dense_node *node_x = node; + return linear_key_node_compare(index->key_def, key_data, + index->first_field, + node_x->tuple, node_x->offset); +} + @implementation DenseTreeIndex - (void) enable { [super enable]; first_field = find_first_field(key_def); + is_linear = key_is_linear(key_def); } - (size_t) node_size @@ -1065,17 +1176,17 @@ dense_key_node_cmp(const void *key, const void * node, void *arg) - (tree_cmp_t) node_cmp { - return dense_node_cmp; + return is_linear ? linear_dense_node_cmp : dense_node_cmp; } - (tree_cmp_t) dup_node_cmp { - return dense_dup_node_cmp; + return is_linear ? linear_dense_dup_node_cmp : dense_dup_node_cmp; } - (tree_cmp_t) key_node_cmp { - return dense_key_node_cmp; + return is_linear ? linear_dense_key_node_cmp : dense_key_node_cmp; } - (void) fold: (void *) node :(struct box_tuple *) tuple @@ -1175,6 +1286,7 @@ num32_key_node_cmp(const void * key, const void * node, void *arg) @public u32 first_field; u32 first_offset; + bool is_linear; } @end @@ -1212,6 +1324,40 @@ fixed_key_node_cmp(const void *key, const void * node, void *arg) node_x->tuple, index->first_offset); } +static int +linear_fixed_node_cmp(const void *node_a, const void *node_b, void *arg) +{ + FixedTreeIndex *index = (FixedTreeIndex *) arg; + const struct fixed_node *node_xa = node_a; + const struct fixed_node *node_xb = node_b; + return linear_node_compare(index->key_def, index->first_field, + node_xa->tuple, index->first_offset, + node_xb->tuple, index->first_offset); +} + +static int +linear_fixed_dup_node_cmp(const void *node_a, const void *node_b, void *arg) +{ + int r = linear_fixed_node_cmp(node_a, node_b, arg); + if (r == 0) { + const struct fixed_node *node_xa = node_a; + const struct fixed_node *node_xb = node_b; + r = ta_cmp(node_xa->tuple, node_xb->tuple); + } + return r; +} + +static int +linear_fixed_key_node_cmp(const void *key, const void * node, void *arg) +{ + FixedTreeIndex *index = (FixedTreeIndex *) arg; + const struct key_data *key_data = key; + const struct fixed_node *node_x = node; + return linear_key_node_compare(index->key_def, key_data, + index->first_field, + node_x->tuple, index->first_offset); +} + @implementation FixedTreeIndex - (void) enable @@ -1219,6 +1365,7 @@ fixed_key_node_cmp(const void *key, const void * node, void *arg) [super enable]; first_field = find_first_field(key_def); first_offset = find_fixed_offset(space, first_field, 0); + is_linear = key_is_linear(key_def); } - (size_t) node_size @@ -1228,17 +1375,17 @@ fixed_key_node_cmp(const void *key, const void * node, void *arg) - (tree_cmp_t) node_cmp { - return fixed_node_cmp; + return is_linear ? linear_fixed_node_cmp : fixed_node_cmp; } - (tree_cmp_t) dup_node_cmp { - return fixed_dup_node_cmp; + return is_linear ? linear_fixed_dup_node_cmp : fixed_dup_node_cmp; } - (tree_cmp_t) key_node_cmp { - return fixed_key_node_cmp; + return is_linear ? linear_fixed_key_node_cmp : fixed_key_node_cmp; } - (void) fold: (void *) node :(struct box_tuple *) tuple