From c317dad2abd21da18a9b33dbe703d4874e622ad6 Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja@tarantool.org>
Date: Fri, 5 Jul 2013 17:53:27 +0400
Subject: [PATCH] tuple-formats-v4: introduce a (so far defunct) format
 framework.

Use formats when creating tuples.
Store format id in the tuple.
---
 src/box/box.cc     |  3 +-
 src/box/box_lua.cc | 11 +++---
 src/box/key_def.h  | 11 +++++-
 src/box/request.cc | 30 ++++++++--------
 src/box/space.cc   |  4 +++
 src/box/space.h    |  7 ++--
 src/box/tuple.cc   | 87 ++++++++++++++++++++++++++++++++++++++++++----
 src/box/tuple.h    | 72 ++++++++++++++++++++++++++++++++++----
 8 files changed, 190 insertions(+), 35 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index ae16024ae0..1be2126ec7 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -131,7 +131,8 @@ recover_snap_row(const void *data)
 	struct tuple *tuple;
 	try {
 		const char *tuple_data = row->data;
-		tuple = tuple_new(row->tuple_size, &tuple_data,
+		tuple = tuple_new(space->format,
+				  row->tuple_size, &tuple_data,
 				  tuple_data + row->data_size);
 	} catch (const ClientError &e) {
 		say_error("\n"
diff --git a/src/box/box_lua.cc b/src/box/box_lua.cc
index f0aff11d37..1cc76df1ec 100644
--- a/src/box/box_lua.cc
+++ b/src/box/box_lua.cc
@@ -353,7 +353,8 @@ lbox_tuple_transform(struct lua_State *L)
 	const char *expr = lua_tolstring(L, -1, &expr_len);
 
 	/* Execute tuple_update */
-	struct tuple *new_tuple = tuple_update(lua_region_alloc, L,
+	struct tuple *new_tuple = tuple_update(tuple_format_ber,
+					       lua_region_alloc, L,
 					       tuple, expr, expr + expr_len);
 	/* Cleanup memory allocated by lua_region_alloc */
 	lua_settop(L, 0);
@@ -1034,7 +1035,7 @@ lua_table_to_tuple(struct lua_State *L, int index)
 		tuple_len += field.len + varint32_sizeof(field.len);
 		lua_pop(L, 1);
 	}
-	struct tuple *tuple = tuple_alloc(tuple_len);
+	struct tuple *tuple = tuple_alloc(tuple_format_ber, tuple_len);
 	/*
 	 * Important: from here and on if there is an exception,
 	 * the tuple is leaked.
@@ -1061,7 +1062,8 @@ lua_totuple(struct lua_State *L, int index)
 	struct lua_field field;
 	lua_tofield(L, index, &field);
 	if (field.type != UNKNOWN) {
-		tuple = tuple_alloc(field.len + varint32_sizeof(field.len));
+		tuple = tuple_alloc(tuple_format_ber,
+				    field.len + varint32_sizeof(field.len));
 		tuple->field_count = 1;
 		pack_lstr(tuple->data, field.data, field.len);
 		return tuple;
@@ -1590,7 +1592,8 @@ box_unpack_response(struct lua_State *L, const char *s, const char *end)
 		if (tend > end)
 			tnt_raise(IllegalParams, "incorrect packet length");
 
-		struct tuple *tuple = tuple_new(field_count, &s, tend);
+		struct tuple *tuple = tuple_new(tuple_format_ber,
+						field_count, &s, tend);
 		lbox_pushtuple(L, tuple);
 	}
 	return s;
diff --git a/src/box/key_def.h b/src/box/key_def.h
index 1d0a395800..3ba9954108 100644
--- a/src/box/key_def.h
+++ b/src/box/key_def.h
@@ -37,6 +37,15 @@
 enum field_type { UNKNOWN = 0, NUM, NUM64, STRING, field_type_MAX };
 extern const char *field_type_strs[];
 
+
+static inline uint32_t
+field_type_maxlen(enum field_type type)
+{
+	static const uint32_t maxlen[] =
+		{ UINT32_MAX, 4, 8, UINT32_MAX, UINT32_MAX };
+	return maxlen[type];
+}
+
 #define INDEX_TYPE(_)                                               \
 	_(HASH,    0)       /* HASH Index  */                       \
 	_(TREE,    1)       /* TREE Index  */                       \
@@ -62,7 +71,7 @@ struct key_def {
 	 * 'parts' array for such index contains data from
 	 * key_field[0] and key_field[1] respectively.
 	 * max_fieldno is 5, and cmp_order array holds offsets of
-	 * field 3 and 5 in 'parts' array: -1, -1, 0, -1, 1.
+	 * field 3 and 5 in 'parts' array: -1, -1, -1, 0, -1, 1.
 	 */
 	uint32_t *cmp_order;
 	/* The size of the 'parts' array. */
diff --git a/src/box/request.cc b/src/box/request.cc
index 3b662575d9..cf71febf40 100644
--- a/src/box/request.cc
+++ b/src/box/request.cc
@@ -72,16 +72,17 @@ execute_replace(struct request *request, struct txn *txn)
 	txn_add_redo(txn, request->type, request->data, request->len);
 	const char **reqpos = &request->data;
 	const char *reqend = request->data + request->len;
-	struct space *sp = read_space(reqpos, reqend);
+	struct space *space = read_space(reqpos, reqend);
 	request->flags |= (pick_u32(reqpos, reqend) &
 			   BOX_ALLOWED_REQUEST_FLAGS);
 	uint32_t field_count = pick_u32(reqpos, reqend);
 
-	struct tuple *new_tuple = tuple_new(field_count, reqpos, reqend);
+	struct tuple *new_tuple = tuple_new(space->format, field_count,
+					    reqpos, reqend);
 	try {
-		space_validate_tuple(sp, new_tuple);
+		space_validate_tuple(space, new_tuple);
 		enum dup_replace_mode mode = dup_replace_mode(request->flags);
-		txn_replace(txn, sp, NULL, new_tuple, mode);
+		txn_replace(txn, space, NULL, new_tuple, mode);
 
 	} catch (const Exception& e) {
 		tuple_free(new_tuple);
@@ -96,7 +97,7 @@ execute_update(struct request *request, struct txn *txn)
 	txn_add_redo(txn, request->type, request->data, request->len);
 	const char **reqpos = &request->data;
 	const char *reqend = request->data + request->len;
-	struct space *sp = read_space(reqpos, reqend);
+	struct space *space = read_space(reqpos, reqend);
 	request->flags |= (pick_u32(reqpos, reqend) &
 			   BOX_ALLOWED_REQUEST_FLAGS);
 	/* Parse UPDATE request. */
@@ -104,7 +105,7 @@ execute_update(struct request *request, struct txn *txn)
 	uint32_t key_part_count;
 	const char *key = read_key(reqpos, reqend, &key_part_count);
 
-	Index *pk = space_index(sp, 0);
+	Index *pk = space_index(space, 0);
 	/* Try to find the tuple by primary key. */
 	primary_key_validate(pk->key_def, key, key_part_count);
 	struct tuple *old_tuple = pk->findByKey(key, key_part_count);
@@ -113,12 +114,13 @@ execute_update(struct request *request, struct txn *txn)
 		return;
 
 	/* Update the tuple. */
-	struct tuple *new_tuple = tuple_update(palloc_region_alloc,
+	struct tuple *new_tuple = tuple_update(space->format,
+					       palloc_region_alloc,
 					       fiber->gc_pool,
 					       old_tuple, *reqpos, reqend);
 	try {
-		space_validate_tuple(sp, new_tuple);
-		txn_replace(txn, sp, old_tuple, new_tuple, DUP_INSERT);
+		space_validate_tuple(space, new_tuple);
+		txn_replace(txn, space, old_tuple, new_tuple, DUP_INSERT);
 	} catch (const Exception& e) {
 		tuple_free(new_tuple);
 		throw;
@@ -132,9 +134,9 @@ execute_select(struct request *request, struct port *port)
 {
 	const char **reqpos = &request->data;
 	const char *reqend = request->data + request->len;
-	struct space *sp = read_space(reqpos, reqend);
+	struct space *space = read_space(reqpos, reqend);
 	uint32_t index_no = pick_u32(reqpos, reqend);
-	Index *index = index_find(sp, index_no);
+	Index *index = index_find(space, index_no);
 	uint32_t offset = pick_u32(reqpos, reqend);
 	uint32_t limit = pick_u32(reqpos, reqend);
 	uint32_t count = pick_u32(reqpos, reqend);
@@ -183,7 +185,7 @@ execute_delete(struct request *request, struct txn *txn)
 	txn_add_redo(txn, type, request->data, request->len);
 	const char **reqpos = &request->data;
 	const char *reqend = request->data + request->len;
-	struct space *sp = read_space(reqpos, reqend);
+	struct space *space = read_space(reqpos, reqend);
 	if (type == DELETE) {
 		request->flags |= pick_u32(reqpos, reqend) &
 			BOX_ALLOWED_REQUEST_FLAGS;
@@ -192,14 +194,14 @@ execute_delete(struct request *request, struct txn *txn)
 	uint32_t key_part_count;
 	const char *key = read_key(reqpos, reqend, &key_part_count);
 	/* Try to find tuple by primary key */
-	Index *pk = space_index(sp, 0);
+	Index *pk = space_index(space, 0);
 	primary_key_validate(pk->key_def, key, key_part_count);
 	struct tuple *old_tuple = pk->findByKey(key, key_part_count);
 
 	if (old_tuple == NULL)
 		return;
 
-	txn_replace(txn, sp, old_tuple, NULL, DUP_REPLACE_OR_INSERT);
+	txn_replace(txn, space, old_tuple, NULL, DUP_REPLACE_OR_INSERT);
 }
 
 /** To collects stats, we need a valid request type.
diff --git a/src/box/space.cc b/src/box/space.cc
index aaff428157..3e4eedb447 100644
--- a/src/box/space.cc
+++ b/src/box/space.cc
@@ -69,6 +69,8 @@ space_create(struct space *space, uint32_t space_no,
 	space->key_defs = key_defs;
 	space->key_count = key_count;
 	space_init_field_types(space);
+	space->format = tuple_format_new(space->field_types,
+					 space->max_fieldno);
 	/* fill space indexes */
 	for (uint32_t j = 0; j < key_count; ++j) {
 		struct key_def *key_def = &space->key_defs[j];
@@ -245,6 +247,7 @@ space_free(void)
 				mh_i32ptr_node(spaces, i)->val;
 		space_delete(space);
 	}
+	tuple_free();
 }
 
 /**
@@ -328,6 +331,7 @@ void
 space_init(void)
 {
 	spaces = mh_i32ptr_new();
+	tuple_init();
 
 	/* configure regular spaces */
 	space_config();
diff --git a/src/box/space.h b/src/box/space.h
index 40506ac69c..0417ca6389 100644
--- a/src/box/space.h
+++ b/src/box/space.h
@@ -60,7 +60,7 @@ struct space {
 
 	/**
 	 * Field types of indexed fields. This is an array of size
-	 * field_count. If there are gaps, i.e. fields that do not
+	 * max_fieldno. If there are gaps, i.e. fields that do not
 	 * participate in any index and thus we cannot infer their
 	 * type, then respective array members have value UNKNOWN.
 	 */
@@ -69,12 +69,15 @@ struct space {
 	/**
 	 * Max field no which participates in any of the space indexes.
 	 * Each tuple in this space must have, therefore, at least
-	 * field_count fields.
+	 * max_fieldno fields.
 	 */
 	uint32_t max_fieldno;
 
 	/** Space number. */
 	uint32_t no;
+
+	/** Default tuple format used by this space */
+	struct tuple_format *format;
 };
 
 
diff --git a/src/box/tuple.cc b/src/box/tuple.cc
index db71103428..0ab3a3c98d 100644
--- a/src/box/tuple.cc
+++ b/src/box/tuple.cc
@@ -38,15 +38,69 @@
 #include <fiber.h>
 #include "scoped_guard.h"
 
+/** Global table of tuple formats */
+struct tuple_format **tuple_formats;
+struct tuple_format *tuple_format_ber;
+
+static uint32_t formats_size, formats_capacity;
+
+static struct tuple_format *
+tuple_format_alloc_and_register(uint32_t offset_count)
+{
+	if (formats_size == formats_capacity) {
+		uint32_t new_capacity = formats_capacity ?
+			formats_capacity * 2 : 16;
+		if (new_capacity >= UINT16_MAX)
+			tnt_raise(LoggedError, ER_MEMORY_ISSUE,
+				  new_capacity, "tuple_formats", "resize");
+		struct tuple_format **formats = (struct tuple_format **)
+			realloc(tuple_formats,
+				new_capacity * sizeof(tuple_formats[0]));
+		if (formats == NULL)
+			tnt_raise(LoggedError, ER_MEMORY_ISSUE,
+				  new_capacity, "tuple_formats", "realloc");
+
+		formats_capacity = new_capacity;
+		tuple_formats = formats;
+	}
+
+	uint32_t total = sizeof(struct tuple_format) +
+		offset_count * sizeof(int32_t);
+
+	struct tuple_format *format = (struct tuple_format *) malloc(total);
+
+	if (format == NULL) {
+		tnt_raise(LoggedError, ER_MEMORY_ISSUE,
+			  total, "tuple format", "malloc");
+	}
+
+	format->id = formats_size++;
+	format->offset_count = offset_count;
+	tuple_formats[format->id] = format;
+	return format;
+}
+
+struct tuple_format *
+tuple_format_new(const enum field_type *fields, uint32_t max_fieldno)
+{
+	(void) max_fieldno;
+	(void) fields;
+	struct tuple_format *format =
+		tuple_format_alloc_and_register(0);
+
+	return format;
+}
+
 /** Allocate a tuple */
 struct tuple *
-tuple_alloc(size_t size)
+tuple_alloc(struct tuple_format *format, size_t size)
 {
 	size_t total = sizeof(struct tuple) + size;
 	struct tuple *tuple = (struct tuple *) salloc(total, "tuple");
 
-	tuple->flags = tuple->refs = 0;
+	tuple->refs = 0;
 	tuple->bsize = size;
+	tuple->format_id = tuple_format_id(format);
 
 	say_debug("tuple_alloc(%zu) = %p", size, tuple);
 	return tuple;
@@ -196,7 +250,8 @@ tuple_print(struct tbuf *buf, const struct tuple *tuple)
 }
 
 struct tuple *
-tuple_update(void *(*region_alloc)(void *, size_t), void *alloc_ctx,
+tuple_update(struct tuple_format *format,
+	     void *(*region_alloc)(void *, size_t), void *alloc_ctx,
 	     const struct tuple *old_tuple, const char *expr,
 	     const char *expr_end)
 {
@@ -210,7 +265,7 @@ tuple_update(void *(*region_alloc)(void *, size_t), void *alloc_ctx,
 				     &new_field_count);
 
 	/* Allocate a new tuple. */
-	struct tuple *new_tuple = tuple_alloc(new_size);
+	struct tuple *new_tuple = tuple_alloc(format, new_size);
 	new_tuple->field_count = new_field_count;
 
 	try {
@@ -223,14 +278,15 @@ tuple_update(void *(*region_alloc)(void *, size_t), void *alloc_ctx,
 }
 
 struct tuple *
-tuple_new(uint32_t field_count, const char **data, const char *end)
+tuple_new(struct tuple_format *format, uint32_t field_count,
+	  const char **data, const char *end)
 {
 	size_t tuple_len = end - *data;
 
 	if (tuple_len != tuple_range_size(data, end, field_count))
 		tnt_raise(IllegalParams, "tuple_new(): incorrect tuple format");
 
-	struct tuple *new_tuple = tuple_alloc(tuple_len);
+	struct tuple *new_tuple = tuple_alloc(format, tuple_len);
 	new_tuple->field_count = field_count;
 	memcpy(new_tuple->data, end - tuple_len, tuple_len);
 	return new_tuple;
@@ -330,7 +386,8 @@ tuple_compare_with_key(const struct tuple *tuple_a, const char *key,
 		uint32_t field_no = key_def->parts[part].fieldno;
 
 		uint32_t size_a;
-		const char *field_a = tuple_field(tuple_a, field_no, &size_a);
+		const char *field_a = tuple_field(tuple_a, field_no,
+						  &size_a);
 
 		uint32_t key_size = load_varint32(&key);
 		int r = tuple_compare_field(field_a, size_a, key, key_size,
@@ -344,3 +401,19 @@ tuple_compare_with_key(const struct tuple *tuple_a, const char *key,
 
 	return 0;
 }
+
+void
+tuple_init()
+{
+	tuple_format_ber = tuple_format_new(NULL, 0);
+}
+
+void
+tuple_free()
+{
+	for (struct tuple_format **format = tuple_formats;
+	     format < tuple_formats + formats_size;
+	     format++)
+		free(*format);
+	free(tuple_formats);
+}
diff --git a/src/box/tuple.h b/src/box/tuple.h
index ad7bab91be..f43782b899 100644
--- a/src/box/tuple.h
+++ b/src/box/tuple.h
@@ -29,10 +29,48 @@
  * SUCH DAMAGE.
  */
 #include "tarantool/util.h"
+#include "key_def.h"
 #include <pickle.h>
 
 struct tbuf;
-struct key_def;
+
+/**
+ * @brief In-memory tuple format
+ */
+struct tuple_format {
+	uint16_t id;
+	/**
+	 * Length of 'offset' array. Is usually the same as
+	 * space->max_fieldno (no need to store the offset of the
+	 * first field).
+	 */
+	uint32_t offset_count;
+};
+
+extern struct tuple_format **tuple_formats;
+/**
+ * Default format for a tuple which does not belong
+ * to any space and is stored in memory.
+ */
+extern struct tuple_format *tuple_format_ber;
+
+
+static inline uint32_t
+tuple_format_id(struct tuple_format *format)
+{
+	assert(tuple_formats[format->id] == format);
+	return format->id;
+}
+
+/**
+ * @brief Allocate, construct and register a new in-memory tuple
+ *	 format.
+ * @param space description
+ *
+ * @return tuple format
+ */
+struct tuple_format *
+tuple_format_new(const enum field_type *fields, uint32_t max_fieldno);
 
 /**
  * An atom of Tarantool/Box storage. Consists of a list of fields.
@@ -42,8 +80,8 @@ struct tuple
 {
 	/** reference counter */
 	uint16_t refs;
-	/* see enum tuple_flags */
-	uint16_t flags;
+	/** format identifier */
+	uint16_t format_id;
 	/** length of the variable part of the tuple */
 	uint32_t bsize;
 	/** number of fields in the variable part. */
@@ -62,7 +100,7 @@ struct tuple
  * @post tuple->refs = 1
  */
 struct tuple *
-tuple_alloc(size_t size);
+tuple_alloc(struct tuple_format *format, size_t size);
 
 /**
  * Create a new tuple from a sequence of BER-len encoded fields.
@@ -73,7 +111,8 @@ tuple_alloc(size_t size);
  * Throws an exception if tuple format is incorrect.
  */
 struct tuple *
-tuple_new(uint32_t field_count, const char **data, const char *end);
+tuple_new(struct tuple_format *format, uint32_t field_count,
+	  const char **data, const char *end);
 
 /**
  * Change tuple reference counter. If it has reached zero, free the tuple.
@@ -83,6 +122,19 @@ tuple_new(uint32_t field_count, const char **data, const char *end);
 void
 tuple_ref(struct tuple *tuple, int count);
 
+/**
+* @brief Return a tuple format instance
+* @param tuple tuple
+* @return tuple format instance
+*/
+static inline struct tuple_format *
+tuple_format(const struct tuple *tuple)
+{
+	struct tuple_format *format = tuple_formats[tuple->format_id];
+	assert(tuple_format_id(format) == tuple->format_id);
+	return format;
+}
+
 /**
  * Get a field from tuple by index.
  *
@@ -166,7 +218,8 @@ void
 tuple_print(struct tbuf *buf, const struct tuple *tuple);
 
 struct tuple *
-tuple_update(void *(*region_alloc)(void *, size_t), void *alloc_ctx,
+tuple_update(struct tuple_format *new_format,
+	     void *(*region_alloc)(void *, size_t), void *alloc_ctx,
 	     const struct tuple *old_tuple,
 	     const char *expr, const char *expr_end);
 
@@ -241,5 +294,12 @@ tuple_to_obuf(struct tuple *tuple, struct obuf *buf);
 void
 tuple_to_luabuf(struct tuple *tuple, struct luaL_Buffer *b);
 
+/** Initialize tuple library */
+void
+tuple_init();
+
+/** Cleanup tuple library */
+void
+tuple_free();
 #endif /* TARANTOOL_BOX_TUPLE_H_INCLUDED */
 
-- 
GitLab