From f5389a8bd195c6fbb67efd0ee6eb053de5721517 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Fri, 29 Mar 2024 18:18:02 +0300
Subject: [PATCH] fix: initialize replicaset atomically with the first joined
 instance

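Previously the first instance of a replicaset was committed to the
raft log without a corresponding record in "_pico_replicaset". The
record was only inserted later by the governor in a separate
Plan::CreateReplicaset step, which left a window during which an
instance referred to a replicaset that didn't exist yet.

Now the replicaset record is created atomically with the instance
which opens it: handle_join_request_and_wait adds the Insert into
"_pico_replicaset" to the same Op::BatchDml (and the same CaS
predicate) as the instance and address records, and for the very
first instance of the cluster the record goes directly into the
bootstrap entries. This makes the CreateReplicaset governor step
redundant, so it is removed along with its action plan.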
---
 src/bootstrap_entries.rs |  9 ++++++
 src/governor/mod.rs      | 29 ----------------
 src/governor/plan.rs     | 43 -----------------------
 src/replicaset.rs        | 17 +++++++++
 src/rpc/join.rs          | 78 +++++++++++++++++++++++++++++-----------------
 test/int/test_basics.py  |  2 +-
 6 files changed, 73 insertions(+), 105 deletions(-)

diff --git a/src/bootstrap_entries.rs b/src/bootstrap_entries.rs
index a455b9c3fe..2042d96f2f 100644
--- a/src/bootstrap_entries.rs
+++ b/src/bootstrap_entries.rs
@@ -3,6 +3,7 @@ use protobuf::Message;
 
 use crate::config::PicodataConfig;
 use crate::instance::Instance;
+use crate::replicaset::Replicaset;
 use crate::schema;
 use crate::schema::ADMIN_ID;
 use crate::sql::pgproto;
@@ -39,6 +40,7 @@ pub(super) fn prepare(
     //
     // Populate "_pico_address" and "_pico_instance" with info about the first instance
     //
+    // TODO: these could all go into a single op::BatchDml
     init_entries_push_op(op::Dml::replace(
         ClusterwideTable::Address,
         &traft::PeerAddress {
@@ -52,6 +54,13 @@
         &instance,
         ADMIN_ID,
     ));
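+    // The first instance always starts a new replicaset, so insert its record right away.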
+    let replicaset = Replicaset::with_one_instance(instance);
+    init_entries_push_op(op::Dml::insert(
+        ClusterwideTable::Replicaset,
+        &replicaset,
+        ADMIN_ID,
+    ));
 
     //
     // Populate "_pico_tier" with initial tiers
diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index cb2b3de0a1..80d6b6337d 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -249,35 +249,6 @@ impl Loop {
                 }
             }
 
-            Plan::CreateReplicaset(CreateReplicaset {
-                master_id,
-                replicaset_id,
-                rpc,
-                op,
-            }) => {
-                set_status(governor_status, "create new replicaset");
-                governor_step! {
-                    "promoting new replicaset master" [
-                        "master_id" => %master_id,
-                        "replicaset_id" => %replicaset_id,
-                    ]
-                    async {
-                        pool.call(master_id, &rpc, Self::RPC_TIMEOUT)?
-                            .timeout(Duration::from_secs(3))
-                            .await?
-                    }
-                }
-
-                governor_step! {
-                    "creating new replicaset" [
-                        "replicaset_id" => %replicaset_id,
-                    ]
-                    async {
-                        node.propose_and_wait(op, Duration::from_secs(3))?;
-                    }
-                }
-            }
-
             Plan::Replication(Replication {
                 targets,
                 master_id,
diff --git a/src/governor/plan.rs b/src/governor/plan.rs
index f0dd655850..1da5cf1c8a 100644
--- a/src/governor/plan.rs
+++ b/src/governor/plan.rs
@@ -101,42 +101,6 @@ pub(super) fn action_plan<'i>(
         return Ok(Downgrade { req }.into());
     }
 
-    ////////////////////////////////////////////////////////////////////////////
-    // create new replicaset
-    let to_create_replicaset = instances
-        .iter()
-        .filter(|instance| has_grades!(instance, Offline -> Online) || instance.is_reincarnated())
-        .find(|instance| replicasets.get(&instance.replicaset_id).is_none());
-    if let Some(Instance {
-        instance_id: master_id,
-        replicaset_id,
-        replicaset_uuid,
-        tier,
-        ..
-    }) = to_create_replicaset
-    {
-        let rpc = rpc::replication::SyncAndPromoteRequest {
-            vclock: None,
-            timeout: Loop::SYNC_TIMEOUT,
-        };
-        let op = Dml::insert(
-            ClusterwideTable::Replicaset,
-            &Replicaset {
-                replicaset_id: replicaset_id.clone(),
-                replicaset_uuid: replicaset_uuid.clone(),
-                current_master_id: master_id.clone(),
-                target_master_id: master_id.clone(),
-                weight: 0.,
-                weight_origin: WeightOrigin::Auto,
-                state: ReplicasetState::NotReady,
-                tier: tier.clone(),
-            },
-            ADMIN_ID,
-        )?;
-        #[rustfmt::skip]
-        return Ok(CreateReplicaset { master_id, replicaset_id, rpc, op }.into());
-    }
-
     ////////////////////////////////////////////////////////////////////////////
     // update target replicaset master
     let new_target_master = get_new_replicaset_master_if_needed(instances, replicasets);
@@ -453,13 +417,6 @@ pub mod stage {
             pub req: rpc::update_instance::Request,
         }
 
-        pub struct CreateReplicaset<'i> {
-            pub master_id: &'i InstanceId,
-            pub replicaset_id: &'i ReplicasetId,
-            pub rpc: rpc::replication::SyncAndPromoteRequest,
-            pub op: Dml,
-        }
-
         pub struct Replication<'i> {
             pub targets: Vec<&'i InstanceId>,
             pub master_id: &'i InstanceId,
diff --git a/src/replicaset.rs b/src/replicaset.rs
index 9a1f4fe425..3fcf3691e0 100644
--- a/src/replicaset.rs
+++ b/src/replicaset.rs
@@ -1,4 +1,5 @@
 use super::instance::InstanceId;
+use crate::instance::Instance;
 use ::tarantool::tlua;
 use ::tarantool::tuple::Encode;
 
@@ -49,6 +50,22 @@ pub struct Replicaset {
 impl Encode for Replicaset {}
 
 impl Replicaset {
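+    /// Create a record for a new replicaset whose first (and so far only)
+    /// instance is `master`: zero weight, state "not-ready".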
+    #[inline]
+    pub fn with_one_instance(master: &Instance) -> Replicaset {
+        Replicaset {
+            replicaset_id: master.replicaset_id.clone(),
+            replicaset_uuid: master.replicaset_uuid.clone(),
+            current_master_id: master.instance_id.clone(),
+            target_master_id: master.instance_id.clone(),
+            weight: 0.,
+            weight_origin: WeightOrigin::Auto,
+            state: ReplicasetState::NotReady,
+            tier: master.tier.clone(),
+        }
+    }
+
     /// Format of the _pico_replicaset global table.
     #[inline(always)]
     pub fn format() -> Vec<::tarantool::space::Field> {
diff --git a/src/rpc/join.rs b/src/rpc/join.rs
index aba562abbf..2c7630c593 100644
--- a/src/rpc/join.rs
+++ b/src/rpc/join.rs
@@ -7,6 +7,7 @@ use crate::has_grades;
 use crate::instance::Grade;
 use crate::instance::GradeVariant::*;
 use crate::instance::{Instance, InstanceId};
+use crate::replicaset::Replicaset;
 use crate::replicaset::ReplicasetId;
 use crate::schema::ADMIN_ID;
 use crate::storage::ClusterwideTable;
@@ -92,43 +93,37 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R
             raft_id: instance.raft_id,
             address: req.advertise_address.clone(),
         };
-        let op_addr = Dml::replace(ClusterwideTable::Address, &peer_address, ADMIN_ID)
-            .expect("encoding should not fail");
-        let op_instance = Dml::replace(ClusterwideTable::Instance, &instance, ADMIN_ID)
-            .expect("encoding should not fail");
+
+        let mut ops = Vec::with_capacity(3);
+        ops.push(
+            Dml::replace(ClusterwideTable::Address, &peer_address, ADMIN_ID)
+                .expect("encoding should not fail"),
+        );
+        ops.push(
+            Dml::replace(ClusterwideTable::Instance, &instance, ADMIN_ID)
+                .expect("encoding should not fail"),
+        );
+
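+        // If the joining instance is the first one in its replicaset, insert
+        // the replicaset record in the same batch, so that the instance and
+        // its replicaset are committed to the raft log atomically.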
+        if storage.replicasets.get(&instance.replicaset_id)?.is_none() {
+            let replicaset = Replicaset::with_one_instance(&instance);
+            ops.push(
+                Dml::insert(ClusterwideTable::Replicaset, &replicaset, ADMIN_ID)
+                    .expect("encoding should not fail"),
+            );
+        }
+
         let ranges = vec![
             cas::Range::new(ClusterwideTable::Instance),
             cas::Range::new(ClusterwideTable::Address),
             cas::Range::new(ClusterwideTable::Tier),
+            cas::Range::new(ClusterwideTable::Replicaset),
         ];
-        macro_rules! handle_result {
-            ($res:expr) => {
-                match $res {
-                    Ok((index, term)) => {
-                        node.wait_index(index, deadline.duration_since(fiber::clock()))?;
-                        if term != raft::Storage::term(raft_storage, index)? {
-                            // leader switched - retry
-                            node.wait_status();
-                            continue;
-                        }
-                    }
-                    Err(err) => {
-                        if err.is_cas_err() | err.is_term_mismatch_err() {
-                            // cas error - retry
-                            fiber::sleep(Duration::from_millis(500));
-                            continue;
-                        } else {
-                            return Err(err);
-                        }
-                    }
-                }
-            };
-        }
         // Only in this order - so that when instance exists - address will always be there.
-        handle_result!(cas::compare_and_swap(
-            Op::BatchDml {
-                ops: vec![op_addr, op_instance],
-            },
+        let res = cas::compare_and_swap(
+            Op::BatchDml { ops },
             cas::Predicate {
                 index: raft_storage.applied()?,
                 term: raft_storage.term()?,
@@ -136,7 +128,26 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R
             },
             ADMIN_ID,
             deadline.duration_since(fiber::clock()),
-        ));
+        );
+        match res {
+            Ok((index, term)) => {
+                node.wait_index(index, deadline.duration_since(fiber::clock()))?;
+                if term != raft::Storage::term(raft_storage, index)? {
+                    // leader switched - retry
+                    node.wait_status();
+                    continue;
+                }
+            }
+            Err(err) => {
+                if err.is_cas_err() || err.is_term_mismatch_err() {
+                    // cas error - retry
+                    fiber::sleep(Duration::from_millis(500));
+                    continue;
+                } else {
+                    return Err(err);
+                }
+            }
+        }
         node.main_loop.wakeup();
 
         // A joined instance needs to communicate with other nodes.
diff --git a/test/int/test_basics.py b/test/int/test_basics.py
index 421b40c63e..2bb51143b4 100644
--- a/test/int/test_basics.py
+++ b/test/int/test_basics.py
@@ -335,6 +335,7 @@ def test_raft_log(instance: Instance):
 +-----+----+--------+
 |  0  | 1  |Replace({_pico_peer_address}, [1,"127.0.0.1:{p}"])|
 |  0  | 1  |Insert({_pico_instance}, ["i1","68d4a766-4144-3248-aeb4-e212356716e4",1,"r1","e0df68c5-e7f9-395f-86b3-30ad9e1b7b07",["Offline",0],["Offline",0],{b},"default"])|
+|  0  | 1  |Insert({_pico_replicaset}, ["r1","e0df68c5-e7f9-395f-86b3-30ad9e1b7b07","i1","i1","default",0.0,"auto","not-ready"])|
 |  0  | 1  |Insert({_pico_tier}, ["default",1])|
 |  0  | 1  |Insert({_pico_property}, ["global_schema_version",0])|
 |  0  | 1  |Insert({_pico_property}, ["next_schema_version",1])|
@@ -401,7 +402,6 @@ def test_raft_log(instance: Instance):
 |  0  | 1  |AddNode(1)|
 |  0  | 2  |-|
 |  0  | 2  |Replace({_pico_instance}, ["i1","68d4a766-4144-3248-aeb4-e212356716e4",1,"r1","e0df68c5-e7f9-395f-86b3-30ad9e1b7b07",["Offline",0],["Online",1],{b},"default"])|
-|  0  | 2  |Insert({_pico_replicaset}, ["r1","e0df68c5-e7f9-395f-86b3-30ad9e1b7b07","i1","i1","default",0.0,"auto","not-ready"])|
 |  0  | 2  |Replace({_pico_instance}, ["i1","68d4a766-4144-3248-aeb4-e212356716e4",1,"r1","e0df68c5-e7f9-395f-86b3-30ad9e1b7b07",["Replicated",1],["Online",1],{b},"default"])|
 |  0  | 2  |Update({_pico_replicaset}, ["r1"], [["=","weight",1.0], ["=","state","ready"]])|
 |  0  | 2  |Replace({_pico_property}, ["target_vshard_config",[{{"e0df68c5-e7f9-395f-86b3-30ad9e1b7b07":[{{"68d4a766-4144-3248-aeb4-e212356716e4":["pico_service@127.0.0.1:{p}","i1",true]}},1.0]}},"on"]])|
-- 
GitLab