diff --git a/src/box/applier.cc b/src/box/applier.cc index 512d05dfa1df4149d65fa141fed94126cf650231..7f37fe2ee02b8db7c9e92de6d07366e386db937a 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -530,34 +530,18 @@ applier_subscribe(struct applier *applier) applier->lag = ev_now(loop()) - row.tm; applier->last_row_time = ev_monotonic_now(loop()); - + struct replica *replica = replica_by_id(row.replica_id); + struct latch *latch = (replica ? &replica->order_latch : + &replicaset.applier.order_latch); + /* + * In a full mesh topology, the same set of changes + * may arrive via two concurrently running appliers. + * Hence we need a latch to strictly order all changes + * that belong to the same server id. + */ + latch_lock(latch); if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) { - /** - * Promote the replica set vclock before - * applying the row. If there is an - * exception (conflict) applying the row, - * the row is skipped when the replication - * is resumed. - */ - vclock_follow_xrow(&replicaset.vclock, &row); - struct replica *replica = replica_by_id(row.replica_id); - struct latch *latch = (replica ? &replica->order_latch : - &replicaset.applier.order_latch); - /* - * In a full mesh topology, the same set - * of changes may arrive via two - * concurrently running appliers. Thanks - * to vclock_follow() above, the first row - * in the set will be skipped - but the - * remaining may execute out of order, - * when the following xstream_write() - * yields on WAL. Hence we need a latch to - * strictly order all changes which belong - * to the same server id. - */ - latch_lock(latch); int res = xstream_write(applier->subscribe_stream, &row); - latch_unlock(latch); if (res != 0) { struct error *e = diag_last_error(diag_get()); /** @@ -568,10 +552,14 @@ applier_subscribe(struct applier *applier) box_error_code(e) == ER_TUPLE_FOUND && replication_skip_conflict) diag_clear(diag_get()); - else + else { + latch_unlock(latch); diag_raise(); + } } } + latch_unlock(latch); + if (applier->state == APPLIER_SYNC || applier->state == APPLIER_FOLLOW) fiber_cond_signal(&applier->writer_cond); diff --git a/src/box/wal.c b/src/box/wal.c index 7b09a032acba8390735d98b4f6fadd4e08712a04..cdcaabc0089361bf4156c549933fa15d6f1863fa 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -171,6 +171,8 @@ struct wal_msg { * be rolled back. */ struct stailq rollback; + /** vclock after the batch processed. */ + struct vclock vclock; }; /** @@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch) batch->approx_len = 0; stailq_create(&batch->commit); stailq_create(&batch->rollback); + vclock_create(&batch->vclock); } static struct wal_msg * @@ -284,6 +287,8 @@ tx_schedule_commit(struct cmsg *msg) /* Closes the input valve. */ stailq_concat(&writer->rollback, &batch->rollback); } + /* Update the tx vclock to the latest written by wal. */ + vclock_copy(&replicaset.vclock, &batch->vclock); tx_schedule_queue(&batch->commit); } @@ -1023,6 +1028,12 @@ wal_write_to_disk(struct cmsg *msg) error_log(error); diag_clear(diag_get()); } + /* + * Remember the vclock of the last successfully written row so + * that we can update replicaset.vclock once this message gets + * back to tx. + */ + vclock_copy(&wal_msg->vclock, &writer->vclock); /* * We need to start rollback from the first request * following the last committed request. If @@ -1154,31 +1165,6 @@ wal_write(struct journal *journal, struct journal_entry *entry) bool cancellable = fiber_set_cancellable(false); fiber_yield(); /* Request was inserted. */ fiber_set_cancellable(cancellable); - if (entry->res > 0) { - struct xrow_header **last = entry->rows + entry->n_rows - 1; - while (last >= entry->rows) { - /* - * Find last row from local instance id - * and promote vclock. - */ - if ((*last)->replica_id == instance_id) { - /* - * In master-master configuration, during sudden - * power-loss, if the data have not been written - * to WAL but have already been sent to others, - * they will send the data back. In this case - * vclock has already been promoted by applier. - */ - if (vclock_get(&replicaset.vclock, - instance_id) < (*last)->lsn) { - vclock_follow_xrow(&replicaset.vclock, - *last); - } - break; - } - --last; - } - } return entry->res; } @@ -1188,12 +1174,7 @@ wal_write_in_wal_mode_none(struct journal *journal, { struct wal_writer *writer = (struct wal_writer *) journal; wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows); - int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id); - int64_t new_lsn = vclock_get(&writer->vclock, instance_id); - if (new_lsn > old_lsn) { - /* There were local writes, promote vclock. */ - vclock_follow(&replicaset.vclock, instance_id, new_lsn); - } + vclock_copy(&replicaset.vclock, &writer->vclock); return vclock_sum(&writer->vclock); } diff --git a/test/replication/skip_conflict_row.result b/test/replication/skip_conflict_row.result index 6ca13b4724b94802d7e2d3fd4ca3b8d0917849af..bcbbbcc34e46169a3f361370d69ece958c6dbc3b 100644 --- a/test/replication/skip_conflict_row.result +++ b/test/replication/skip_conflict_row.result @@ -82,6 +82,69 @@ box.info.status --- - running ... +-- gh-2283: test that if replication_skip_conflict is off vclock +-- is not advanced on errors. +test_run:cmd("restart server replica") +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +box.space.test:insert{3} +--- +- [3] +... +lsn1 = box.info.vclock[1] +--- +... +test_run:cmd("switch default") +--- +- true +... +box.space.test:insert{3, 3} +--- +- [3, 3] +... +box.space.test:insert{4} +--- +- [4] +... +test_run:cmd("switch replica") +--- +- true +... +-- lsn is not promoted +lsn1 == box.info.vclock[1] +--- +- true +... +box.info.replication[1].upstream.message +--- +- Duplicate key exists in unique index 'primary' in space 'test' +... +box.info.replication[1].upstream.status +--- +- stopped +... +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("restart server replica") +--- +- true +... +-- applier is not in follow state +box.info.replication[1].upstream.message +--- +- Duplicate key exists in unique index 'primary' in space 'test' +... +test_run:cmd("switch default") +--- +- true +... -- cleanup test_run:cmd("stop server replica") --- diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua index 4406ced957ea3e88b7e85ae57f4f077eaee78491..3a9076b397d3010848e1b3e9948e9982fb48be5a 100644 --- a/test/replication/skip_conflict_row.test.lua +++ b/test/replication/skip_conflict_row.test.lua @@ -28,6 +28,26 @@ box.space.test:select() test_run:cmd("switch default") box.info.status +-- gh-2283: test that if replication_skip_conflict is off vclock +-- is not advanced on errors. +test_run:cmd("restart server replica") +test_run:cmd("switch replica") +box.space.test:insert{3} +lsn1 = box.info.vclock[1] +test_run:cmd("switch default") +box.space.test:insert{3, 3} +box.space.test:insert{4} +test_run:cmd("switch replica") +-- lsn is not promoted +lsn1 == box.info.vclock[1] +box.info.replication[1].upstream.message +box.info.replication[1].upstream.status +test_run:cmd("switch default") +test_run:cmd("restart server replica") +-- applier is not in follow state +box.info.replication[1].upstream.message +test_run:cmd("switch default") + -- cleanup test_run:cmd("stop server replica") test_run:cmd("cleanup server replica")