diff --git a/src/traft/node.rs b/src/traft/node.rs
index 3670f2b7b81809c5829ae13ae3742974808ad3a8..2fe4a36c9d2ec83ad5f94f2a8ff7484d423432f2 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -166,7 +166,7 @@ impl Node {
             raft_id,
             main_loop: MainLoop::start(status.clone(), node_impl.clone()), // yields
             _conf_change_loop: fiber::Builder::new()
-                .name("raft_conf_change_loop")
+                .name("governor_loop")
                 .proc(conf_change_loop_fn)
                 .start()
                 .unwrap(),
@@ -947,12 +947,10 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
             // will sometimes be handled and there's no need in timeout.
             // It also guarantees that the notification will arrive only
             // after the node leaves the joint state.
-            match node.propose_conf_change_and_wait(term, conf_change) {
-                Ok(()) => tlog!(Info, "conf_change processed"),
-                Err(e) => {
-                    tlog!(Warning, "conf_change failed: {e}");
-                    fiber::sleep(Duration::from_secs(1));
-                }
+            tlog!(Info, "proposing conf_change"; "cc" => ?conf_change);
+            if let Err(e) = node.propose_conf_change_and_wait(term, conf_change) {
+                tlog!(Warning, "failed proposing conf_change: {e}");
+                fiber::sleep(Duration::from_secs(1));
             }
             continue 'governor;
         }
@@ -965,6 +963,14 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
             // TODO: process them all, not just the first one
             .find(|peer| peer.target_grade == TargetGradeVariant::Offline);
         if let Some(peer) = to_offline {
+            tlog!(
+                Info,
+                "processing {} {} -> {}",
+                peer.instance_id,
+                peer.current_grade,
+                peer.target_grade
+            );
+
             let replicaset_id = &peer.replicaset_id;
             let res = (|| -> traft::Result<_> {
                 let replicaset = storage.replicasets.get(replicaset_id)?;
@@ -977,17 +983,14 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                     if let Some(peer) = new_master {
                         let mut ops = UpdateOps::new();
                         ops.assign("master_id", &peer.instance_id)?;
-                        node.propose_and_wait(
-                            OpDML::update(ClusterSpace::Replicasets, &[replicaset_id], ops)?,
-                            // TODO: don't hard code the timeout
-                            Duration::from_secs(3),
-                            // TODO: these `?` will be processed in the wrong place
-                        )??;
+
+                        let op = OpDML::update(ClusterSpace::Replicasets, &[replicaset_id], ops)?;
+                        tlog!(Info, "proposing replicaset master change"; "op" => ?op);
+                        // TODO: don't hard code the timeout
+                        // TODO: these `?` will be processed in the wrong place
+                        node.propose_and_wait(op, Duration::from_secs(3))??;
                     } else {
-                        tlog!(Warning, "the last replica has gone offline";
-                            "replicaset_id" => %replicaset_id,
-                            "instance_id" => %peer.instance_id,
-                        );
+                        tlog!(Info, "skip proposing replicaset master change");
                     }
                 }
 
@@ -997,6 +1000,10 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                         || peer.current_grade == CurrentGradeVariant::Online
                     })
                     .map(|peer| {
+                        tlog!(Info,
+                            "calling rpc::sharding";
+                            "instance_id" => %peer.instance_id
+                        );
                         (
                             peer.instance_id.clone(),
                             sharding::Request {
@@ -1009,29 +1016,29 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                     });
                 // TODO: don't hard code timeout
                 let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
-                for (peer_iid, resp) in res {
+                for (_, resp) in res {
                     let sharding::Response {} = resp?;
-                    // TODO: change `Info` to `Debug`
-                    tlog!(Info, "sharding reconfigured on peer";
-                        "instance_id" => &*peer_iid,
-                    );
                 }
                 Ok(())
             })();
             if let Err(e) = res {
-                tlog!(Warning, "failed to reconfigure sharding: {e}");
+                tlog!(Warning, "failed calling rpc::sharding: {e}");
                 // TODO: don't hard code timeout
                 event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
                 continue 'governor;
             }
 
-            let instance_id = peer.instance_id.clone();
-            let req = UpdatePeerRequest::new(instance_id, cluster_id.clone())
+            let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id.clone())
                 .with_current_grade(CurrentGrade::offline(peer.target_grade.incarnation));
-            let res = node.handle_topology_request_and_wait(req.into());
-            if let Err(e) = res {
-                tlog!(Warning, "failed to set peer offline: {e}";
-                    "instance_id" => &*peer.instance_id,
+            tlog!(Info,
+                "handling UpdatePeerRequest";
+                "current_grade" => %req.current_grade.expect("just set"),
+                "instance_id" => %req.instance_id,
+            );
+            if let Err(e) = node.handle_topology_request_and_wait(req.into()) {
+                tlog!(Warning,
+                    "failed handling UpdatePeerRequest: {e}";
+                    "instance_id" => %peer.instance_id,
                 );
                 // TODO: don't hard code timeout
                 event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
@@ -1048,7 +1055,8 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                 // Check if it makes sense to call box.ctl.promote,
                 // otherwise we risk unpredictable delays
                 if replicaset_peers.len() / 2 + 1 > may_respond.count() {
-                    tlog!(Critical, "replicaset lost quorum";
+                    tlog!(Warning,
+                        "replicaset lost quorum";
                         "replicaset_id" => %replicaset_id,
                     );
                     continue 'governor;
@@ -1058,6 +1066,10 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                 // Promote the replication leader again
                 // because of tarantool bugs
                 if let Some(replicaset) = storage.replicasets.get(replicaset_id)? {
+                    tlog!(Info,
+                        "calling rpc::replication::promote";
+                        "instance_id" => %replicaset.master_id
+                    );
                     pool.call_and_wait_timeout(
                         &replicaset.master_id,
                         replication::promote::Request {
@@ -1068,15 +1080,12 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                         // TODO: don't hard code timeout
                         Duration::from_secs(3),
                     )?;
-                    tlog!(Debug, "promoted replicaset master";
-                        "instance_id" => %replicaset.master_id,
-                        "replicaset_id" => %replicaset_id,
-                    );
                 }
                 Ok(())
             })();
             if let Err(e) = res {
-                tlog!(Warning, "failed to promote replicaset master: {e}";
+                tlog!(Warning,
+                    "failed calling rpc::replication::promote: {e}";
                     "replicaset_id" => %replicaset_id,
                 );
             }
@@ -1124,8 +1133,8 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                 }
                 Err(e) => {
                     tlog!(Warning, "raft sync failed: {e}";
-                        "instance_id" => &*peer.instance_id,
-                        "peer" => &peer.peer_address,
+                        "instance_id" => %peer.instance_id,
+                        "address" => %peer.peer_address,
                     );
                     // TODO: don't hard code timeout
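Every hunk above converges on the same control-flow shape: log the intent before acting, attempt the fallible call, and on failure log a warning, back off, and restart the labeled loop from the top. Below is a minimal standalone sketch of that retry pattern; the `propose_conf_change` helper and its success threshold are invented for illustration, and `std::thread::sleep` stands in for the cooperative `fiber::sleep` used in the actual code.

```rust
use std::time::Duration;

/// Hypothetical stand-in for a fallible governor step such as
/// `node.propose_conf_change_and_wait` or an `rpc::sharding` call.
fn propose_conf_change(attempt: u32) -> Result<(), String> {
    if attempt < 3 {
        Err(format!("quorum not reached (attempt {attempt})"))
    } else {
        Ok(())
    }
}

fn main() {
    let mut attempt = 0;
    // The real loop is labeled 'governor and restarts from the top after
    // every failure; picodata sleeps on a fiber, which yields to other
    // fibers, whereas this sketch blocks the whole thread.
    'governor: loop {
        attempt += 1;
        println!("proposing conf_change (attempt {attempt})");
        if let Err(e) = propose_conf_change(attempt) {
            println!("failed proposing conf_change: {e}");
            std::thread::sleep(Duration::from_secs(1));
            continue 'governor;
        }
        // The real loop never exits: it goes back to waiting for the
        // next topology event. For this sketch, stop once the call succeeds.
        break;
    }
}
```

Note that in the diff the backoff is also interruptible: `event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1))` wakes the fiber early when the topology changes instead of always sleeping the full second.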