diff --git a/src/args.rs b/src/args.rs index d6a65ca058bd44db741201d85526edfc0968527f..16d7cfbafa0a0cef0100f75b718331a713695664 100644 --- a/src/args.rs +++ b/src/args.rs @@ -114,6 +114,13 @@ impl Run { Ok(res) } + + pub fn advertise_address(&self) -> String { + match &self.advertise_address { + Some(v) => v.clone(), + None => self.listen.clone(), + } + } } //////////////////////////////////////////////////////////////////////////////// @@ -147,6 +154,12 @@ pub struct Test { pub run: Run, } +impl Test { + pub fn tt_args(&self) -> Result<Vec<CString>, String> { + Ok(vec![current_exe()?]) + } +} + //////////////////////////////////////////////////////////////////////////////// // fns //////////////////////////////////////////////////////////////////////////////// diff --git a/src/discovery.rs b/src/discovery.rs index a756c5bd267f59a4a3a524fe380b9042c739e588..a35640bb5b0b33d3334a7583fcf0f52661199928 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -1,122 +1,15 @@ -use std::{ - collections::BTreeSet, - fmt::{Debug, Display}, - net::{SocketAddr, ToSocketAddrs}, - time::Duration, - vec, -}; - +use ::tarantool::fiber::{mutex::MutexGuard, Mutex}; +use ::tarantool::proc; +use ::tarantool::uuid::Uuid; use itertools::Itertools; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use tarantool::{ - fiber::{self, mutex::MutexGuard, Mutex}, - net_box, - tuple::AsTuple, - uuid::Uuid, -}; - -use crate::tlog; - -pub fn net_box_repeat_call_until_succeed<Args, Res, Addr>( - address: Addr, - fn_name: &str, - args: Args, -) -> Res -where - Args: AsTuple, - Addr: ToSocketAddrs + Display, - Res: DeserializeOwned, -{ - loop { - let conn = match net_box::Conn::new( - &address, - net_box::ConnOptions { - connect_timeout: Duration::from_secs(2), - ..Default::default() - }, - None, - ) { - Ok(conn) => conn, - Err(e) => { - tlog!(Warning, "could not connect to {}: {}", address, e); - fiber::sleep(Duration::from_secs(2)); - continue; - } - }; - match conn.call( - fn_name, - 
&args, - &net_box::Options { - timeout: Some(Duration::from_secs(2)), - ..Default::default() - }, - ) { - Ok(Some(tuple)) => break tuple.into_struct::<((Res,),)>().unwrap().0 .0, - Ok(None) => unreachable!(), - Err(e) => { - tlog!( - Warning, - "net.box call failed address={address} fn={fn_name}: {e}" - ); - fiber::sleep(Duration::from_secs(2)) - } - } - } -} +use serde::{Deserialize, Serialize}; +use std::collections::BTreeSet; +use std::error::Error as StdError; -static mut DISCOVERY: &Option<Mutex<Discovery>> = &None; - -pub fn handle_request(request: Request, request_to: &Address) -> Response { - Discovery::handle_request(&mut discovery(), request, request_to) -} +use crate::stringify_cfunc; +use crate::tarantool; -fn set_discovery(d: Discovery) { - unsafe { DISCOVERY = Box::leak(Box::new(Some(Mutex::new(d)))) } -} - -fn discovery() -> MutexGuard<'static, Discovery> { - unsafe { DISCOVERY } - .as_ref() - .expect("discovery error: expected DISCOVERY to be set on instance startup") - .lock() -} - -#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord, Clone)] -pub struct Address { - pub host: String, - pub port: u16, -} - -impl<S> From<S> for Address -where - S: AsRef<str>, -{ - fn from(s: S) -> Self { - let (host, port_str) = s.as_ref().split_once(":").unwrap(); - Self { - host: host.into(), - port: port_str.parse().unwrap(), - } - } -} - -impl Display for Address { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}:{}", self.host, self.port) - } -} -impl Debug for Address { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - Display::fmt(self, f) - } -} - -impl ToSocketAddrs for Address { - type Iter = vec::IntoIter<SocketAddr>; - fn to_socket_addrs(&self) -> std::io::Result<vec::IntoIter<SocketAddr>> { - format!("{}", self).to_socket_addrs() - } -} +type Address = String; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum Role { @@ -294,12 +187,38 @@ impl Discovery { 
} } -pub fn discover( +// TODO Мутекс здесь не нужен, пусть даже он тарантульный. +// Здесь достаточно просто +// static mut RAFT_DISCOVERY: Option<&'static Discovery> = None; +// Мутекс - это потенциальный йилд, но алгоритм дискавери не предполагает +// никаких йилдов. Обработка запросов и ответов и так должна быть атомарной. +// Если же это не так - то это некорректный алгоритм, +// а мутекс - не больше чем попытка замести грязь под ковёр. +static mut DISCOVERY: &Option<Mutex<Discovery>> = &None; + +fn discovery() -> MutexGuard<'static, Discovery> { + unsafe { DISCOVERY } + .as_ref() + .expect("discovery error: expected DISCOVERY to be set on instance startup") + .lock() +} + +pub fn init_global( peers: impl IntoIterator<Item = impl Into<Address>>, - make_request: impl Fn(Request, &Address) -> Response, -) -> Role { + // make_request: impl Fn(Request, &Address) -> Response, +) { + // make_request = fn rpc_discover( + // request: discovery::Request, + // address: &discovery::Address, + // ) -> discovery::Response { + // net_box_repeat_call_until_succeed(address, ".discover", (request, address)) + // } + let d = Discovery::new(Uuid::random().to_string(), peers); - set_discovery(d); + unsafe { DISCOVERY = Box::leak(Box::new(Some(Mutex::new(d)))) } +} + +pub fn wait_global() -> Role { loop { let mut d = discovery(); if let State::Done(role) = &d.state { @@ -307,13 +226,20 @@ pub fn discover( } let step = d.next(); drop(d); // release the lock before doing i/o - if let Some((request, address)) = step { - let response = make_request(request, &address); + if let Some((request, address)) = &step { + let fn_name = stringify_cfunc!(proc_discover); + let response = tarantool::net_box_call_retry(address, fn_name, &(request, address)); discovery().handle_response(response); } } } +#[proc] +fn proc_discover(request: Request, request_to: Address) -> Result<Response, Box<dyn StdError>> { + let mut discovery = discovery(); + 
Ok(discovery.handle_request(request, &request_to)) +} + #[cfg(test)] mod tests { use std::collections::{BTreeMap, HashMap}; diff --git a/src/main.rs b/src/main.rs index c175636d916cc989397acfec2703c5b5e062c4fa..b51752d3f4ed4f62a6aea72a4ee9c76da5ae07ad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -124,8 +124,8 @@ fn init_handlers() { "#, ); - use discovery::raft_discover; - declare_cfunc!(raft_discover); + use discovery::proc_discover; + declare_cfunc!(proc_discover); use traft::node::raft_interact; declare_cfunc!(raft_interact); @@ -309,41 +309,41 @@ fn start_discover(supervisor: Supervisor) { tarantool::set_cfg(&cfg); traft::Storage::init_schema(); - discovery::init_global( - discovery::PeerInfo { - raft_id: traft::Storage::id().unwrap(), - random_uuid: tarantool::info("uuid").unwrap(), - instance_id: args.instance_id.clone(), - replicaset_id: args.replicaset_id.clone(), - advertise_address: args.advertise_address(), - }, - &args.peers, - ); + + // исходный discovery::discover() пришлось разбить на две части - + // init_global и wait_global. К сожалению, они не могут быть атомарны, + // потому что listen порт надо поднимать именно посередине. С неподнятым портом + // уходить в кишки discovery::discover() нельзя - на запросы отвечать будет некому. 
+ // А если поднять порт до инициализации дискавери, то образуется временно́е окно, + // и прилетевший пакет приведёт к панике "discovery error: expected DISCOVERY + // to be set on instance startup" + discovery::init_global(&args.peers); init_handlers(); cfg.listen = Some(args.listen.clone()); tarantool::set_cfg(&cfg); - let summary = discovery::wait_global().unwrap(); + let role = discovery::wait_global(); // TODO assert traft::Storage::instance_id == (null || args.instance_id) if let Some(_) = traft::Storage::id().unwrap() { return postjoin(supervisor); } - let msg = if summary.its_me { - return start_boot(supervisor); - // let next_entrypoint = Entrypoint::StartBoot(); - // IpcMessage { - // next_entrypoint, - // drop_db: false, - // } - } else { - let next_entrypoint = Entrypoint::StartJoin { - leader_uri: summary.leader.unwrap().advertise_address, - }; - IpcMessage { - next_entrypoint, - drop_db: true, + let msg = match role { + discovery::Role::Leader { .. } => { + return start_boot(supervisor); + // let next_entrypoint = Entrypoint::StartBoot(); + // IpcMessage { + // next_entrypoint, + // drop_db: false, + // } + } + discovery::Role::NonLeader { leader } => { + let next_entrypoint = Entrypoint::StartJoin { leader_uri: leader }; + IpcMessage { + next_entrypoint, + drop_db: true, + } } }; @@ -436,7 +436,7 @@ fn start_join(leader_uri: String, supervisor: Supervisor) { let fn_name = stringify_cfunc!(raft_join); let timeout = Duration::from_secs_f32(1.5); let resp: traft::node::JoinResponse = - tarantool::net_box_call(&leader_uri, fn_name, req, timeout).unwrap_or_else(|e| { + tarantool::net_box_call(&leader_uri, fn_name, &req, timeout).unwrap_or_else(|e| { tlog!(Warning, "net_box_call failed: {e}"; "peer" => &leader_uri, "fn" => fn_name, @@ -556,7 +556,7 @@ fn postjoin(supervisor: Supervisor) { use traft::node::raft_join; let fn_name = stringify_cfunc!(raft_join); let now = Instant::now(); - match tarantool::net_box_call(&leader.peer_address, fn_name, 
req, timeout) { + match tarantool::net_box_call(&leader.peer_address, fn_name, &req, timeout) { Err(e) => { tlog!(Error, "failed to promote myself: {e}"); fiber::sleep(timeout.saturating_sub(now.elapsed())); diff --git a/src/tarantool.rs b/src/tarantool.rs index 0de46cb03533fdbd3c36f7e8cf6bf5ef5d35ef10..62b6788a35c98d02c4b3c2f819b49fe0c4e65401 100644 --- a/src/tarantool.rs +++ b/src/tarantool.rs @@ -2,6 +2,7 @@ use std::ffi::CStr; use std::time::Duration; use std::time::Instant; +use ::tarantool::fiber; use ::tarantool::lua_state; use ::tarantool::net_box; use ::tarantool::tlua::{self, LuaFunction, LuaTable}; @@ -138,13 +139,6 @@ pub fn cfg() -> Option<Cfg> { b.get("cfg") } -pub fn info(k: &str) -> Option<String> { - let l = lua_state(); - let b: LuaTable<_> = l.get("box")?; - let info: LuaTable<_> = b.get("info").unwrap(); - info.get(k) -} - pub fn set_cfg(cfg: &Cfg) { let l = lua_state(); let box_cfg = LuaFunction::load(l, "return box.cfg(...)").unwrap(); @@ -160,7 +154,7 @@ pub fn eval(code: &str) { pub fn net_box_call<Args, Res, Addr>( address: Addr, fn_name: &str, - args: Args, + args: &Args, timeout: Duration, ) -> Result<Res, ::tarantool::error::Error> where @@ -183,8 +177,27 @@ where }; let tuple = conn - .call(fn_name, &args, &call_opts)? + .call(fn_name, args, &call_opts)? 
.expect("unexpected net_box result Ok(None)"); tuple.into_struct::<((Res,),)>().map(|res| res.0 .0) } + +pub fn net_box_call_retry<Args, Res, Addr>(address: Addr, fn_name: &str, args: &Args) -> Res +where + Args: AsTuple, + Addr: std::net::ToSocketAddrs + std::fmt::Display, + Res: serde::de::DeserializeOwned, +{ + loop { + let timeout = Duration::from_millis(200); + let now = Instant::now(); + match net_box_call(&address, fn_name, args, timeout) { + Ok(v) => break v, + Err(e) => { + crate::tlog!(Warning, "could not connect to {}: {}", address, e); + fiber::sleep(timeout.saturating_sub(now.elapsed())) + } + } + } +} diff --git a/test/helper/picodata.lua b/test/helper/picodata.lua index 11841b8abb4ca772701a745071dc77392cce837f..4bd355d1484262350aaffddcc6329dd2ea6f4b47 100644 --- a/test/helper/picodata.lua +++ b/test/helper/picodata.lua @@ -12,7 +12,7 @@ local Picodata = { name = 'default', listen = '127.0.0.1:13301', peer = {'127.0.0.1:13301'}, - args = {'run', '--instance-id', 'i1'}, + args = {'run'}, env = {}, command = 'target/debug/picodata',