From 51c4cce3d167f00f914802f3923e7cec01db7a7a Mon Sep 17 00:00:00 2001
From: Mergen Imeev <imeevma@tarantool.org>
Date: Tue, 10 Oct 2023 11:43:46 +0300
Subject: [PATCH] sql: introduce sql_fieldno_by_id()

This patch introduces the sql_fieldno_by_id() function. This function is
used to search for a space field based on a given element of the
struct IdList.

Needed for #4467

NO_DOC=refactoring
NO_TEST=refactoring
NO_CHANGELOG=refactoring
---
 src/box/sql.c        | 23 ++++++++++++++------
 src/box/sql/insert.c | 19 +++++++----------
 src/box/sql/select.c | 50 ++++++++++++++++++++++----------------------
 src/box/sql/sqlInt.h | 14 +++++++++++++
 4 files changed, 63 insertions(+), 43 deletions(-)

diff --git a/src/box/sql.c b/src/box/sql.c
index cc7bcff05f..e9b336331a 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -1665,19 +1665,30 @@ sql_index_id_by_src(const struct SrcList_item *src)
 }
 
 uint32_t
-sql_fieldno_by_token(const struct space *space, const struct Token *name)
+sql_space_fieldno(const struct space *space, const char *name)
 {
-	char *name_str = sql_name_from_token(name);
 	for (uint32_t i = 0; i < space->def->field_count; ++i) {
-		if (strcmp(space->def->fields[i].name, name_str) == 0) {
-			sql_xfree(name_str);
+		if (strcmp(space->def->fields[i].name, name) == 0)
 			return i;
-		}
 	}
-	sql_xfree(name_str);
 	return UINT32_MAX;
 }
 
+uint32_t
+sql_fieldno_by_token(const struct space *space, const struct Token *name)
+{
+	char *name_str = sql_name_from_token(name);
+	uint32_t res = sql_space_fieldno(space, name_str);
+	sql_xfree(name_str);
+	return res;
+}
+
+uint32_t
+sql_fieldno_by_id(const struct space *space, const struct IdList_item *id)
+{
+	return sql_space_fieldno(space, id->zName);
+}
+
 /**
  * Return a constraint with the name specified by the token and the
  * specified type. Return NULL if the constraint was not found.
diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c
index 731bd146bc..d4d4a9aa03 100644
--- a/src/box/sql/insert.c
+++ b/src/box/sql/insert.c
@@ -354,16 +354,8 @@ sqlInsert(Parse * pParse,	/* Parser context */
 			pColumn->a[i].idx = -1;
 		}
 		for (i = 0; i < pColumn->nId; i++) {
-			for (j = 0; j < (int) space_def->field_count; j++) {
-				if (strcmp(pColumn->a[i].zName,
-					   space_def->fields[j].name) == 0) {
-					pColumn->a[i].idx = j;
-					if (i != j)
-						bIdListInOrder = 0;
-					break;
-				}
-			}
-			if (j >= (int) space_def->field_count) {
+			uint32_t id = sql_fieldno_by_id(space, &pColumn->a[i]);
+			if (id == UINT32_MAX) {
 				diag_set(ClientError,
 					 ER_NO_SUCH_FIELD_NAME_IN_SPACE,
 					 pColumn->a[i].zName,
@@ -371,7 +363,10 @@ sqlInsert(Parse * pParse,	/* Parser context */
 				pParse->is_aborted = true;
 				goto insert_cleanup;
 			}
-			if (bit_test(used_columns, j)) {
+			pColumn->a[i].idx = id;
+			if ((uint32_t)i != id)
+				bIdListInOrder = 0;
+			if (bit_test(used_columns, id)) {
 				const char *err = "table id list: duplicate "\
 						  "column name %s";
 				diag_set(ClientError, ER_SQL_PARSER_GENERIC,
@@ -379,7 +374,7 @@ sqlInsert(Parse * pParse,	/* Parser context */
 				pParse->is_aborted = true;
 				goto insert_cleanup;
 			}
-			bit_set(used_columns, j);
+			bit_set(used_columns, id);
 		}
 	}
 
diff --git a/src/box/sql/select.c b/src/box/sql/select.c
index 9581bbeb95..c05c532a27 100644
--- a/src/box/sql/select.c
+++ b/src/box/sql/select.c
@@ -863,6 +863,8 @@ sqlProcessJoin(Parse * pParse, Select * p)
 			pRight->pOn = 0;
 		}
 
+		if (pRight->pUsing == NULL)
+			continue;
 		/* Create extra terms on the WHERE clause for each column named
 		 * in the USING clause.  Example: If the two tables to be joined are
 		 * A and B and the USING clause names X, Y, and Z, then add this
@@ -870,32 +872,30 @@ sqlProcessJoin(Parse * pParse, Select * p)
 		 * Report an error if any column mentioned in the USING clause is
 		 * not contained in both tables to be joined.
 		 */
-		if (pRight->pUsing) {
-			const char *err = "cannot join using column %s - "\
-					  "column not present in both tables";
-			IdList *pList = pRight->pUsing;
-			for (j = 0; j < pList->nId; j++) {
-				char *zName;	/* Name of the term in the USING clause */
-				int iLeft;	/* Table on the left with matching column name */
-				int iLeftCol;	/* Column number of matching column on the left */
-				int iRightCol;	/* Column number of matching column on the right */
-
-				zName = pList->a[j].zName;
-				iRightCol = columnIndex(right_space->def, zName);
-				if (iRightCol < 0
-				    || !tableAndColumnIndex(pSrc, i + 1, zName,
-							    &iLeft, &iLeftCol)
-				    ) {
-					err = tt_sprintf(err, zName);
-					diag_set(ClientError,
-						 ER_SQL_PARSER_GENERIC, err);
-					pParse->is_aborted = true;
-					return 1;
-				}
-				addWhereTerm(pParse, pSrc, iLeft, iLeftCol,
-					     i + 1, iRightCol, isOuter,
-					     &p->pWhere);
+		for (int id = 0; id < pRight->pUsing->nId; ++id) {
+			struct IdList_item *item = &pRight->pUsing->a[id];
+			uint32_t fieldno_right = sql_fieldno_by_id(right_space,
+								   item);
+			uint32_t fieldno_left = UINT32_MAX;
+			int n;
+			for (n = 0; n <= i; ++n) {
+				const struct space *space = pSrc->a[n].space;
+				fieldno_left = sql_fieldno_by_id(space, item);
+				if (fieldno_left != UINT32_MAX)
+					break;
+			}
+			if (fieldno_left == UINT32_MAX ||
+			    fieldno_right == UINT32_MAX) {
+				diag_set(ClientError, ER_SQL_PARSER_GENERIC,
+					 tt_sprintf("cannot join using column "
+						    "%s - column not present "
+						    "in both tables",
+						    item->zName));
+				pParse->is_aborted = true;
+				return 1;
 			}
+			addWhereTerm(pParse, pSrc, n, fieldno_left, i + 1,
+				     fieldno_right, isOuter, &p->pWhere);
 		}
 	}
 	return 0;
diff --git a/src/box/sql/sqlInt.h b/src/box/sql/sqlInt.h
index 932b22195d..bf9fa94d1a 100644
--- a/src/box/sql/sqlInt.h
+++ b/src/box/sql/sqlInt.h
@@ -3118,6 +3118,13 @@ sql_space_by_token(const struct Token *name);
 const struct space *
 sql_space_by_src(const struct SrcList_item *src);
 
+/**
+ * Return the fieldno of the field with the given name. Return UINT32_MAX if the
+ * field was not found.
+ */
+uint32_t
+sql_space_fieldno(const struct space *space, const char *name);
+
 /**
  * Return id of index with the name defined by the token. Return UINT32_MAX if
  * the index was not found.
@@ -3139,6 +3146,13 @@ sql_index_id_by_src(const struct SrcList_item *src);
 uint32_t
 sql_fieldno_by_token(const struct space *space, const struct Token *name);
 
+/**
+ * Return the fieldno of the field with the name defined by the element of
+ * IdList. Return UINT32_MAX if the field was not found.
+ */
+uint32_t
+sql_fieldno_by_id(const struct space *space, const struct IdList_item *id);
+
 /**
  * Return the tuple foreign key constraint with the name defined by the token.
  * Return NULL if the tuple foreign key constraint was not found.
-- 
GitLab