Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • core/tarantool-module
1 result
Show changes
Commits on Source (13)
Showing
with 245 additions and 782 deletions
......@@ -24,12 +24,32 @@
inner structs.
- `tlua::LuaRead`, `tlua::Push` & `tlua::PushInto` derive macros now support
new-type style tuple structs: they are treated as the inner type.
- `impl tlua::PushInto for Tuple`.
- `net_box::promise::TryGet::into_res` and `From<TryGet<_, _>> for Result<_, _>`.
- `impl [tlua::LuaRead|tlua::Push|tlua::PushOne] for tlua::Object`.
- `fiber::Mutex`'s methods `lock` & `try_lock` now will log the location of
last successful lock when built with `debug_assertions`.
- `#[track_caller]` added to tlua functions that can panic.
- A clarification in `tarantool::proc` documentation about the safety of using
borrowed arguments.
- `impl LuaRead for LuaState`: this is mainly useful for capturing the lua
context passed to rust-callbacks for example for use with `tlua::error!`.
See test `error` in `tests/src/tlua/functions_write.rs` for examples.
- Doc-comments here and there.
- `tests/test.sh` now supports `TARANTOOL_MODULE_BUILD_MODE` environment
variable to select which build mode is tested (debug, release, etc.)
- `fiber::r#yield` function for yielding fibers likewise tarantool LUA api.
### Removed
- `raft` cfg feature that wasn't finished an will never be.
### Fixed
- `TupleBuffer` no longer copies data into tarantool's transaction memory pool
in `TupleBuffer::from_vec_unchecked`, which previously would result in a use
after free in some cases.
- `impl<_> From<tlua::PushIterError<_>> for tlua::Void` is now more general
which allows more types to be used in contexts like `tlua::Lua::set`, etc.
- `tests/run_benchmarks.lua` now works again.
### Changed
......
......@@ -10,6 +10,4 @@ members = [
"examples/hardest",
"examples/read",
"examples/write",
"examples/cluster_node"
]
[package]
name = "cluster_node"
version = "0.1.0"
authors = [
"Dmitriy Koltsov <dkoltsov@picodata.io>",
"Anton Melnikov <volt0@picodata.io>"
]
edition = "2018"
license = "BSD-2-Clause"
rust-version = "1.59"
[dependencies]
once_cell = "1.4.0"
[dependencies.tarantool]
path = "../../tarantool"
features = ["raft_node"]
[lib]
test = false
crate-type = ["cdylib"]
#!/usr/bin/env tarantool
package.cpath = 'target/debug/?.so;' .. package.cpath
box.cfg{
listen = 3301,
}
box.once('bootstrap_libcluster_node', function()
box.schema.func.create('libcluster_node.run_node', {language = 'C'})
box.schema.func.create('libcluster_node.rpc', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcluster_node.rpc')
end)
box.func['libcluster_node.run_node']:call()
os.exit(0)
use std::cell::RefCell;
use std::os::raw::c_int;
use std::rc::{Rc, Weak};
use tarantool::{
proc,
raft::Node,
tuple::{FunctionArgs, FunctionCtx},
};
use once_cell::sync::Lazy;
#[derive(Default)]
struct Global {
node: RefCell<Weak<Node>>,
}
unsafe impl Sync for Global {}
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl Send for Global {}
static GLOBAL: Lazy<Global> = Lazy::new(Default::default);
#[proc]
fn run_node(bootstrap_addrs: Vec<&str>) {
let node = Rc::new(Node::new("libcluster_node.rpc", bootstrap_addrs, Default::default()).unwrap());
GLOBAL.node.replace(Rc::downgrade(&node));
node.run().unwrap();
}
#[no_mangle]
pub extern "C" fn rpc(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
match GLOBAL.node.borrow().upgrade() {
None => 0,
Some(node) => node.handle_rpc(ctx, args),
}
}
......@@ -20,12 +20,10 @@ rust-version = "1.59"
base64 = "0.13"
bitflags = "1.2"
byteorder = "1.3"
chrono = { version = "0.4.19", optional = true }
derivative = "2.1"
dlopen = "0.1.8"
dec = "0.4.8"
thiserror = "1.0.30"
ipnetwork = { version = "0.18.0", optional = true }
libc = "0.2"
log = "0.4"
nix = "0.24.1"
......@@ -33,10 +31,7 @@ num-traits = "0.2"
num-derive = "0.3"
once_cell = "1.4.0"
tlua = { path = "../tlua", version = "0.6.1" }
protobuf = { version = "2", optional = true }
raft = { version = "0.6.0", optional = true }
refpool = { version = "0.4.3", optional = true }
rand = { version = "0.8.3", optional = true }
rmp = "0.8"
rmp-serde = "=1.0.0"
rmpv = { version = "^1.0", features = ["with-serde"] }
......@@ -53,8 +48,7 @@ va_list = "0.1.3"
[features]
default = ["net_box"]
net_box = ["refpool"]
raft_node = ["chrono", "ipnetwork", "net_box", "protobuf", "raft", "rand"]
schema = []
defer = []
picodata = []
all = ["default", "raft_node", "schema", "defer"]
all = ["default", "schema", "defer"]
......@@ -24,8 +24,8 @@ use ::va_list::{VaList, VaPrimitive};
#[cfg(all(target_arch = "aarch64", target_os = "macos"))]
use crate::va_list::{VaList, VaPrimitive};
use crate::c_ptr;
use crate::error::TarantoolError;
use crate::{c_ptr, set_error};
use crate::error::{TarantoolError, TarantoolErrorCode};
use crate::ffi::{tarantool as ffi, lua};
use crate::Result;
......@@ -37,6 +37,7 @@ pub use channel::{
pub mod mutex;
pub use mutex::Mutex;
use crate::ffi::tarantool::fiber_sleep;
macro_rules! impl_debug_stub {
($t:ident $($p:tt)*) => {
......@@ -1292,16 +1293,30 @@ pub fn clock64() -> u64 {
/// Return control to another fiber and wait until it'll be explicitly awoken by
/// another fiber.
///
/// Consider using [`fiber::sleep`]`(Duration::ZERO)` instead, that way the
/// Consider using [`fiber::sleep`]`(Duration::ZERO)` or [`fiber::yield`] instead, that way the
/// fiber will be automatically awoken and will resume execution shortly.
///
/// [`fiber::sleep`]: crate::fiber::sleep
/// [`fiber::start`]: crate::fiber::start
/// [`fiber::defer`]: crate::fiber::defer
/// [`fiber::yield`]: crate::fiber::yield
pub fn fiber_yield() {
unsafe { ffi::fiber_yield() }
}
/// Returns control to the scheduler.
/// Works likewise [`fiber::sleep`]`(Duration::ZERO)` but return error if fiber was canceled by another routine.
///
/// [`fiber::sleep`]: crate::fiber::sleep
pub fn r#yield() -> Result<()> {
unsafe { fiber_sleep(0f64) };
if is_cancelled() {
set_error!(TarantoolErrorCode::ProcLua, "fiber is cancelled");
return Err(TarantoolError::last().into())
}
Ok(())
}
/// Reschedule fiber to end of event loop cycle.
pub fn reschedule() {
unsafe { ffi::fiber_reschedule() }
......
......@@ -6,12 +6,17 @@ use std::{
use crate::fiber::{Latch, LatchGuard};
#[cfg(debug_assertions)]
use std::{cell::Cell, panic::Location};
////////////////////////////////////////////////////////////////////////////////
// Mutex
////////////////////////////////////////////////////////////////////////////////
pub struct Mutex<T: ?Sized> {
latch: Latch,
#[cfg(debug_assertions)]
lock_location: Cell<Option<&'static Location<'static>>>,
data: UnsafeCell<T>,
}
......@@ -31,6 +36,8 @@ impl<T: ?Sized> Mutex<T> {
{
Mutex {
latch: Latch::new(),
#[cfg(debug_assertions)]
lock_location: Cell::default(),
data: UnsafeCell::new(t),
}
}
......@@ -63,10 +70,18 @@ impl<T: ?Sized> Mutex<T> {
/// }).join();
/// assert_eq!(*mutex.lock(), 10);
/// ```
#[track_caller]
pub fn lock(&self) -> MutexGuard<'_, T> {
unsafe {
MutexGuard::new(self, self.latch.lock())
}
#[cfg(debug_assertions)]
let guard = self.latch.try_lock().unwrap_or_else(|| {
self.log_lock_location();
self.latch.lock()
});
#[cfg(not(debug_assertions))]
let guard = self.latch.lock();
unsafe { MutexGuard::new(self, guard) }
}
/// Attempts to acquire this lock.
......@@ -101,9 +116,15 @@ impl<T: ?Sized> Mutex<T> {
/// }).join();
/// assert_eq!(*mutex.lock(), 10);
/// ```
#[track_caller]
pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
unsafe {
self.latch.try_lock().map(|guard| MutexGuard::new(self, guard))
match self.latch.try_lock() {
Some(guard) => unsafe { Some(MutexGuard::new(self, guard)) }
None => {
#[cfg(debug_assertions)]
self.log_lock_location();
None
}
}
}
......@@ -159,6 +180,20 @@ impl<T: ?Sized> Mutex<T> {
pub fn get_mut(&mut self) -> &mut T {
self.data.get_mut()
}
#[cfg(debug_assertions)]
#[inline]
fn log_lock_location(&self) {
use std::borrow::Cow;
use crate::log::{say, SayLevel};
let msg: Cow<str> = if let Some(loc) = self.lock_location.get() {
format!("mutex was locked at {loc}").into()
} else {
"mutex was locked at unknown location".into()
};
say(SayLevel::Verbose, std::file!(), std::line!() as _, None, &msg);
}
}
impl<T> From<T> for Mutex<T> {
......@@ -183,6 +218,21 @@ impl<T: ?Sized + fmt::Debug> fmt::Debug for Mutex<T> {
Some(guard) => {
d.field("data", &&*guard);
}
#[cfg(debug_assertions)]
None => {
struct LockedPlaceholder(Option<&'static Location<'static>>);
impl fmt::Debug for LockedPlaceholder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(loc) = self.0 {
write!(f, "<locked at {loc}>")
} else {
f.write_str("<locked>")
}
}
}
d.field("data", &LockedPlaceholder(self.lock_location.get()));
}
#[cfg(not(debug_assertions))]
None => {
struct LockedPlaceholder;
impl fmt::Debug for LockedPlaceholder {
......@@ -207,11 +257,21 @@ pub struct MutexGuard<'a, T: ?Sized + 'a> {
}
impl<'mutex, T: ?Sized> MutexGuard<'mutex, T> {
#[track_caller]
unsafe fn new(lock: &'mutex Mutex<T>, _latch_guard: LatchGuard) -> Self {
#[cfg(debug_assertions)]
lock.lock_location.set(Some(Location::caller()));
Self { lock, _latch_guard }
}
}
impl<'a, T: ?Sized + 'a> Drop for MutexGuard<'a, T> {
fn drop(&mut self) {
#[cfg(debug_assertions)]
self.lock.lock_location.set(None);
}
}
impl<T: ?Sized> Deref for MutexGuard<'_, T> {
type Target = T;
......
......@@ -29,7 +29,7 @@
//! ### Stored procedures
//!
//! There are several ways Tarantool can call Rust code. It can use either a plugin, a Lua FFI module,
//! or a stored procedure. In this file we only cover the third option, namely Rust stored procedures.
//! or a [stored procedure]. In this file we only cover the third option, namely Rust stored procedures.
//! Even though Tarantool always treats Rust routines just as "C functions", we keep on using the "stored procedure"
//! term as an agreed convention and also for historical reasons.
//!
......@@ -87,19 +87,14 @@
//!
//! Edit the `lib.rs` file and add the following lines:
//! ```rust
//! use std::os::raw::c_int;
//! use tarantool::tuple::{FunctionArgs, FunctionCtx};
//!
//! #[no_mangle]
//! pub extern "C" fn easy(_: FunctionCtx, _: FunctionArgs) -> c_int {
//! #[tarantool::proc]
//! fn easy() {
//! println!("hello world");
//! 0
//! }
//!
//! #[no_mangle]
//! pub extern "C" fn easy2(_: FunctionCtx, _: FunctionArgs) -> c_int {
//! #[tarantool::proc]
//! fn easy2() {
//! println!("hello world -- easy2");
//! 0
//! }
//! ```
//!
......@@ -155,6 +150,8 @@
//! ... and this time the result will be `hello world -- easy2`.
//!
//! As you can see, calling a Rust function is as straightforward as it can be.
//!
//! [stored procedure]: macro@crate::proc
pub mod clock;
pub mod coio;
pub mod decimal;
......@@ -167,7 +164,6 @@ pub mod net_box;
#[doc(hidden)]
pub mod msgpack;
pub mod proc;
pub mod raft;
pub mod schema;
pub mod sequence;
pub mod session;
......@@ -200,6 +196,51 @@ pub use tlua;
/// assert(box.func['libname.add']:call({ 1, 2 }) == 3)
/// ```
///
/// # Accepting borrowed arguments
///
/// It can sometimes be more efficient to borrow the procedure's arguments
/// rather than copying them. This usecase is supported, however it is not
/// entirely safe. Due to how stored procedures are implemented in tarantool,
/// the arguments are allocated in a volatile region of memory, which can be
/// overwritten by some tarantool operations. Therefore you cannot rely on the
/// borrowed arguments being valid for the lifetime of the procedure call.
///
/// This proc is safe, because the data is accessed before any other calls to
/// tarantool api:
/// ```no_run
/// #[tarantool::proc]
/// fn strlen(s: &str) -> usize {
/// s.len()
/// }
/// ```
///
/// This one however is unsafe:
/// ```no_run
/// use tarantool::{error::Error, index::IteratorType::Eq, space::Space};
/// use std::collections::HashSet;
///
/// #[tarantool::proc]
/// fn count_common_friends(user1: &str, user2: String) -> Result<usize, Error> {
/// // A call to tarantool api.
/// let space = Space::find("friends_with").unwrap();
///
/// // This call is unsafe, because borrowed data `user1` is accessed
/// // after a call to tarantool api.
/// let iter = space.select(Eq, &[user1])?;
/// let user1_friends: HashSet<String> = iter
/// .map(|tuple| tuple.get(1).unwrap())
/// .collect();
///
/// // This call is safe, because `user2` is owned.
/// let iter = space.select(Eq, &[user2])?;
/// let user2_friends: HashSet<String> = iter
/// .map(|tuple| tuple.get(1).unwrap())
/// .collect();
///
/// Ok(user1_friends.intersection(&user2_friends).count())
/// }
/// ```
///
/// # Returning errors
///
/// Assuming the function's return type is [`Result`]`<T, E>` (where `E` implements
......
......@@ -194,8 +194,11 @@ pub enum State {
/// or [`Promise::wait_timeout`] methods.
#[derive(Debug)]
pub enum TryGet<T, E> {
/// Promise was kept successfully.
Ok(T),
/// Promise will never be kept due to an error.
Err(E),
/// Promise yet is unresolved.
Pending(Promise<T>),
}
......@@ -220,6 +223,26 @@ impl<T, E> TryGet<T, E> {
_ => None,
}
}
/// Converts `self` into a nested [`Result`].
///
/// Returns
/// - `Ok(Ok(value))` in case of [`TryGet::Ok`]`(value)`.
/// - `Ok(Err(error))` in case of [`TryGet::Err`]`(error)`.
/// - `Err(promise)` in case of [`TryGet::Pending`]`(promise)`.
///
/// This function basically checks if the promise is resolved (`Ok`) or not
/// yet (`Err`).
///
/// [`Result`]: std::result::Result
#[inline(always)]
pub fn into_res(self) -> StdResult<StdResult<T, E>, Promise<T>> {
match self {
Self::Ok(v) => Ok(Ok(v)),
Self::Err(e) => Ok(Err(e)),
Self::Pending(p) => Err(p),
}
}
}
impl<T, E> From<StdResult<T, E>> for TryGet<T, E> {
......@@ -231,6 +254,13 @@ impl<T, E> From<StdResult<T, E>> for TryGet<T, E> {
}
}
impl<T, E> From<TryGet<T, E>> for StdResult<StdResult<T, E>, Promise<T>> {
#[inline(always)]
fn from(r: TryGet<T, E>) -> Self {
r.into_res()
}
}
use std::fmt;
impl<T> fmt::Debug for Promise<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
......
use raft::prelude::Entry;
use serde::Serialize;
use crate::index::IndexOptions;
use crate::space::SpaceCreateOptions;
#[derive(Serialize)]
pub enum Command {
CreateSpace(String, SpaceCreateOptions),
CreateIndex(String, String, IndexOptions),
DropSpace(String),
DropIndex(String, String),
}
pub struct Fsm {}
impl Fsm {
pub fn new() -> Self {
Fsm {}
}
pub fn handle_normal(&mut self, entry: Entry) {
unimplemented!();
}
pub fn handle_conf_change(&mut self, entry: Entry) {
unimplemented!();
}
}
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::net::SocketAddr;
use crate::error::Error;
use crate::raft::net::ConnectionId;
use crate::raft::rpc;
pub struct NodeInner {
state: NodeState,
local_id: u64,
peers: BTreeMap<u64, Vec<SocketAddr>>,
responded_ids: HashSet<u64>,
bootstrap_addrs: Vec<Vec<SocketAddr>>,
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum NodeState {
Init,
ColdBootstrap,
WarmBootstrap,
Ready,
Offline,
Done,
}
#[derive(Debug)]
pub enum NodeEvent {
Request(rpc::BootstrapMsg),
Response(rpc::BootstrapMsg),
Timeout,
Stop,
}
#[derive(Debug)]
pub enum NodeAction {
Connect(ConnectionId, Vec<SocketAddr>),
UpgradeSeed(ConnectionId, u64),
Request(ConnectionId, rpc::BootstrapMsg),
Response(Result<rpc::Response, Error>),
StateChangeNotification(NodeState),
}
impl NodeInner {
pub fn new(
local_id: u64,
local_addrs: Vec<SocketAddr>,
bootstrap_addrs: Vec<Vec<SocketAddr>>,
) -> Self {
let mut peers = BTreeMap::new();
peers.insert(local_id, local_addrs);
NodeInner {
state: NodeState::Init,
local_id,
peers,
responded_ids: Default::default(),
bootstrap_addrs,
}
}
pub fn update(&mut self, events: &mut VecDeque<NodeEvent>, actions: &mut VecDeque<NodeAction>) {
if let NodeState::Init = self.state {
self.init(actions);
}
while let Some(event) = events.pop_front() {
self.handle_event(event, actions);
}
}
pub fn state(&self) -> &NodeState {
&self.state
}
fn init(&mut self, actions_buf: &mut VecDeque<NodeAction>) {
for (id, seed_addrs) in self.bootstrap_addrs.clone().into_iter().enumerate() {
let id = ConnectionId::Seed(id);
actions_buf.push_back(NodeAction::Connect(id.clone(), seed_addrs));
self.send_bootstrap_request(id, actions_buf);
}
self.state = NodeState::ColdBootstrap;
}
fn handle_event(&mut self, event: NodeEvent, actions_buf: &mut VecDeque<NodeAction>) {
use NodeEvent as E;
use NodeState as S;
let new_state = match (self.state, event) {
(S::ColdBootstrap, E::Request(req))
| (S::ColdBootstrap, E::Response(req))
| (S::Offline, E::Request(req)) => {
self.handle_msg(req, actions_buf);
Some(S::WarmBootstrap)
}
(S::WarmBootstrap, E::Request(req)) | (S::WarmBootstrap, E::Response(req)) => {
self.handle_msg(req, actions_buf);
let num_peers = self.peers.len();
let num_responded = self.responded_ids.len();
if num_peers == (num_responded + 1) {
Some(S::Ready)
} else {
None
}
}
(S::ColdBootstrap, E::Timeout) => Some(S::Offline),
(S::Offline, E::Timeout) => None,
(_, E::Stop) => Some(S::Done),
_ => panic!("invalid state"),
};
if let Some(new_state) = new_state {
if self.state != new_state {
actions_buf.push_back(NodeAction::StateChangeNotification(new_state));
self.state = new_state;
}
}
}
fn handle_msg(&mut self, req: rpc::BootstrapMsg, actions_buf: &mut VecDeque<NodeAction>) {
if req.from_id == self.local_id {
return;
}
if !self.responded_ids.contains(&req.from_id) {
let new_nodes = self.merge_nodes_list(&req.nodes);
for (id, addrs) in new_nodes {
let id = ConnectionId::Peer(id);
actions_buf.push_back(NodeAction::Connect(id.clone(), addrs));
self.send_bootstrap_request(id, actions_buf);
}
self.responded_ids.insert(req.from_id);
}
}
#[inline]
fn send_bootstrap_request(&mut self, to: ConnectionId, actions_buf: &mut VecDeque<NodeAction>) {
let nodes = self
.peers
.iter()
.map(|(id, addrs)| (*id, addrs.clone()))
.collect();
actions_buf.push_back(NodeAction::Request(
to,
rpc::BootstrapMsg {
from_id: self.local_id,
nodes,
},
));
}
/// Merges `other` nodes list to already known. Returns new nodes count
fn merge_nodes_list(
&mut self,
nodes_from: &[(u64, Vec<SocketAddr>)],
) -> Vec<(u64, Vec<SocketAddr>)> {
let mut new_nodes = Vec::<(u64, Vec<SocketAddr>)>::with_capacity(nodes_from.len());
{
for (id, addrs) in nodes_from {
if !self.peers.contains_key(id) {
self.peers.insert(*id, addrs.clone());
new_nodes.push((*id, addrs.clone()));
}
}
}
new_nodes
}
}
#![cfg(feature = "raft_node")]
#![allow(dead_code, unused_variables)]
use std::cell::RefCell;
use std::collections::VecDeque;
use std::io;
use std::net::ToSocketAddrs;
use std::time::Duration;
use rand::random;
use inner::{NodeAction, NodeInner, NodeState};
use net::{get_local_addrs, ConnectionPool};
use crate::error::{Error, TarantoolErrorCode};
use crate::fiber::Cond;
use crate::net_box::{Conn, ConnOptions, Options};
use crate::raft::inner::NodeEvent;
use crate::set_error;
use crate::tuple::{FunctionArgs, FunctionCtx};
mod fsm;
pub mod inner;
pub mod net;
pub mod rpc;
mod storage;
pub struct Node {
inner: RefCell<NodeInner>,
connections: RefCell<ConnectionPool>,
rpc_function: String,
events_cond: Cond,
events_buffer: RefCell<VecDeque<NodeEvent>>,
actions_buffer: RefCell<VecDeque<NodeAction>>,
ready_cond: Cond,
options: NodeOptions,
}
pub struct NodeOptions {
bootstrap_poll_interval: Duration,
tick_interval: Duration,
recv_queue_size: usize,
send_queue_size: usize,
connection_options: ConnOptions,
rpc_call_options: Options,
}
impl Default for NodeOptions {
fn default() -> Self {
NodeOptions {
bootstrap_poll_interval: Duration::from_secs(1),
tick_interval: Duration::from_millis(100),
recv_queue_size: 127,
send_queue_size: 127,
connection_options: Default::default(),
rpc_call_options: Default::default(),
}
}
}
impl Node {
pub fn new(
rpc_function: &str,
bootstrap_addrs: Vec<impl ToSocketAddrs>,
options: NodeOptions,
) -> Result<Self, Error> {
let id = random::<u64>();
let local_addrs = get_local_addrs()?;
let mut bootstrap_addrs_cfg = Vec::with_capacity(bootstrap_addrs.len());
for addrs in bootstrap_addrs {
bootstrap_addrs_cfg.push(addrs.to_socket_addrs()?.collect())
}
Ok(Node {
inner: RefCell::new(NodeInner::new(id, local_addrs, bootstrap_addrs_cfg)),
connections: RefCell::new(ConnectionPool::new(options.connection_options.clone())),
rpc_function: rpc_function.to_string(),
events_cond: Cond::new(),
events_buffer: RefCell::new(VecDeque::with_capacity(options.recv_queue_size)),
actions_buffer: RefCell::new(VecDeque::with_capacity(options.send_queue_size)),
ready_cond: Cond::new(),
options,
})
}
pub fn run(&self) -> Result<(), Error> {
loop {
{
let mut actions = self.actions_buffer.borrow_mut();
let mut events = self.events_buffer.borrow_mut();
self.inner.borrow_mut().update(&mut events, &mut actions);
}
for action in self.actions_buffer.borrow_mut().drain(..) {
match action {
NodeAction::Connect(id, addrs) => {
self.connections.borrow_mut().connect(id, &addrs[..])?;
}
NodeAction::Request(to, msg) => {
let mut conn_pool = self.connections.borrow_mut();
self.send(conn_pool.get(&to).unwrap(), rpc::Request::Bootstrap(msg))?;
}
NodeAction::Response(_) => {}
NodeAction::StateChangeNotification(state) => match state {
NodeState::Ready => {
self.ready_cond.signal();
}
NodeState::Done => return Ok(()),
_ => {}
},
_ => {}
};
}
self.events_cond.wait();
}
}
pub fn handle_rpc(&self, ctx: FunctionCtx, args: FunctionArgs) -> i32 {
match args.decode::<rpc::Request>() {
Err(e) => set_error!(TarantoolErrorCode::Protocol, "{}", e),
Ok(request) => {
match request {
rpc::Request::Bootstrap(msg) => {
self.events_buffer
.borrow_mut()
.push_back(NodeEvent::Request(msg));
self.events_cond.signal();
}
_ => unimplemented!(),
};
ctx.return_mp(&rpc::Response::Ack)
.unwrap_or_else(|e| set_error!(TarantoolErrorCode::ProcC, "{}", e))
}
}
}
pub fn wait_ready(&self, timeout: Duration) -> Result<(), Error> {
if self.inner.borrow().state() != &NodeState::Ready
&& !self.ready_cond.wait_timeout(timeout) {
return Err(Error::IO(io::ErrorKind::TimedOut.into()));
}
Ok(())
}
pub fn close(&self) {
self.events_buffer.borrow_mut().push_back(NodeEvent::Stop);
self.events_cond.signal();
}
fn send(&self, conn: &Conn, request: rpc::Request) -> Result<Option<rpc::Response>, Error> {
let result = conn.call(
self.rpc_function.as_str(),
&request,
&self.options.rpc_call_options,
);
match result {
Err(Error::IO(_)) => Ok(None),
Err(e) => Err(e),
Ok(response) => match response {
None => Ok(None),
Some(response) => {
let ((resp,),) = response.decode::<((rpc::Response,),)>()?;
Ok(Some(resp))
}
},
}
}
}
use std::borrow::Cow;
use std::collections::HashMap;
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs};
use std::ptr::null_mut;
use ipnetwork::{Ipv4Network, Ipv6Network};
use crate::error::Error;
use crate::net_box::{Conn, ConnOptions};
#[derive(Default)]
pub struct ConnectionPool {
options: ConnOptions,
inner: HashMap<ConnectionId, Conn>,
}
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub enum ConnectionId {
Seed(usize),
Peer(u64),
}
impl ConnectionPool {
pub fn new(options: ConnOptions) -> Self {
ConnectionPool {
options,
inner: HashMap::new(),
}
}
pub fn connect(&mut self, id: ConnectionId, addrs: impl ToSocketAddrs) -> Result<(), Error> {
let conn = Conn::new(addrs, self.options.clone(), None)?;
self.inner.insert(id, conn);
Ok(())
}
pub fn get(&mut self, id: &ConnectionId) -> Option<&mut Conn> {
self.inner.get_mut(id)
}
}
pub fn get_local_addrs() -> Result<Vec<SocketAddr>, Error> {
let listen_addr_config = unsafe { get_listen_addr_config() };
let listen_addrs = match listen_addr_config.parse::<u16>() {
Ok(port) => vec![SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
port,
))],
_ => listen_addr_config.to_socket_addrs()?.collect(),
};
let mut if_addrs = null_mut::<libc::ifaddrs>();
let res = unsafe { libc::getifaddrs(&mut if_addrs as *mut _) };
if res < 0 {
return Err(io::Error::last_os_error().into());
}
let mut result = Vec::<SocketAddr>::new();
let mut current_if_addr = if_addrs;
while !current_if_addr.is_null() {
unsafe {
let ifa_addr = (*current_if_addr).ifa_addr;
let netmask = (*current_if_addr).ifa_netmask;
current_if_addr = (*current_if_addr).ifa_next;
if !(ifa_addr.is_null() || netmask.is_null()) {
let addr_family = (*ifa_addr).sa_family as i32;
let network = match addr_family {
libc::AF_INET => {
// is a valid IP4 Address
let addr = (*(ifa_addr as *const _ as *const SocketAddrV4)).ip();
let netmask = (*(netmask as *const _ as *const SocketAddrV4)).ip();
ipnetwork::IpNetwork::V4(
Ipv4Network::with_netmask(*addr, *netmask).unwrap(),
)
}
libc::AF_INET6 => {
// is a valid IP6 Address
let addr = (*(ifa_addr as *const _ as *const SocketAddrV6)).ip();
let netmask = (*(netmask as *const _ as *const SocketAddrV6)).ip();
ipnetwork::IpNetwork::V6(
Ipv6Network::with_netmask(*addr, *netmask).unwrap(),
)
}
_ => continue,
};
for listen_addr in listen_addrs.iter() {
let is_matches = match listen_addr.ip() {
IpAddr::V4(ip) if ip.is_unspecified() => true,
listen_addr => network.contains(listen_addr),
};
if is_matches {
result.push(SocketAddr::new(network.ip(), listen_addr.port()));
}
}
}
}
}
unsafe {
libc::freeifaddrs(if_addrs);
}
Ok(result)
}
unsafe fn get_listen_addr_config<'a>() -> Cow<'a, str> {
use crate::ffi::lua::{
lua_getfield, lua_getglobal, lua_gettop, lua_settop, lua_tostring,
};
use crate::ffi::tarantool::luaT_state;
use std::ffi::{CStr, CString};
let l = luaT_state();
let top_idx = lua_gettop(l);
let s = CString::new("box").unwrap();
lua_getglobal(l, s.as_ptr());
let s = CString::new("cfg").unwrap();
lua_getfield(l, -1, s.as_ptr());
let s = CString::new("listen").unwrap();
lua_getfield(l, -1, s.as_ptr());
let listen_addr_str = CStr::from_ptr(lua_tostring(l, -1)).to_string_lossy();
lua_settop(l, top_idx);
listen_addr_str
}
use std::ffi::{c_void, CStr};
use std::net::SocketAddr;
use std::path::Path;
use serde::{Serialize, Deserialize};
use crate::error::Error;
use crate::session;
use crate::space::{FuncMetadata, Privilege, Space, SystemSpace};
use crate::tuple::Encode;
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum Request {
#[serde(rename = "bootstrap")]
Bootstrap(BootstrapMsg),
#[serde(rename = "propose")]
Propose,
#[serde(rename = "raft")]
Raft { data: Vec<u8> },
}
impl Encode for Request {}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum Response {
#[serde(rename = "bootstrap")]
Bootstrap(BootstrapMsg),
#[serde(rename = "ack")]
Ack,
}
impl Encode for Response {}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct BootstrapMsg {
pub from_id: u64,
pub nodes: Vec<(u64, Vec<SocketAddr>)>,
}
#[allow(unused)]
pub fn init_stored_proc(function_name: &str) -> Result<(), Error> {
// get library metadata
let mut module_info = libc::Dl_info {
dli_fname: std::ptr::null(),
dli_sname: std::ptr::null(),
dli_fbase: std::ptr::null_mut(),
dli_saddr: std::ptr::null_mut(),
};
unsafe { libc::dladdr(init_stored_proc as *const c_void, &mut module_info) };
// extract name from library metadata
let module_abs_path = unsafe { CStr::from_ptr(module_info.dli_fname) }
.to_str()
.unwrap();
let module_name = Path::new(module_abs_path)
.file_stem()
.unwrap()
.to_string_lossy()
.to_string();
let stored_proc_name = [module_name.as_str(), function_name].join(".");
let mut func_sys_space: Space = SystemSpace::Func.into();
let idx = func_sys_space
.index("name")
.expect("System space \"_func\" not found");
if idx.get(&(stored_proc_name.as_str(),))?.is_none() {
// resolve new func id: get max id + increment
let id = match func_sys_space.primary_key().max(&())? {
None => 1,
Some(t) => t.decode::<(u32,)>()?.0 + 1, // decode: Result -> Tuple[u32] -> (u32,) -> u32
};
// create new func record
let datetime = chrono::Local::now().format("%Y-%m-%d %H:%M:%S");
let owner = session::uid()? as u32;
func_sys_space.insert(&FuncMetadata {
id,
owner,
name: stored_proc_name,
setuid: 0,
language: "C".to_string(),
body: "".to_string(),
routine_type: "function".to_string(),
param_list: vec![],
returns: "any".to_string(),
aggregate: "none".to_string(),
sql_data_access: "none".to_string(),
is_deterministic: false,
is_sandboxed: false,
is_null_call: true,
exports: vec!["LUA".to_string()],
opts: Default::default(),
comment: "".to_string(),
created: datetime.to_string(),
last_altered: datetime.to_string(),
})?;
// grant guest to execute new func
let mut priv_sys_space: Space = SystemSpace::Priv.into();
priv_sys_space.insert(&Privilege {
grantor: 1,
grantee: 0,
object_type: "function".to_string(),
object_id: id,
privilege: 4,
})?;
}
Ok(())
}
use raft::prelude::{Entry, HardState, Snapshot};
use raft::storage::Storage;
use raft::{Error as RaftError, RaftState};
use crate::error::Error;
pub struct NodeStorage {}
impl NodeStorage {
pub fn new() -> Result<Self, Error> {
Ok(NodeStorage {})
}
pub fn apply_snapshot(&mut self, snapshot: Snapshot) -> Result<(), Error> {
unimplemented!()
}
pub fn append(&mut self, entries: &[Entry]) -> Result<(), Error> {
unimplemented!()
}
pub fn set_hard_state(&mut self, hs: HardState) -> Result<(), Error> {
unimplemented!()
}
pub fn set_last_apply_index(&mut self, index: u64) -> Result<(), Error> {
unimplemented!()
}
}
impl Storage for NodeStorage {
fn initial_state(&self) -> Result<RaftState, RaftError> {
todo!()
}
fn entries(
&self,
low: u64,
high: u64,
max_size: impl Into<Option<u64>>,
) -> Result<Vec<Entry>, RaftError> {
todo!()
}
fn term(&self, idx: u64) -> Result<u64, RaftError> {
todo!()
}
fn first_index(&self) -> Result<u64, RaftError> {
todo!()
}
fn last_index(&self) -> Result<u64, RaftError> {
todo!()
}
fn snapshot(&self, request_index: u64) -> Result<Snapshot, RaftError> {
todo!()
}
}
......@@ -1133,6 +1133,26 @@ where
{
}
impl<L> tlua::PushInto<L> for Tuple
where
L: tlua::AsLua,
{
type Err = tlua::Void;
fn push_into_lua(self, lua: L) -> tlua::PushResult<L, Self> {
unsafe {
ffi::luaT_pushtuple(tlua::AsLua::as_lua(&lua), self.ptr.as_ptr());
Ok(tlua::PushGuard::new(lua, 1))
}
}
}
impl<L> tlua::PushOneInto<L> for Tuple
where
L: tlua::AsLua,
{
}
impl<L> tlua::LuaRead<L> for Tuple
where
L: tlua::AsLua,
......
......@@ -28,6 +28,7 @@ end
local port = free_port()
box.cfg{
log_level = 'verbose',
listen = port,
wal_mode = 'none',
memtx_dir = tmpdir,
......@@ -71,11 +72,19 @@ function target_dir()
return _target_dir
end
function build_mode()
local build_mode_env = os.getenv('TARANTOOL_MODULE_BUILD_MODE')
if not build_mode_env then
build_mode_env = 'debug'
end
return build_mode_env
end
-- Add test runner library location to lua search path
package.cpath = string.format(
'%s/debug/?.so;%s/debug/?.dylib;%s',
target_dir(),
target_dir(),
'%s/%s/?.so;%s/%s/?.dylib;%s',
target_dir(), build_mode(),
target_dir(), build_mode(),
package.cpath
)
......
......@@ -9,6 +9,7 @@ use crate::common::{
DropCounter, capture_value, fiber_csw, LuaStackIntegrityGuard, LuaContextSpoiler,
};
use tarantool::fiber;
use tarantool::fiber::Fiber;
use tarantool::tlua::AsLua;
use tarantool::util::IntoClones;
......@@ -469,3 +470,27 @@ pub fn lifetime() {
// }.join();
}
pub fn r#yield() {
//if fiber doesnt yield than test will be running forever
let mut fiber = Fiber::new("test_fiber", &mut |_| {
loop {
// ignore fiber is canceled error
fiber::r#yield().unwrap_or(());
}
});
fiber.set_joinable(true);
fiber.start(());
fiber.cancel();
}
pub fn yield_canceled() {
let mut fiber = Fiber::new("test_fiber", &mut |_| {
fiber::sleep(Duration::from_millis(10));
assert!(fiber::r#yield().is_err());
0
});
fiber.set_joinable(true);
fiber.start(());
fiber.cancel();
fiber::sleep(Duration::from_millis(20));
}
\ No newline at end of file
......@@ -31,7 +31,8 @@ pub fn debug() {
let m = Mutex::new(0);
let mut guard = m.lock();
let s = start(|| format!("{:?}", m)).join();
assert_eq!(s, "Mutex { data: <locked>, .. }");
assert_eq!(&s[..21], "Mutex { data: <locked");
assert_eq!(&s[s.len()-7..], ">, .. }");
*guard = 13;
Mutex::unlock(guard);
let s = start(|| format!("{:?}", m)).join();
......