From 122d40af34b774f07b7e9bcbcbe20c24a1e9ddf7 Mon Sep 17 00:00:00 2001
From: Vladimir Davydov <vdavydov@tarantool.org>
Date: Thu, 5 Dec 2024 17:03:38 +0300
Subject: [PATCH] vinyl: move cache invalidation from vy_tx_write to vy_lsm_set

vy_lsm.c seems to be a more appropriate place for cache invalidation
because (a) it's vy_lsm that owns the cache and (b) we invalidate
the cache on rollback in vy_lsm_rollback_stmt().

While we are at it, let's inline vy_tx_write() and vy_tx_write_prepare()
because they are trivial and used in just one place.

NO_DOC=refactoring
NO_TEST=refactoring
NO_CHANGELOG=refactoring

(cherry picked from commit 44c245ef0baa227a6bff75de2a91f20da5532dc1)
---
 src/box/vy_lsm.c | 36 ++++++++++++++++++--
 src/box/vy_tx.c  | 87 +++++-------------------------------------------
 2 files changed, 42 insertions(+), 81 deletions(-)

diff --git a/src/box/vy_lsm.c b/src/box/vy_lsm.c
index 2347bb802c..4e67b863b7 100644
--- a/src/box/vy_lsm.c
+++ b/src/box/vy_lsm.c
@@ -912,6 +912,34 @@ vy_lsm_set(struct vy_lsm *lsm, struct vy_mem *mem,
 		diag_set(ClientError, ER_TRANSACTION_CONFLICT);
 		return -1;
 	}
+
+	/* Invalidate cache element. */
+	struct vy_entry deleted = vy_entry_none();
+	vy_cache_on_write(&lsm->cache, entry, &deleted);
+
+	/*
+	 * The UPSERT statement can be applied to the cached statement,
+	 * because the cache always contains newest REPLACE statements.
+	 * In such a case the UPSERT, applied to the cached statement,
+	 * can be inserted instead of the original UPSERT.
+	 */
+	bool need_unref = false;
+	if (deleted.stmt != NULL &&
+	    vy_stmt_type(entry.stmt) == IPROTO_UPSERT) {
+		struct vy_entry applied = vy_entry_apply_upsert(
+				entry, deleted, mem->cmp_def, false);
+		/*
+		 * Ignore a memory error, because it is not critical
+		 * to apply the optimization.
+		 */
+		if (applied.stmt != NULL) {
+			entry = applied;
+			need_unref = true;
+		}
+	}
+	if (deleted.stmt != NULL)
+		tuple_unref(deleted.stmt);
+
 	/*
 	 * Allocate region_stmt on demand.
 	 *
@@ -924,11 +952,13 @@ vy_lsm_set(struct vy_lsm *lsm, struct vy_mem *mem,
 		*region_stmt = vy_stmt_dup_lsregion(entry.stmt,
 						    &mem->env->allocator,
 						    mem->generation);
-		if (*region_stmt == NULL)
-			return -1;
 	}
-	entry.stmt = *region_stmt;
+	if (need_unref)
+		tuple_unref(entry.stmt);
+	if (*region_stmt == NULL)
+		return -1;
 
+	entry.stmt = *region_stmt;
 	struct vy_stmt_counter *count = &lsm->stat.memory.count;
 	if (vy_stmt_type(*region_stmt) != IPROTO_UPSERT)
 		return vy_mem_insert(mem, entry, count);
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index 8f9acb94e2..9c04678d6f 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -51,7 +51,6 @@
 #include "trigger.h"
 #include "trivia/util.h"
 #include "tuple.h"
-#include "vy_cache.h"
 #include "vy_lsm.h"
 #include "vy_mem.h"
 #include "vy_stat.h"
@@ -477,80 +476,6 @@ vy_tx_begin(struct vy_tx_manager *xm, enum txn_isolation_level isolation)
 	return tx;
 }
 
-/**
- * Rotate the active in-memory tree if necessary and pin it to make
- * sure it is not dumped until the transaction is complete.
- */
-static int
-vy_tx_write_prepare(struct txv *v)
-{
-	struct vy_lsm *lsm = v->lsm;
-	if (vy_lsm_rotate_mem_if_required(lsm) != 0)
-		return -1;
-	vy_mem_pin(lsm->mem);
-	v->mem = lsm->mem;
-	return 0;
-}
-
-/**
- * Write a single statement into an LSM tree. If the statement has
- * an lsregion copy then use it, else create it.
- *
- * @param lsm         LSM tree to write to.
- * @param mem         In-memory tree to write to.
- * @param entry       Statement allocated with malloc().
- * @param region_stmt NULL or the same statement as stmt,
- *                    but allocated on lsregion.
- *
- * @retval  0 Success.
- * @retval -1 Memory error.
- */
-static int
-vy_tx_write(struct vy_lsm *lsm, struct vy_mem *mem,
-	    struct vy_entry entry, struct tuple **region_stmt)
-{
-	assert(vy_stmt_is_refable(entry.stmt));
-	assert(*region_stmt == NULL || !vy_stmt_is_refable(*region_stmt));
-
-	/*
-	 * The UPSERT statement can be applied to the cached
-	 * statement, because the cache always contains only
-	 * newest REPLACE statements. In such a case the UPSERT,
-	 * applied to the cached statement, can be inserted
-	 * instead of the original UPSERT.
-	 */
-	if (vy_stmt_type(entry.stmt) == IPROTO_UPSERT) {
-		struct vy_entry deleted = vy_entry_none();
-		/* Invalidate cache element. */
-		vy_cache_on_write(&lsm->cache, entry, &deleted);
-		if (deleted.stmt != NULL) {
-			struct vy_entry applied;
-			applied = vy_entry_apply_upsert(entry, deleted,
-							mem->cmp_def, false);
-			tuple_unref(deleted.stmt);
-			if (applied.stmt != NULL) {
-				enum iproto_type applied_type =
-					vy_stmt_type(applied.stmt);
-				assert(applied_type == IPROTO_REPLACE ||
-				       applied_type == IPROTO_INSERT);
-				(void) applied_type;
-				int rc = vy_lsm_set(lsm, mem, applied,
-						    region_stmt);
-				tuple_unref(applied.stmt);
-				return rc;
-			}
-			/*
-			 * Ignore a memory error, because it is
-			 * not critical to apply the optimization.
-			 */
-		}
-	} else {
-		/* Invalidate cache element. */
-		vy_cache_on_write(&lsm->cache, entry, NULL);
-	}
-	return vy_lsm_set(lsm, mem, entry, region_stmt);
-}
-
 /**
  * Try to generate a deferred DELETE statement on tx commit.
  *
@@ -794,9 +719,15 @@ vy_tx_prepare(struct vy_tx *tx)
 			vy_stmt_set_type(v->entry.stmt, type);
 		}
 
-		if (vy_tx_write_prepare(v) != 0)
+		/*
+		 * Rotate the active in-memory tree if necessary and pin it
+		 * to make sure it is not dumped until the transaction is
+		 * complete.
+		 */
+		if (vy_lsm_rotate_mem_if_required(lsm) != 0)
 			return -1;
-		assert(v->mem != NULL);
+		vy_mem_pin(lsm->mem);
+		v->mem = lsm->mem;
 
 		if (lsm->index_id == 0 &&
 		    vy_stmt_flags(v->entry.stmt) & VY_STMT_DEFERRED_DELETE &&
@@ -807,7 +738,7 @@ vy_tx_prepare(struct vy_tx *tx)
 		vy_stmt_set_lsn(v->entry.stmt, MAX_LSN + tx->psn);
 		struct tuple **region_stmt =
 			(type == IPROTO_DELETE) ? &delete : &repsert;
-		if (vy_tx_write(lsm, v->mem, v->entry, region_stmt) != 0)
+		if (vy_lsm_set(lsm, v->mem, v->entry, region_stmt) != 0)
 			return -1;
 		v->region_stmt = *region_stmt;
 	}
-- 
GitLab