diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 7e56299ecd47490d1de57fafe75e002bc3c2fdff..57e283991b0f9f42d88b8d6fa852da1e825ee85c 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1972,11 +1972,15 @@ vy_lsm_upsert(struct vy_tx *tx, struct vy_lsm *lsm,
 {
 	assert(tx == NULL || tx->state == VINYL_TX_READY);
 	struct tuple *vystmt;
-	struct iovec operations[1];
-	operations[0].iov_base = (void *)expr;
-	operations[0].iov_len = expr_end - expr;
+	struct iovec operations[2];
+	/* MP_ARRAY with size 1. */
+	char header = 0x91;
+	operations[0].iov_base = &header;
+	operations[0].iov_len = 1;
+	operations[1].iov_base = (void *)expr;
+	operations[1].iov_len = expr_end - expr;
 	vystmt = vy_stmt_new_upsert(lsm->mem_format, tuple, tuple_end,
-				    operations, 1);
+				    operations, 2);
 	if (vystmt == NULL)
 		return -1;
 	assert(vy_stmt_type(vystmt) == IPROTO_UPSERT);
diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h
index 25219230d05aa7abb44d391b8996b755bcfddba7..24c7eaad73b00bddf11869936ad6734e840310e4 100644
--- a/src/box/vy_stmt.h
+++ b/src/box/vy_stmt.h
@@ -526,7 +526,10 @@ vy_stmt_new_delete(struct tuple_format *format, const char *tuple_begin,
  * @param tuple_end End of the array that begins from @param tuple_begin.
  * @param format Format of a tuple for offsets generating.
  * @param part_count Part count from key definition.
- * @param operations Vector of update operations.
+ * @param operations Vector of update operation pieces. Each iovec here may be
+ *     a part of an operation, or a whole operation, or something including
+ *     several operations. It is just a list of buffers. Each buffer is not
+ *     interpreted as an independent operation.
  * @param ops_cnt Length of the update operations vector.
  *
  * @retval NULL     Memory allocation error.
diff --git a/src/box/vy_upsert.c b/src/box/vy_upsert.c
index 797492c2b3a0fa48be98f8802bb3e3523621f092..fdae931f67c6f25a9c6dabb85c0be1e93a8d9894 100644
--- a/src/box/vy_upsert.c
+++ b/src/box/vy_upsert.c
@@ -39,39 +39,151 @@
 #include "column_mask.h"
 
 /**
- * Try to squash two upsert series (msgspacked index_base + ops)
- * Try to create a tuple with squahed operations
+ * Check that key hasn't been changed after applying upsert operation.
+ */
+static bool
+vy_apply_result_does_cross_pk(struct tuple *old_stmt, const char *result,
+			      const char *result_end, struct key_def *cmp_def,
+			      uint64_t col_mask)
+{
+	if (!key_update_can_be_skipped(cmp_def->column_mask, col_mask)) {
+		struct tuple *tuple =
+			vy_stmt_new_replace(tuple_format(old_stmt), result,
+					    result_end);
+		int cmp_res = vy_stmt_compare(old_stmt, HINT_NONE, tuple,
+					      HINT_NONE, cmp_def);
+		tuple_unref(tuple);
+		return cmp_res != 0;
+	}
+	return false;
+}
+
+/**
+ * Apply update operations from @a upsert on tuple @a stmt. If @a stmt is
+ * void statement (i.e. it is NULL or delete statement) then operations are
+ * applied on tuple stored in @a upsert. Update operations of @a upsert which
+ * can't be applied are skipped along side with other operations from single
+ * group (i.e. packed in one msgpack array); errors may be logged depending on
+ * @a suppress_error flag.
  *
- * @retval 0 && *result_stmt != NULL : successful squash
- * @retval 0 && *result_stmt == NULL : unsquashable sources
- * @retval -1 - memory error
+ * @param upsert Upsert statement to be applied on @a stmt.
+ * @param stmt Statement to be used as base for upsert operations.
+ * @param cmp_def Key definition required to provide check of primary key
+ *                modification.
+ * @param suppress_error If true, do not raise/log any errors.
+ * @return Tuple containing result of upsert application; NULL in case OOM.
  */
-static int
-vy_upsert_try_to_squash(struct tuple_format *format,
-			const char *key_mp, const char *key_mp_end,
-			const char *old_serie, const char *old_serie_end,
-			const char *new_serie, const char *new_serie_end,
-			struct tuple **result_stmt)
+static struct tuple *
+vy_apply_upsert_on_terminal_stmt(struct tuple *upsert, struct tuple *stmt,
+				 struct key_def *cmp_def, bool suppress_error)
 {
-	*result_stmt = NULL;
+	assert(vy_stmt_type(upsert) == IPROTO_UPSERT);
+	assert(stmt == NULL || vy_stmt_type(stmt) != IPROTO_UPSERT);
+	uint32_t mp_size;
+	const char *new_ops = vy_stmt_upsert_ops(upsert, &mp_size);
+	/* Msgpack containing result of upserts application. */
+	const char *result_mp;
+	bool stmt_is_void = stmt == NULL || vy_stmt_type(stmt) == IPROTO_DELETE;
+	if (stmt_is_void)
+		result_mp = vy_upsert_data_range(upsert, &mp_size);
+	else
+		result_mp = tuple_data_range(stmt, &mp_size);
+	const char *result_mp_end = result_mp + mp_size;
+	/*
+	 * xrow_upsert_execute() allocates result using region,
+	 * so save starting point to release it later.
+	 */
+	struct region *region = &fiber()->gc;
+	size_t region_svp = region_used(region);
+	uint64_t column_mask = COLUMN_MASK_FULL;
+	struct tuple_format *format = tuple_format(upsert);
 
-	size_t squashed_size;
-	const char *squashed =
-		xrow_upsert_squash(old_serie, old_serie_end,
-				   new_serie, new_serie_end, format,
-				   &squashed_size, 0);
-	if (squashed == NULL)
-		return 0;
-	/* Successful squash! */
-	struct iovec operations[1];
-	operations[0].iov_base = (void *)squashed;
-	operations[0].iov_len = squashed_size;
+	uint32_t ups_cnt = mp_decode_array(&new_ops);
+	const char *ups_ops = new_ops;
+	/*
+	 * In case upsert folds into insert, we must skip first
+	 * update operations. Moreover, we should use upsert's tuple
+	 * to provide PK modification check.
+	 */
+	if (stmt_is_void) {
+		ups_cnt--;
+		mp_next(&ups_ops);
+		stmt = upsert;
+	}
+	for (uint32_t i = 0; i < ups_cnt; ++i) {
+		assert(mp_typeof(*ups_ops) == MP_ARRAY);
+		const char *ups_ops_end = ups_ops;
+		mp_next(&ups_ops_end);
+		const char *exec_res = result_mp;
+		exec_res = xrow_upsert_execute(ups_ops, ups_ops_end, result_mp,
+					       result_mp_end, format, &mp_size,
+					       0, suppress_error, &column_mask);
+		if (exec_res == NULL) {
+			if (! suppress_error) {
+				struct error *e = diag_last_error(diag_get());
+				assert(e != NULL);
+				/* Bail out immediately in case of OOM. */
+				if (e->type != &type_ClientError) {
+					region_truncate(region, region_svp);
+					return NULL;
+				}
+				diag_log();
+			}
+			ups_ops = ups_ops_end;
+			continue;
+		}
+		/*
+		 * If it turns out that resulting tuple modifies primary
+		 * key, then simply ignore this upsert.
+		 */
+		if (vy_apply_result_does_cross_pk(stmt, exec_res,
+						  exec_res + mp_size, cmp_def,
+						  column_mask)) {
+			if (!suppress_error) {
+				say_error("upsert operations %s are not applied"\
+					  " due to primary key modification",
+					  mp_str(ups_ops));
+			}
+			ups_ops = ups_ops_end;
+			continue;
+		}
+		ups_ops = ups_ops_end;
+		/*
+		 * Result statement must satisfy space's format. Since upsert's
+		 * tuple correctness is already checked in vy_upsert(), let's
+		 * use its format to provide result verification.
+		 */
+		struct tuple_format *format = tuple_format(upsert);
+		if (tuple_validate_raw(format, exec_res) != 0) {
+			if (! suppress_error)
+				diag_log();
+			continue;
+		}
+		result_mp = exec_res;
+		result_mp_end = exec_res + mp_size;
+	}
+	struct tuple *new_terminal_stmt = vy_stmt_new_replace(format, result_mp,
+							      result_mp_end);
+	region_truncate(region, region_svp);
+	if (new_terminal_stmt == NULL)
+		return NULL;
+	vy_stmt_set_lsn(new_terminal_stmt, vy_stmt_lsn(upsert));
+	return new_terminal_stmt;
+}
 
-	*result_stmt = vy_stmt_new_upsert(format, key_mp, key_mp_end,
-					  operations, 1);
-	if (*result_stmt == NULL)
-		return -1;
-	return 0;
+/**
+ * Unpack upsert's update operations from msgpack array
+ * into array of iovecs.
+ */
+static void
+upsert_ops_to_iovec(const char *ops, uint32_t ops_cnt, struct iovec *iov_arr)
+{
+	for (uint32_t i = 0; i < ops_cnt; ++i) {
+		assert(mp_typeof(*ops) == MP_ARRAY);
+		iov_arr[i].iov_base = (char *) ops;
+		mp_next(&ops);
+		iov_arr[i].iov_len = ops - (char *) iov_arr[i].iov_base;
+	}
 }
 
 struct tuple *
@@ -87,122 +199,63 @@ vy_apply_upsert(struct tuple *new_stmt, struct tuple *old_stmt,
 	assert(new_stmt != old_stmt);
 	assert(vy_stmt_type(new_stmt) == IPROTO_UPSERT);
 
-	if (old_stmt == NULL || vy_stmt_type(old_stmt) == IPROTO_DELETE) {
-		/*
-		 * INSERT case: return new stmt.
-		 */
-		return vy_stmt_replace_from_upsert(new_stmt);
+	struct tuple *result_stmt = NULL;
+	if (old_stmt == NULL || vy_stmt_type(old_stmt) != IPROTO_UPSERT) {
+		return vy_apply_upsert_on_terminal_stmt(new_stmt, old_stmt,
+						        cmp_def, suppress_error);
 	}
 
-	struct tuple_format *format = tuple_format(new_stmt);
-
+	assert(old_stmt != NULL);
+	assert(vy_stmt_type(old_stmt) == IPROTO_UPSERT);
 	/*
-	 * Unpack UPSERT operation from the new stmt
+	 * Unpack UPSERT operation from the old and new stmts.
 	 */
 	uint32_t mp_size;
-	const char *new_ops;
-	new_ops = vy_stmt_upsert_ops(new_stmt, &mp_size);
-	const char *new_ops_end = new_ops + mp_size;
-
+	const char *old_ops = vy_stmt_upsert_ops(old_stmt, &mp_size);
+	const char *old_stmt_mp = vy_upsert_data_range(old_stmt, &mp_size);
+	const char *old_stmt_mp_end = old_stmt_mp + mp_size;
+	const char *new_ops = vy_stmt_upsert_ops(new_stmt, &mp_size);
 	/*
-	 * Apply new operations to the old stmt
+	 * UPSERT + UPSERT case: unpack operations to iovec array and merge
+	 * them into one ops array.
 	 */
-	const char *result_mp;
-	if (vy_stmt_type(old_stmt) == IPROTO_UPSERT)
-		result_mp = vy_upsert_data_range(old_stmt, &mp_size);
-	else
-		result_mp = tuple_data_range(old_stmt, &mp_size);
-	const char *result_mp_end = result_mp + mp_size;
-	struct tuple *result_stmt = NULL;
+	struct tuple_format *format = tuple_format(old_stmt);
 	struct region *region = &fiber()->gc;
 	size_t region_svp = region_used(region);
-	uint8_t old_type = vy_stmt_type(old_stmt);
-	uint64_t column_mask = COLUMN_MASK_FULL;
-	result_mp = xrow_upsert_execute(new_ops, new_ops_end, result_mp,
-					result_mp_end, format, &mp_size,
-					0, suppress_error, &column_mask);
-	if (result_mp == NULL) {
+	uint32_t old_ops_cnt = mp_decode_array(&old_ops);
+	uint32_t new_ops_cnt = mp_decode_array(&new_ops);
+	uint32_t total_ops_cnt = old_ops_cnt + new_ops_cnt;
+	size_t ops_size;
+	struct iovec *operations =
+		region_alloc_array(region, typeof(operations[0]),
+				   total_ops_cnt + 1, &ops_size);
+	if (operations == NULL) {
 		region_truncate(region, region_svp);
+		diag_set(OutOfMemory, ops_size, "region_alloc_array",
+			 "operations");
 		return NULL;
 	}
-	result_mp_end = result_mp + mp_size;
-	if (old_type != IPROTO_UPSERT) {
-		assert(old_type == IPROTO_INSERT ||
-		       old_type == IPROTO_REPLACE);
-		/*
-		 * UPDATE case: return the updated old stmt.
-		 */
-		result_stmt = vy_stmt_new_replace(format, result_mp,
-						  result_mp_end);
-		region_truncate(region, region_svp);
-		if (result_stmt == NULL)
-			return NULL; /* OOM */
-		vy_stmt_set_lsn(result_stmt, vy_stmt_lsn(new_stmt));
-		goto check_key;
-	}
-
-	/*
-	 * Unpack UPSERT operation from the old stmt
-	 */
-	assert(old_stmt != NULL);
-	const char *old_ops;
-	old_ops = vy_stmt_upsert_ops(old_stmt, &mp_size);
-	const char *old_ops_end = old_ops + mp_size;
-	assert(old_ops_end > old_ops);
-
+	char header[16];
+	char *header_end = mp_encode_array(header, total_ops_cnt);
+	operations[0].iov_base = header;
+	operations[0].iov_len = header_end - header;
+	upsert_ops_to_iovec(old_ops, old_ops_cnt, &operations[1]);
+	upsert_ops_to_iovec(new_ops, new_ops_cnt, &operations[old_ops_cnt + 1]);
 	/*
-	 * UPSERT + UPSERT case: combine operations
+	 * Adding update operations. We keep order of update operations in
+	 * the array the same. It is vital since first set of operations
+	 * must be skipped in case upsert folds into insert. For instance:
+	 * old_ops = {{{op1}, {op2}}, {{op3}}}
+	 * new_ops = {{{op4}, {op5}}}
+	 * res_ops = {{{op1}, {op2}}, {{op3}}, {{op4}, {op5}}}
+	 * If upsert corresponding to old_ops becomes insert, then
+	 * {{op1}, {op2}} update operations are not applied.
 	 */
-	assert(old_ops_end - old_ops > 0);
-	if (vy_upsert_try_to_squash(format, result_mp, result_mp_end,
-				    old_ops, old_ops_end, new_ops, new_ops_end,
-				    &result_stmt) != 0) {
-		region_truncate(region, region_svp);
-		return NULL;
-	}
-	if (result_stmt != NULL) {
-		region_truncate(region, region_svp);
-		vy_stmt_set_lsn(result_stmt, vy_stmt_lsn(new_stmt));
-		goto check_key;
-	}
-
-	/* Failed to squash, simply add one upsert to another */
-	int old_ops_cnt, new_ops_cnt;
-	struct iovec operations[3];
-
-	old_ops_cnt = mp_decode_array(&old_ops);
-	operations[1].iov_base = (void *)old_ops;
-	operations[1].iov_len = old_ops_end - old_ops;
-
-	new_ops_cnt = mp_decode_array(&new_ops);
-	operations[2].iov_base = (void *)new_ops;
-	operations[2].iov_len = new_ops_end - new_ops;
-
-	char ops_buf[16];
-	char *header = mp_encode_array(ops_buf, old_ops_cnt + new_ops_cnt);
-	operations[0].iov_base = (void *)ops_buf;
-	operations[0].iov_len = header - ops_buf;
-
-	result_stmt = vy_stmt_new_upsert(format, result_mp, result_mp_end,
-					 operations, 3);
+	result_stmt = vy_stmt_new_upsert(format, old_stmt_mp, old_stmt_mp_end,
+					 operations, total_ops_cnt + 1);
 	region_truncate(region, region_svp);
 	if (result_stmt == NULL)
 		return NULL;
 	vy_stmt_set_lsn(result_stmt, vy_stmt_lsn(new_stmt));
-
-check_key:
-	/*
-	 * Check that key hasn't been changed after applying operations.
-	 */
-	if (!key_update_can_be_skipped(cmp_def->column_mask, column_mask) &&
-	    vy_stmt_compare(old_stmt, HINT_NONE, result_stmt,
-			    HINT_NONE, cmp_def) != 0) {
-		/*
-		 * Key has been changed: ignore this UPSERT and
-		 * @retval the old stmt.
-		 */
-		tuple_unref(result_stmt);
-		result_stmt = vy_stmt_dup(old_stmt);
-	}
 	return result_stmt;
 }
diff --git a/test/vinyl/upgrade/upsert/00000000000000000000.vylog b/test/vinyl/upgrade/upsert/00000000000000000000.vylog
new file mode 100644
index 0000000000000000000000000000000000000000..581ad9b53eb1690cd0926aa8fcb462caeb3d874b
Binary files /dev/null and b/test/vinyl/upgrade/upsert/00000000000000000000.vylog differ
diff --git a/test/vinyl/upgrade/upsert/00000000000000000004.snap b/test/vinyl/upgrade/upsert/00000000000000000004.snap
new file mode 100644
index 0000000000000000000000000000000000000000..9c8767eadb5eb1a27dc39a23072f2b9e28327d8c
Binary files /dev/null and b/test/vinyl/upgrade/upsert/00000000000000000004.snap differ
diff --git a/test/vinyl/upgrade/upsert/00000000000000000004.vylog b/test/vinyl/upgrade/upsert/00000000000000000004.vylog
new file mode 100644
index 0000000000000000000000000000000000000000..5194b1457dc8a97ab3e0ea0ab0421920ec793b8c
Binary files /dev/null and b/test/vinyl/upgrade/upsert/00000000000000000004.vylog differ
diff --git a/test/vinyl/upgrade/upsert/00000000000000000004.xlog b/test/vinyl/upgrade/upsert/00000000000000000004.xlog
new file mode 100644
index 0000000000000000000000000000000000000000..5725d09ce07cb1f9125524f633c835699383976a
Binary files /dev/null and b/test/vinyl/upgrade/upsert/00000000000000000004.xlog differ
diff --git a/test/vinyl/upgrade/upsert/00000000000000000010.snap b/test/vinyl/upgrade/upsert/00000000000000000010.snap
new file mode 100644
index 0000000000000000000000000000000000000000..8b89c925fc080880c9a9176ed5cd68ac04c0dc6c
Binary files /dev/null and b/test/vinyl/upgrade/upsert/00000000000000000010.snap differ
diff --git a/test/vinyl/upgrade/upsert/00000000000000000010.vylog b/test/vinyl/upgrade/upsert/00000000000000000010.vylog
new file mode 100644
index 0000000000000000000000000000000000000000..954a51a4cdf23cce10630dd32e837b7a02e0b23e
Binary files /dev/null and b/test/vinyl/upgrade/upsert/00000000000000000010.vylog differ
diff --git a/test/vinyl/upgrade/upsert/00000000000000000010.xlog b/test/vinyl/upgrade/upsert/00000000000000000010.xlog
new file mode 100644
index 0000000000000000000000000000000000000000..f5921223d9afb86ccefd9bc73317fb01efa0b6d0
Binary files /dev/null and b/test/vinyl/upgrade/upsert/00000000000000000010.xlog differ
diff --git a/test/vinyl/upgrade/upsert/512/0/00000000000000000002.index b/test/vinyl/upgrade/upsert/512/0/00000000000000000002.index
new file mode 100644
index 0000000000000000000000000000000000000000..156c23dd779306e736c85c97a3d314865861a147
Binary files /dev/null and b/test/vinyl/upgrade/upsert/512/0/00000000000000000002.index differ
diff --git a/test/vinyl/upgrade/upsert/512/0/00000000000000000002.run b/test/vinyl/upgrade/upsert/512/0/00000000000000000002.run
new file mode 100644
index 0000000000000000000000000000000000000000..9757ea341935c02d873d631949db7e56c6daf06c
Binary files /dev/null and b/test/vinyl/upgrade/upsert/512/0/00000000000000000002.run differ
diff --git a/test/vinyl/upgrade/upsert/512/0/00000000000000000004.index b/test/vinyl/upgrade/upsert/512/0/00000000000000000004.index
new file mode 100644
index 0000000000000000000000000000000000000000..1c3896547d0919db9ec13347227956d2f2b9a560
Binary files /dev/null and b/test/vinyl/upgrade/upsert/512/0/00000000000000000004.index differ
diff --git a/test/vinyl/upgrade/upsert/512/0/00000000000000000004.run b/test/vinyl/upgrade/upsert/512/0/00000000000000000004.run
new file mode 100644
index 0000000000000000000000000000000000000000..469e2abab63457841f48cfbe67ebab6e0d5e86bd
Binary files /dev/null and b/test/vinyl/upgrade/upsert/512/0/00000000000000000004.run differ
diff --git a/test/vinyl/upgrade/upsert/512/0/00000000000000000006.index b/test/vinyl/upgrade/upsert/512/0/00000000000000000006.index
new file mode 100644
index 0000000000000000000000000000000000000000..9202ec7e7a46bbf01cb6699a5a4f7ad99a087b8e
Binary files /dev/null and b/test/vinyl/upgrade/upsert/512/0/00000000000000000006.index differ
diff --git a/test/vinyl/upgrade/upsert/512/0/00000000000000000006.run b/test/vinyl/upgrade/upsert/512/0/00000000000000000006.run
new file mode 100644
index 0000000000000000000000000000000000000000..d4fcd759953dccc5a0f84aacc808040d2ea805df
Binary files /dev/null and b/test/vinyl/upgrade/upsert/512/0/00000000000000000006.run differ
diff --git a/test/vinyl/upsert.result b/test/vinyl/upsert.result
index 3a7f6629d9d22ac694a69ca73d247a5dcbe0479b..fe673ad6fa2f1bc0f817fb8bbe746dc0039903d9 100644
--- a/test/vinyl/upsert.result
+++ b/test/vinyl/upsert.result
@@ -899,3 +899,570 @@ s:select()
 s:drop()
 ---
 ...
+-- gh-5107: don't squash upsert operations into one array.
+--
+-- gh-5087: test upsert execution/squash referring to fields in reversed
+-- order (via negative indexing).
+--
+s = box.schema.create_space('test', {engine = 'vinyl'})
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:insert({1, 1, 1})
+---
+- [1, 1, 1]
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({1}, {{'=', 3, 100}})
+---
+...
+s:upsert({1}, {{'=', -1, 200}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select() -- {1, 1, 200}
+---
+- - [1, 1, 200]
+...
+s:delete({1})
+---
+...
+s:insert({1, 1, 1})
+---
+- [1, 1, 1]
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({1}, {{'=', -3, 100}})
+---
+...
+s:upsert({1}, {{'=', -1, 200}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+-- gh-5105: Two upserts are NOT squashed into one, so only one (first one)
+-- is skipped, meanwhile second one is applied.
+--
+s:select() -- {1, 1, 1}
+---
+- - [1, 1, 200]
+...
+s:delete({1})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({1, 1}, {{'=', -2, 300}}) -- {1, 1}
+---
+...
+s:upsert({1}, {{'+', -1, 100}}) -- {1, 101}
+---
+...
+s:upsert({1}, {{'-', 2, 100}}) -- {1, 1}
+---
+...
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 201}
+---
+...
+s:upsert({1}, {{'-', 2, 200}}) -- {1, 1}
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select() -- {1, 1}
+---
+- - [1, 1]
+...
+s:delete({1})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({1, 1, 1}, {{'!', -1, 300}}) -- {1, 1, 1}
+---
+...
+s:upsert({1}, {{'+', -2, 100}}) -- {1, 101, 1}
+---
+...
+s:upsert({1}, {{'=', -1, 100}}) -- {1, 101, 100}
+---
+...
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 101, 300}
+---
+...
+s:upsert({1}, {{'-', -2, 100}}) -- {1, 1, 300}
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select()
+---
+- - [1, 1, 300]
+...
+s:drop()
+---
+...
+-- gh-1622: upsert operations which break space format are not applied.
+--
+s = box.schema.space.create('test', { engine = 'vinyl', field_count = 2 })
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:replace{1, 1}
+---
+- [1, 1]
+...
+-- Error is logged, upsert is not applied.
+--
+s:upsert({1, 1}, {{'=', 3, 5}})
+---
+...
+-- During read the incorrect upsert is ignored.
+--
+s:select{}
+---
+- - [1, 1]
+...
+-- Try to set incorrect field_count in a transaction.
+--
+box.begin()
+---
+...
+s:replace{2, 2}
+---
+- [2, 2]
+...
+s:upsert({2, 2}, {{'=', 3, 2}})
+---
+...
+s:select{}
+---
+- - [1, 1]
+  - [2, 2]
+...
+box.commit()
+---
+...
+s:select{}
+---
+- - [1, 1]
+  - [2, 2]
+...
+-- Read incorrect upsert from a run: it should be ignored.
+--
+box.snapshot()
+---
+- ok
+...
+s:select{}
+---
+- - [1, 1]
+  - [2, 2]
+...
+s:upsert({2, 2}, {{'=', 3, 20}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select{}
+---
+- - [1, 1]
+  - [2, 2]
+...
+-- Execute replace/delete after invalid upsert.
+--
+box.snapshot()
+---
+- ok
+...
+s:upsert({2, 2}, {{'=', 3, 30}})
+---
+...
+s:replace{2, 3}
+---
+- [2, 3]
+...
+s:select{}
+---
+- - [1, 1]
+  - [2, 3]
+...
+s:upsert({1, 1}, {{'=', 3, 30}})
+---
+...
+s:delete{1}
+---
+...
+s:select{}
+---
+- - [2, 3]
+...
+-- Invalid upsert in a sequence of upserts is skipped meanwhile
+-- the rest are applied.
+--
+box.snapshot()
+---
+- ok
+...
+s:upsert({2, 2}, {{'+', 2, 5}})
+---
+...
+s:upsert({2, 2}, {{'=', 3, 40}})
+---
+...
+s:upsert({2, 2}, {{'+', 2, 5}})
+---
+...
+s:select{}
+---
+- - [2, 13]
+...
+box.snapshot()
+---
+- ok
+...
+s:select{}
+---
+- - [2, 13]
+...
+s:drop()
+---
+...
+-- Test different scenarious during which update operations squash can't
+-- take place due to format violations.
+--
+decimal = require('decimal')
+---
+...
+s = box.schema.space.create('test', { engine = 'vinyl', field_count = 5 })
+---
+...
+s:format({{name='id', type='unsigned'}, {name='u', type='unsigned'},\
+          {name='s', type='scalar'}, {name='f', type='double'},\
+          {name='d', type='decimal'}})
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:replace{1, 1, 1, 1.1, decimal.new(1.1) }
+---
+- [1, 1, 1, 1.1, 1.1]
+...
+s:replace{2, 1, 1, 1.1, decimal.new(1.1)}
+---
+- [2, 1, 1, 1.1, 1.1]
+...
+box.snapshot()
+---
+- ok
+...
+-- Can't assign integer to float field. First operation is still applied.
+--
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 4, 4}})
+---
+...
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'=', 4, 4}})
+---
+...
+-- Can't add floating point to integer (result is floating point).
+--
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 5}})
+---
+...
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 5.5}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select()
+---
+- - [1, 1, 1, 5.1, 1.1]
+  - [2, 6, 1, 1.1, 1.1]
+...
+-- Integer overflow check.
+--
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 3, 9223372036854775808}})
+---
+...
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 3, 9223372036854775808}})
+---
+...
+-- Negative result of subtraction stored in unsigned field.
+--
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 2}})
+---
+...
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 2, 10}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select()
+---
+- - [1, 1, 9223372036854775809, 5.1, 1.1]
+  - [2, 8, 1, 1.1, 1.1]
+...
+-- Decimals do not fit into numerics and vice versa.
+--
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 5, 2}})
+---
+...
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 5, 1}})
+---
+...
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, decimal.new(2.1)}})
+---
+...
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 2, decimal.new(1.2)}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select()
+---
+- - [1, 1, 9223372036854775809, 5.1, 2.1]
+  - [2, 8, 1, 1.1, 1.1]
+...
+s:drop()
+---
+...
+-- Upserts leading to overflow are ignored.
+--
+format = {}
+---
+...
+format[1] = {name = 'f1', type = 'unsigned'}
+---
+...
+format[2] = {name = 'f2', type = 'unsigned'}
+---
+...
+s = box.schema.space.create('test', {engine = 'vinyl', format = format})
+---
+...
+_ = s:create_index('pk')
+---
+...
+uint_max = 18446744073709551615ULL
+---
+...
+s:replace{1, uint_max - 2}
+---
+- [1, 18446744073709551613]
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({1, 0}, {{'+', 2, 1}})
+---
+...
+s:upsert({1, 0}, {{'+', 2, 1}})
+---
+...
+s:upsert({1, 0}, {{'+', 2, 1}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select()
+---
+- - [1, 18446744073709551615]
+...
+s:delete{1}
+---
+...
+s:replace{1, uint_max - 2, 0}
+---
+- [1, 18446744073709551613, 0]
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({1, 0, 0}, {{'+', 2, 1}})
+---
+...
+s:upsert({1, 0, 0}, {{'+', 2, 1}})
+---
+...
+s:upsert({1, 0, 0}, {{'+', 2, 0.5}})
+---
+...
+s:upsert({1, 0, 0}, {{'+', 2, 1}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select()
+---
+- - [1, 18446744073709551615, 0]
+...
+s:drop()
+---
+...
+-- Make sure upserts satisfy associativity rule.
+--
+s = box.schema.space.create('test', {engine='vinyl'})
+---
+...
+i = s:create_index('pk', {parts={2, 'uint'}})
+---
+...
+s:replace{1, 2, 3, 'default'}
+---
+- [1, 2, 3, 'default']
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({2, 2, 2}, {{'=', 4, 'upserted'}})
+---
+...
+-- Upsert will fail and thus ignored.
+--
+s:upsert({2, 2, 2}, {{'#', 1, 1}, {'!', 3, 1}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select{}
+---
+- - [1, 2, 3, 'upserted']
+...
+s:drop()
+---
+...
+-- Combination of upserts and underlying void (i.e. delete or null)
+-- statement on disk. Upsert modifying PK is skipped.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+i = s:create_index('test', { run_count_per_level = 20 })
+---
+...
+for i = 101, 110 do s:replace{i, i} end
+---
+...
+s:replace({1, 1})
+---
+- [1, 1]
+...
+box.snapshot()
+---
+- ok
+...
+s:delete({1})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({1, 1}, {{'=', 2, 2}})
+---
+...
+s:upsert({1, 1}, {{'=', 1, 0}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select()
+---
+- - [1, 1]
+  - [101, 101]
+  - [102, 102]
+  - [103, 103]
+  - [104, 104]
+  - [105, 105]
+  - [106, 106]
+  - [107, 107]
+  - [108, 108]
+  - [109, 109]
+  - [110, 110]
+...
+s:drop()
+---
+...
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+i = s:create_index('test', { run_count_per_level = 20 })
+---
+...
+for i = 101, 110 do s:replace{i, i} end
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({1, 1}, {{'=', 2, 2}})
+---
+...
+s:upsert({1, 1}, {{'=', 1, 0}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select()
+---
+- - [1, 1]
+  - [101, 101]
+  - [102, 102]
+  - [103, 103]
+  - [104, 104]
+  - [105, 105]
+  - [106, 106]
+  - [107, 107]
+  - [108, 108]
+  - [109, 109]
+  - [110, 110]
+...
+s:drop()
+---
+...
diff --git a/test/vinyl/upsert.test.lua b/test/vinyl/upsert.test.lua
index 1d77474dad88c26a49d92c30a2ec3849e5374a3b..b62c199787c2c7bb07edad2bda9ef5da1e304c42 100644
--- a/test/vinyl/upsert.test.lua
+++ b/test/vinyl/upsert.test.lua
@@ -372,3 +372,227 @@ box.snapshot()
 s:select()
 
 s:drop()
+
+-- gh-5107: don't squash upsert operations into one array.
+--
+-- gh-5087: test upsert execution/squash referring to fields in reversed
+-- order (via negative indexing).
+--
+s = box.schema.create_space('test', {engine = 'vinyl'})
+pk = s:create_index('pk')
+s:insert({1, 1, 1})
+box.snapshot()
+
+s:upsert({1}, {{'=', 3, 100}})
+s:upsert({1}, {{'=', -1, 200}})
+box.snapshot()
+s:select() -- {1, 1, 200}
+
+s:delete({1})
+s:insert({1, 1, 1})
+box.snapshot()
+
+s:upsert({1}, {{'=', -3, 100}})
+s:upsert({1}, {{'=', -1, 200}})
+box.snapshot()
+-- gh-5105: Two upserts are NOT squashed into one, so only one (first one)
+-- is skipped, meanwhile second one is applied.
+--
+s:select() -- {1, 1, 1}
+
+s:delete({1})
+box.snapshot()
+
+s:upsert({1, 1}, {{'=', -2, 300}}) -- {1, 1}
+s:upsert({1}, {{'+', -1, 100}}) -- {1, 101}
+s:upsert({1}, {{'-', 2, 100}}) -- {1, 1}
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 201}
+s:upsert({1}, {{'-', 2, 200}}) -- {1, 1}
+box.snapshot()
+s:select() -- {1, 1}
+
+s:delete({1})
+box.snapshot()
+
+s:upsert({1, 1, 1}, {{'!', -1, 300}}) -- {1, 1, 1}
+s:upsert({1}, {{'+', -2, 100}}) -- {1, 101, 1}
+s:upsert({1}, {{'=', -1, 100}}) -- {1, 101, 100}
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 101, 300}
+s:upsert({1}, {{'-', -2, 100}}) -- {1, 1, 300}
+box.snapshot()
+s:select()
+
+s:drop()
+
+-- gh-1622: upsert operations which break space format are not applied.
+--
+s = box.schema.space.create('test', { engine = 'vinyl', field_count = 2 })
+pk = s:create_index('pk')
+s:replace{1, 1}
+-- Error is logged, upsert is not applied.
+--
+s:upsert({1, 1}, {{'=', 3, 5}})
+-- During read the incorrect upsert is ignored.
+--
+s:select{}
+
+-- Try to set incorrect field_count in a transaction.
+--
+box.begin()
+s:replace{2, 2}
+s:upsert({2, 2}, {{'=', 3, 2}})
+s:select{}
+box.commit()
+s:select{}
+
+-- Read incorrect upsert from a run: it should be ignored.
+--
+box.snapshot()
+s:select{}
+s:upsert({2, 2}, {{'=', 3, 20}})
+box.snapshot()
+s:select{}
+
+-- Execute replace/delete after invalid upsert.
+--
+box.snapshot()
+s:upsert({2, 2}, {{'=', 3, 30}})
+s:replace{2, 3}
+s:select{}
+
+s:upsert({1, 1}, {{'=', 3, 30}})
+s:delete{1}
+s:select{}
+
+-- Invalid upsert in a sequence of upserts is skipped meanwhile
+-- the rest are applied.
+--
+box.snapshot()
+s:upsert({2, 2}, {{'+', 2, 5}})
+s:upsert({2, 2}, {{'=', 3, 40}})
+s:upsert({2, 2}, {{'+', 2, 5}})
+s:select{}
+box.snapshot()
+s:select{}
+
+s:drop()
+
+-- Test different scenarious during which update operations squash can't
+-- take place due to format violations.
+--
+decimal = require('decimal')
+
+s = box.schema.space.create('test', { engine = 'vinyl', field_count = 5 })
+s:format({{name='id', type='unsigned'}, {name='u', type='unsigned'},\
+          {name='s', type='scalar'}, {name='f', type='double'},\
+          {name='d', type='decimal'}})
+pk = s:create_index('pk')
+s:replace{1, 1, 1, 1.1, decimal.new(1.1) }
+s:replace{2, 1, 1, 1.1, decimal.new(1.1)}
+box.snapshot()
+-- Can't assign integer to float field. First operation is still applied.
+--
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 4, 4}})
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'=', 4, 4}})
+-- Can't add floating point to integer (result is floating point).
+--
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 5}})
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 5.5}})
+box.snapshot()
+s:select()
+-- Integer overflow check.
+--
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 3, 9223372036854775808}})
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 3, 9223372036854775808}})
+-- Negative result of subtraction stored in unsigned field.
+--
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 2}})
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 2, 10}})
+box.snapshot()
+s:select()
+-- Decimals do not fit into numerics and vice versa.
+--
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 5, 2}})
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 5, 1}})
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, decimal.new(2.1)}})
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 2, decimal.new(1.2)}})
+box.snapshot()
+s:select()
+
+s:drop()
+
+-- Upserts leading to overflow are ignored.
+--
+format = {}
+format[1] = {name = 'f1', type = 'unsigned'}
+format[2] = {name = 'f2', type = 'unsigned'}
+s = box.schema.space.create('test', {engine = 'vinyl', format = format})
+_ = s:create_index('pk')
+uint_max = 18446744073709551615ULL
+s:replace{1, uint_max - 2}
+box.snapshot()
+
+s:upsert({1, 0}, {{'+', 2, 1}})
+s:upsert({1, 0}, {{'+', 2, 1}})
+s:upsert({1, 0}, {{'+', 2, 1}})
+box.snapshot()
+s:select()
+
+s:delete{1}
+s:replace{1, uint_max - 2, 0}
+box.snapshot()
+
+s:upsert({1, 0, 0}, {{'+', 2, 1}})
+s:upsert({1, 0, 0}, {{'+', 2, 1}})
+s:upsert({1, 0, 0}, {{'+', 2, 0.5}})
+s:upsert({1, 0, 0}, {{'+', 2, 1}})
+box.snapshot()
+s:select()
+s:drop()
+
+-- Make sure upserts satisfy associativity rule.
+--
+s = box.schema.space.create('test', {engine='vinyl'})
+i = s:create_index('pk', {parts={2, 'uint'}})
+s:replace{1, 2, 3, 'default'}
+box.snapshot()
+
+s:upsert({2, 2, 2}, {{'=', 4, 'upserted'}})
+-- Upsert will fail and thus ignored.
+--
+s:upsert({2, 2, 2}, {{'#', 1, 1}, {'!', 3, 1}})
+box.snapshot()
+
+s:select{}
+
+s:drop()
+
+-- Combination of upserts and underlying void (i.e. delete or null)
+-- statement on disk. Upsert modifying PK is skipped.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+i = s:create_index('test', { run_count_per_level = 20 })
+
+for i = 101, 110 do s:replace{i, i} end
+s:replace({1, 1})
+box.snapshot()
+s:delete({1})
+box.snapshot()
+s:upsert({1, 1}, {{'=', 2, 2}})
+s:upsert({1, 1}, {{'=', 1, 0}})
+box.snapshot()
+s:select()
+
+s:drop()
+
+s = box.schema.space.create('test', {engine = 'vinyl'})
+i = s:create_index('test', { run_count_per_level = 20 })
+
+for i = 101, 110 do s:replace{i, i} end
+box.snapshot()
+s:upsert({1, 1}, {{'=', 2, 2}})
+s:upsert({1, 1}, {{'=', 1, 0}})
+box.snapshot()
+s:select()
+
+s:drop()
diff --git a/test/vinyl/upsert_upgrade.result b/test/vinyl/upsert_upgrade.result
new file mode 100644
index 0000000000000000000000000000000000000000..8882a8b634cf3a1f677f602b22efb46f4a6d8661
--- /dev/null
+++ b/test/vinyl/upsert_upgrade.result
@@ -0,0 +1,59 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+-- Upsert's internal format have changed: now update operations are stored
+-- with additional map package. Let's test backward compatibility.
+-- Snapshot (i.e. run files) contain following statements:
+
+-- s = box.schema.create_space('test', {engine = 'vinyl'})
+-- pk = s:create_index('pk')
+-- s:insert({1, 2})
+-- box.snapshot()
+-- s:upsert({1, 0}, {{'+', 2, 1}})
+-- s:upsert({1, 0}, {{'-', 2, 2}})
+-- s:upsert({2, 0}, {{'+', 2, 1}})
+-- s:upsert({2, 0}, {{'-', 2, 2}})
+-- s:upsert({1, 0}, {{'=', 2, 2}})
+-- s:upsert({1, 0}, {{'-', 2, 2}})
+-- box.snapshot()
+--
+-- Make sure that upserts will be parsed and squashed correctly.
+--
+
+dst_dir = 'vinyl/upgrade/upsert/'
+ | ---
+ | ...
+
+test_run:cmd('create server upgrade with script="vinyl/upgrade.lua", workdir="' .. dst_dir .. '"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server upgrade')
+ | ---
+ | - true
+ | ...
+test_run:switch('upgrade')
+ | ---
+ | - true
+ | ...
+
+box.space.test:select()
+ | ---
+ | - - [1, 0]
+ |   - [2, -2]
+ | ...
+
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:cmd('stop server upgrade')
+ | ---
+ | - true
+ | ...
+test_run:cmd('cleanup server upgrade')
+ | ---
+ | - true
+ | ...
diff --git a/test/vinyl/upsert_upgrade.test.lua b/test/vinyl/upsert_upgrade.test.lua
new file mode 100644
index 0000000000000000000000000000000000000000..db409e2dd7827b27965b03bbf5dd29df067d07eb
--- /dev/null
+++ b/test/vinyl/upsert_upgrade.test.lua
@@ -0,0 +1,32 @@
+test_run = require('test_run').new()
+
+-- Upsert's internal format have changed: now update operations are stored
+-- with additional map package. Let's test backward compatibility.
+-- Snapshot (i.e. run files) contain following statements:
+
+-- s = box.schema.create_space('test', {engine = 'vinyl'})
+-- pk = s:create_index('pk')
+-- s:insert({1, 2})
+-- box.snapshot()
+-- s:upsert({1, 0}, {{'+', 2, 1}})
+-- s:upsert({1, 0}, {{'-', 2, 2}})
+-- s:upsert({2, 0}, {{'+', 2, 1}})
+-- s:upsert({2, 0}, {{'-', 2, 2}})
+-- s:upsert({1, 0}, {{'=', 2, 2}})
+-- s:upsert({1, 0}, {{'-', 2, 2}})
+-- box.snapshot()
+--
+-- Make sure that upserts will be parsed and squashed correctly.
+--
+
+dst_dir = 'vinyl/upgrade/upsert/'
+
+test_run:cmd('create server upgrade with script="vinyl/upgrade.lua", workdir="' .. dst_dir .. '"')
+test_run:cmd('start server upgrade')
+test_run:switch('upgrade')
+
+box.space.test:select()
+
+test_run:switch('default')
+test_run:cmd('stop server upgrade')
+test_run:cmd('cleanup server upgrade')