-
Georgy Moshkin authoredGeorgy Moshkin authored
luamod.rs 54.79 KiB
//! Lua API exported as `_G.pico`
//!
use crate::cas;
use crate::config::PicodataConfig;
use crate::instance::InstanceId;
use crate::plugin::PluginIdentifier;
use crate::plugin::TopologyUpdateOpKind;
use crate::schema::{self, ADMIN_ID};
use crate::traft::op::{self, Op};
use crate::traft::{self, node, RaftIndex, RaftTerm};
use crate::util::duration_from_secs_f64_clamped;
use crate::util::str_eq;
use crate::util::INFINITY;
use crate::{plugin, rpc, sync, tlog};
use ::tarantool::fiber;
use ::tarantool::msgpack::ViaMsgpack;
use ::tarantool::session;
use ::tarantool::tlua;
use ::tarantool::tlua::{LuaState, LuaThread, PushOneInto, Void};
use ::tarantool::transaction::transaction;
use ::tarantool::tuple::Decode;
use ::tarantool::vclock::Vclock;
use indoc::indoc;
use std::time::Duration;
#[inline(always)]
fn luamod_set<V>(l: &LuaThread, name: &str, help: &str, value: V)
where
V: PushOneInto<LuaState>,
V::Err: Into<Void>,
{
let luamod: tlua::LuaTable<_> = l.get("pico").unwrap();
luamod.set(name, value);
let help_table: tlua::LuaTable<_> = luamod.get_or_create_metatable();
help_table.set(name, help);
}
#[inline(always)]
fn luamod_set_help_only(l: &LuaThread, name: &str, help: &str) {
let luamod: tlua::LuaTable<_> = l.get("pico").unwrap();
let help_table: tlua::LuaTable<_> = luamod.get_or_create_metatable();
help_table.set(name, help);
}
pub(crate) fn setup() {
let l = ::tarantool::lua_state();
l.exec(include_str!("luamod.lua")).unwrap();
luamod_set(
&l,
"PICODATA_VERSION",
indoc! {"
pico.PICODATA_VERSION
=====================
A string variable (not a function) containing Picodata version
which follows the Calendar Versioning convention with the
`YY.0M.MICRO` scheme:
https://calver.org/#scheme
Example:
picodata> pico.PICODATA_VERSION
---
- 24.6.0
...
"},
{
const _: () = assert!(str_eq(env!("CARGO_PKG_VERSION"), "24.6.0"));
crate::info::PICODATA_VERSION
},
);
luamod_set(
&l,
"LUA_API_VERSION",
indoc! {"
pico.LUA_API_VERSION
====================
A string variable (not a function) contatining version of the Lua API
which follows the Semantic Versioning convention:
https://semver.org/
The functions marked as Internal API only affect the PATCH version.
Example:
picodata> pico.LUA_API_VERSION
---
- 5.0.0
...
"},
"5.0.0",
);
luamod_set(
&l,
"config",
indoc! {"
pico.config()
=========
Returns a Lua table containing picodata configuration which is
gathered from the configuration file, environment variables and/or
command line arguments.
The content of the table is not strictly defined and may depend on circumstances.
Returns:
(table)
Example:
picodata> pico.config()
---
- cluster:
cluster_id: demo
- instance:
log_level: info
listen: localhost:3301
data_dir: .
peers:
- localhost:3301
...
"},
tlua::Function::new(move || -> ViaMsgpack<rmpv::Value> {
ViaMsgpack(PicodataConfig::get().parameters_with_sources_as_rmpv())
}),
);
luamod_set(
&l,
"vshard_config",
indoc! {"
pico.vshard_config()
=========
Returns a Lua table containing current vshard configuration.
Returns:
(table)
"},
tlua::Function::new(move || -> traft::Result<_> {
let node = node::global()?;
let config = crate::vshard::VshardConfig::from_storage(&node.storage)?;
Ok(config)
}),
);
luamod_set(
&l,
"whoami",
indoc! {"
pico.whoami()
=============
Returns the identifiers of the current instance.
Returns:
(table)
Fields:
- raft_id (number)
- cluster_id (string)
- instance_id (string)
- tier (string)
Example:
picodata> pico.whoami()
---
- raft_id: 1
cluster_id: demo
instance_id: i1
tier: storage
...
"},
tlua::function0(|| -> traft::Result<_> {
let node = traft::node::global()?;
let info = crate::info::InstanceInfo::try_get(node, None)?;
Ok(tlua::AsTable((
("raft_id", info.raft_id),
("cluster_id", info.cluster_id),
("instance_id", info.instance_id),
("tier", info.tier),
)))
}),
);
luamod_set(
&l,
"instance_info",
indoc! {"
pico.instance_info([instance_id])
=================================
Provides general information for the given instance.
Params:
1. instance_id (optional string), default: id of the current instance
Returns:
(table)
or
(nil, string) in case of an error
Fields:
- raft_id (number)
- advertise_address (string)
- instance_id (string)
- instance_uuid (string)
- replicaset_id (string)
- replicaset_uuid (string)
- current_state (table),
`{variant = string, incarnation = number}`, where variant is one of
'Offline' | 'Online' | 'Expelled'
- target_state (table),
`{variant = string, incarnation = number}`, where variant is one of
'Offline' | 'Replicated' | 'ShardingInitialized' | 'Online' | 'Expelled'
- tier (string)
Example:
picodata> pico.instance_info()
---
- raft_id: 1
advertise_address: localhost:3301
instance_id: i1
instance_uuid: 68d4a766-4144-3248-aeb4-e212356716e4
tier: storage
replicaset_id: r1
replicaset_uuid: e0df68c5-e7f9-395f-86b3-30ad9e1b7b07
current_state:
variant: Online
incarnation: 26
target_state:
variant: Online
incarnation: 26
...
"},
tlua::function1(|iid: Option<InstanceId>| -> traft::Result<_> {
let node = traft::node::global()?;
let info = crate::info::InstanceInfo::try_get(node, iid.as_ref())?;
Ok(tlua::AsTable((
("raft_id", info.raft_id),
("advertise_address", info.advertise_address),
("instance_id", info.instance_id.0),
("instance_uuid", info.instance_uuid),
("replicaset_id", info.replicaset_id),
("replicaset_uuid", info.replicaset_uuid),
("current_state", info.current_state),
("target_state", info.target_state),
("tier", info.tier),
)))
}),
);
///////////////////////////////////////////////////////////////////////////
luamod_set(
&l,
"raft_status",
indoc! {"
pico.raft_status()
==================
Returns the raft node status of the current instance.
Returns:
(table)
or
(nil, string) in case of an error, if the raft node is
not initialized yet
Fields:
- id (number)
- term (number)
- leader_id (number | nil),
absent if there's no leader in the current term
or it's unknown yet
- raft_state (string),
one of 'Follower' | 'Candidate' | 'Leader' | 'PreCandidate'
Example:
picodata> pico.raft_status()
---
- term: 27
leader_id: 1
raft_state: Leader
id: 1
...
"},
tlua::function0(|| traft::node::global().map(|n| n.status())),
);
luamod_set(
&l,
"raft_tick",
indoc! {"
pico.raft_tick(n_times)
=======================
Internal API. Makes the raft node to 'tick' `n_times`. It shouldn't
be used except for tests and dirty hacks. See `src/luamod.rs` for
the details.
Params:
1. n_times (number)
Returns:
(true)
or
(nil, string) in case of an error
"},
tlua::function1(|n_times: u32| -> traft::Result<bool> {
traft::node::global()?.tick_and_yield(n_times);
Ok(true)
}),
);
// sql
///////////////////////////////////////////////////////////////////////////
luamod_set_help_only(
&l,
"sql",
indoc! {r#"
pico.sql(query, [params], [options])
=========================
Executes a cluster-wide SQL query.
1. The query is parsed and validated to build a distributed
query plan on the current instance (router).
2. The query plan is dispatched to the target instances (storages)
slice-by-slice in a bottom-up manner. All intermediate results
are stored in the router's memory.
Params:
1. query (string)
2. params (table), optional
3. options (table), optional
`{traceable = boolean, query_id = string}`
Returns:
(table DqlResult) if query retrieves data
or
(table DmlResult) if query modifies data
or
(nil, string) in case of an error
table DqlResult:
- metadata (table),
`{{name = string, type = string}, ...}`, an array of column
definitions of the table.
- rows (table),
`{row, ...}`, essentially the result of the query.
table DmlResult:
- row_count (number), the number of rows modified, inserted or
deleted by the query.
Example:
picodata> -- Create a sharded table 'wonderland'.
picodata> pico.sql([[
create table "wonderland" (
"property" text not null,
"value" integer,
primary key ("property")
) using memtx distributed by ("property")
option (timeout = 3.0)
]])
---
- row_count: 1
...
picodata> -- Drop 'wonderland' table.
picodata> pico.sql([[
drop table "wonderland"
option (timeout = 3.0)
]])
---
- row_count: 1
...
picodata> -- Insert a row into the 'wonderland' table using parameters
picodata> -- as an optional argument.
picodata> pico.sql(
[[insert into "wonderland" ("property", "value") values (?, ?)]],
{"dragon", 13}
)
---
- row_count: 1
...
picodata> -- Select a row from the 'wonderland' table.
picodata> pico.sql(
[[select * from "wonderland" where "property" = 'dragon']]
)
---
- metadata:
- {'name': 'property', 'type': 'string'}
- {'name': 'value', 'type': 'integer'}
rows:
- ['dragon', 13]
"#},
);
// vclock
///////////////////////////////////////////////////////////////////////////
luamod_set_help_only(
&l,
"table Vclock",
indoc! {"
table Vclock
============
Vclock is a mapping of replica id (number) to its LSN (number).
The meaning of these concepts is explained below.
To ensure data persistence, Tarantool records updates to the
database in the so-called write-ahead log (WAL) files. Each record
in the WAL represents a single Tarantool data-change request such as
`INSERT`, `UPDATE`, or `DELETE`, and is assigned a monotonically
growing log sequence number (LSN).
Enabling replication makes all replicas in a replica set to exchange
their records, each with it's own LSN. Together, LSNs from different
replicas form a vector clock — vclock. Vclock defines the database
state of an instance.
The zero vclock component is special, it's used for tracking local
changes that aren't replicated.
Example:
{[0] = 2, [1] = 101}
{[0] = 148, [1] = 9086, [3] = 2}
"},
);
luamod_set(
&l,
"get_vclock",
indoc! {"
pico.get_vclock()
=================
Returns the current vclock from Tarantool `box.info.vclock` API.
Returns:
(table Vclock), see pico.help('table Vclock')
Example:
picodata> pico.get_vclock()
---
- 0: 2
1: 101
...
"},
tlua::function0(Vclock::current),
);
luamod_set(
&l,
"wait_vclock",
indoc! {"
pico.wait_vclock(target, timeout)
=================================
Waits until Tarantool vclock reaches the `target` value.
Returns the actual vclock. It can be equal to or greater than the
target one. If timeout expires beforehand, the function returns an
error.
Params:
1. target (table Vclock), see pico.help('table Vclock')
2. timeout (number), in seconds
Returns:
(table Vclock)
or
(nil, string) in case of an error
"},
tlua::function2(|target: Vclock, timeout: f64| -> traft::Result<Vclock> {
sync::wait_vclock(target, duration_from_secs_f64_clamped(timeout))
}),
);
// propose
///////////////////////////////////////////////////////////////////////////
luamod_set(
&l,
"raft_propose_nop",
indoc! {"
pico.raft_propose_nop()
=======================
Internal API. Proposes and waits for Op::Nop to be applied.
It shouldn't be used except for tests.
Returns:
(number) raft index of the nop entry
"},
tlua::function0(|| traft::node::global()?.propose_nop()),
);
luamod_set(
&l,
"raft_propose",
indoc! {"
pico.raft_propose(op)
============================
Internal API, see src/luamod.rs for the details.
Params:
1. op (table)
Returns:
(number)
or
(nil, string) in case of an error
"},
tlua::function1(|lua: tlua::StaticLua| -> traft::Result<RaftIndex> {
use tlua::{AnyLuaString, AsLua, LuaError, LuaTable};
let t: LuaTable<_> = AsLua::read(&lua).map_err(|(_, e)| LuaError::from(e))?;
let mp: AnyLuaString = lua
.eval_with("return require 'msgpack'.encode(...)", &t)
.map_err(LuaError::from)?;
let op: Op = Decode::decode(mp.as_bytes())?;
let node = traft::node::global()?;
let mut node_impl = node.node_impl();
let entry_id = node_impl.propose_async(op)?;
node.main_loop.wakeup();
// Release the lock
drop(node_impl);
Ok(entry_id.index)
}),
);
///////////////////////////////////////////////////////////////////////////
#[rustfmt::skip]
luamod_set(
&l,
"exit",
indoc! {"
pico.exit([code])
=================
Terminates the picodata process with the supplied exit code.
Params:
1. code (optional number), default: 0
"},
tlua::function1(|code: Option<i32>| {
crate::tarantool::exit(code.unwrap_or(0))
}),
);
///////////////////////////////////////////////////////////////////////////
luamod_set(
&l,
"expel",
indoc! {"
pico.expel(instance_id)
======================
Expels an instance with instance_id from the cluster. The instance will
keep on running though. If restarted, it will not join the cluster.
Params:
1. instance_id (string)
Returns:
(true)
or
(nil, string) in case of an error
"},
tlua::function1(|instance_id: InstanceId| -> traft::Result<bool> {
let raft_storage = &traft::node::global()?.raft_storage;
let cluster_id = raft_storage.cluster_id()?;
fiber::block_on(rpc::network_call_to_leader(
crate::proc_name!(rpc::expel::proc_expel),
&rpc::expel::Request {
instance_id,
cluster_id,
},
))?;
Ok(true)
}),
);
// log
///////////////////////////////////////////////////////////////////////////
l.get::<tlua::LuaTable<_>, _>("pico")
.unwrap()
.set("log", &[()]);
#[rustfmt::skip]
l.exec_with(
"pico.log.highlight_key = ...",
tlua::function2(|key: String, color: Option<String>| -> Result<(), String> {
let color = match color.as_deref() {
None => None,
Some("red") => Some(tlog::Color::Red),
Some("green") => Some(tlog::Color::Green),
Some("blue") => Some(tlog::Color::Blue),
Some("cyan") => Some(tlog::Color::Cyan),
Some("yellow") => Some(tlog::Color::Yellow),
Some("magenta") => Some(tlog::Color::Magenta),
Some("white") => Some(tlog::Color::White),
Some("black") => Some(tlog::Color::Black),
Some(other) => {
return Err(format!("unknown color: {other:?}"))
}
};
tlog::highlight_key(key, color);
Ok(())
}),
)
.unwrap();
l.exec_with(
"pico.log.clear_highlight = ...",
tlua::function0(tlog::clear_highlight),
)
.unwrap();
///////////////////////////////////////////////////////////////////////////
#[derive(::tarantool::tlua::LuaRead, Default, Clone, Copy)]
enum Justify {
Left,
#[default]
Center,
Right,
}
#[derive(::tarantool::tlua::LuaRead)]
struct RaftLogOpts {
justify_contents: Option<Justify>,
max_width: Option<usize>,
}
luamod_set(
&l,
"raft_log",
indoc! {"
pico.raft_log([opts])
====================================
Inspect raft log contents in human readable format. The contents are
returned as a table of strings similarly to how fselect works for
spaces.
opt.justify_contents can be used to control how contents are justified.
If opts.max_width is specified the output will not be wider than this
many printable characters. If not specified the terminal width is
used, unless it is called in context of a remote session.
Params:
1. opts (table)
- justify_contents (string), one of 'center' | 'left' | 'right', default: 'center'
- max_width (number), default for remote context: 80
Returns:
(table)
Example:
pico.raft_log({justify_contents = 'center', max_width = 100})
"},
tlua::function1(
|opts: Option<RaftLogOpts>| -> traft::Result<Option<Vec<String>>> {
let mut justify_contents = Default::default();
let mut max_width = None;
if let Some(opts) = opts {
justify_contents = opts.justify_contents.unwrap_or_default();
max_width = opts.max_width;
}
let remote_ctx =
crate::tarantool::eval("return box.session.peer() ~= nil").unwrap();
let max_width = crate::unwrap_some_or!(
max_width,
if remote_ctx {
80
} else {
// Need to subtract 4, because this is how many symbols
// are prepended by the console when output format is yaml.
crate::util::screen_size().1 as usize - 5
}
);
let header = ["index", "term", "contents"];
let [index, term, contents] = header;
let mut rows = vec![];
let mut col_widths = header.map(|h| h.len());
let node = traft::node::global()?;
let entries = node
.all_traft_entries()
.map_err(|e| traft::error::Error::Other(Box::new(e)))?;
for entry in entries {
let row = [
entry.index.to_string(),
entry.term.to_string(),
entry.payload().to_string(),
];
for i in 0..col_widths.len() {
col_widths[i] = col_widths[i].max(row[i].len());
}
rows.push(row);
}
let [iw, tw, mut cw] = col_widths;
let total_width = 1 + header.len() + col_widths.iter().sum::<usize>();
if total_width > max_width {
match cw.checked_sub(total_width - max_width) {
Some(new_cw) if new_cw > 0 => cw = new_cw,
_ => {
return Err(traft::error::Error::other("screen too small"));
}
}
}
use std::io::Write;
let mut buf: Vec<u8> = Vec::with_capacity(512);
let write_contents = move |buf: &mut Vec<u8>, contents: &str| match justify_contents
{
Justify::Left => writeln!(buf, "{contents: <cw$}|"),
Justify::Center => writeln!(buf, "{contents: ^cw$}|"),
Justify::Right => writeln!(buf, "{contents: >cw$}|"),
};
let row_sep = |buf: &mut Vec<u8>| {
match justify_contents {
Justify::Left => {
writeln!(buf, "+{0:-^iw$}+{0:-^tw$}+{0:-<cw$}+", "")
}
Justify::Center => {
writeln!(buf, "+{0:-^iw$}+{0:-^tw$}+{0:-^cw$}+", "")
}
Justify::Right => {
writeln!(buf, "+{0:-^iw$}+{0:-^tw$}+{0:->cw$}+", "")
}
}
.unwrap()
};
row_sep(&mut buf);
// NOTE: here and later we use the special uincode \u{01c0} symbol.
// It looks like this `ǀ` (exactly like `|`). We do this,
// because tarantool's yaml handler has special behavior when it
// sees a pipe character at the start of the string. This is the
// same thing as what tarantool's `index:fselect()` is doing.
write!(buf, "\u{01c0}{index: ^iw$}|{term: ^tw$}|").unwrap();
write_contents(&mut buf, contents).unwrap();
row_sep(&mut buf);
for [index, term, contents] in rows {
if contents.chars().count() <= cw {
write!(buf, "\u{01c0}{index: ^iw$}|{term: ^tw$}|").unwrap();
write_contents(&mut buf, &contents).unwrap();
} else {
// use crate::util::TokenType::*;
let mut lexer = crate::util::Lexer::new(&contents);
let first_token = *lexer.peek_token().expect("contents is not empyt");
// Do some extra handling for BatchDml entries so that they look better
let mut args_on_separate_rows = false;
let mut paren_depth = 0;
if first_token.text == "BatchDml" {
args_on_separate_rows = true;
}
let mut prev_token = first_token;
loop {
let mut start = 0;
let mut end = 0;
let mut utf8_len = 0;
let mut first = true;
while let Some(&token) = lexer.peek_token() {
let mut added_chars = token.utf8_count;
if first {
first = false;
start = token.start;
end = token.start;
} else {
// also count the spaces between this and previous token
added_chars += token.start - prev_token.end;
}
// XXX: currently if a token is longer then the
// allotted width, we'll just ingore the contraint
// and make an ugly row. We could subdivide such tokens
// even further, but I don't want to make this code
// even more complicated than it already is...
let new_token_will_overflow = utf8_len + added_chars > cw;
let added_at_least_one_token = start != end;
if new_token_will_overflow && added_at_least_one_token {
break;
}
if args_on_separate_rows
&& added_at_least_one_token
// This depth doesn't take into account the current ")" itself
&& paren_depth == 1
&& token.text == ")"
{
break;
}
// At this point the new token is added to the current piece
utf8_len += added_chars;
end = token.end;
_ = lexer.next_token();
// If we had `defer` this would've been so much cleaner...
let prev_token_ = prev_token;
// Must be assigned before `break`
prev_token = token;
match token.text {
"(" => paren_depth += 1,
")" => paren_depth -= 1,
_ => {}
}
if args_on_separate_rows && paren_depth == 1 {
if token.text == "(" {
break;
}
if prev_token_.text == ")" {
break;
}
}
}
if start == 0 {
write!(buf, "\u{01c0}{index: ^iw$}|{term: ^tw$}|").unwrap();
} else {
write!(buf, "\u{01c0}{blank: ^iw$}|{blank: ^tw$}|", blank = "~",)
.unwrap();
}
write_contents(&mut buf, &contents[start..end]).unwrap();
if end == contents.len() {
break;
}
}
}
}
row_sep(&mut buf);
let s = String::from_utf8_lossy(&buf);
let mut res = vec![];
for line in s.lines() {
// replace all spaces with the non-breaking space character
// to prevent tarantool's yaml handler from breaking the
// rows which we tried so hard to format correctly
let line = line.replace(' ', "\u{a0}");
res.push(line);
}
Ok(Some(res))
},
),
);
///////////////////////////////////////////////////////////////////////////
luamod_set(
&l,
"raft_compact_log",
indoc! {"
pico.raft_compact_log(up_to)
============================
Trims raft log up to the given index (excluding the index itself).
Returns the new first_index after the log compaction. Returns when
transaction is committed to the local storage.
Params:
1. up_to (number)
Returns:
(number)
"},
{
tlua::function1(|up_to: RaftIndex| -> traft::Result<RaftIndex> {
let raft_storage = &node::global()?.raft_storage;
let ret = transaction(|| raft_storage.compact_log(up_to));
Ok(ret?)
})
},
);
///////////////////////////////////////////////////////////////////////////
luamod_set(
&l,
"cas",
indoc! {"
pico.cas(dml[, predicate])
==========================
Performs a clusterwide compare-and-swap operation. Works for global
tables only.
E.g. it checks the `predicate` on leader and, if no conflicting entries
were found, appends the new entry to the raft log and returns its index
(uncommitted yet).
The `predicate` consists of three parts:
- index
- term
- ranges
If `predicate` is not supplied, it will be auto generated with `index`
and `term` taken from the current instance and with empty `ranges`.
Omitting `ranges` implies a blind write, therefore such usage is
discouraged.
Returns when the operation is appended to the raft log on a leader.
Returns the index of the corresponding Op::Dml.
Params:
1. dml (table)
- kind (string), one of 'insert' | 'replace' | 'update' | 'delete'
- table (string)
- tuple (optional table), mandatory for insert and replace, see [1, 2]
- key (optional table), mandatory for update and delete, see [3, 4]
- ops (optional table), mandatory for update see [3]
2. predicate (optional table)
- index (optional number), default: current applied index
- term (optional number), default: current term
- ranges (optional table {table CasRange,...}), see pico.help('table CasRange'),
default: {} (empty table)
Returns:
(number)
or
(nil, string) in case of an error
See also:
[1]: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/insert/
[2]: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/replace/
[3]: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/update/
[4]: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/delete/
Example:
-- Assuming there exists a table 'friends_of_peppa' with two
-- fields: id (unsigned) and name (string) and corresponding
-- unique indexes.
-- Insert a tuple {1, 'Suzy'} into this table. This will fail
-- if the term of the current instance is outdated.
pico.cas({
kind = 'insert',
table = 'friends_of_peppa',
tuple = {1, 'Suzy'},
})
-- Add Rebecca, but only if no other friends were added after Suzy.
pico.cas({
kind = 'replace',
table = 'friends_of_peppa',
tuple = {2, 'Rebecca'},
}, {
ranges = {{
table = 'friends_of_peppa',
key_min = { kind = 'excluded', key = {1,} },
key_max = { kind = 'unbounded' },
}},
})
-- Check that there were no other updates, and update the second
-- Peppa friend, replacing 'Rebecca' with 'Emily'.
pico.cas({
kind = 'update',
table = 'friends_of_peppa',
key = {2},
ops = {{'=', 2, 'Emily'}},
}, {
ranges = {{
table = 'friends_of_peppa',
key_min = { kind = 'included', key = {2} },
key_max = { kind = 'included', key = {2} },
}},
})
-- Delete the second Peppa friend, specifying index and term
-- explicitly. It's necessary when there are some yielding
-- operations between reading and writing.
index, term =
assert(pico.raft_get_index()),
assert(pico.raft_status()).term
emily = box.space.friends_of_peppa.index.name:get('Emily')
fiber.sleep(1) -- do something yielding
pico.cas({
kind = 'delete',
table = 'friends_of_peppa',
key = {emily.id},
}, {
index = index,
term = term,
ranges = {{
table = 'friends_of_peppa',
key_min = { kind = 'included', key = {emily.id} },
key_max = { kind = 'included', key = {emily.id} },
}},
})
"},
tlua::function2(
|op: op::DmlInLua,
predicate: Option<cas::PredicateInLua>|
-> traft::Result<RaftIndex> {
// su is needed here because for cas execution we need to consult with system spaces like `_raft_state`
// and the user executing cas request may not (even shouldnt) have access to these spaces
let su = session::su(ADMIN_ID)?;
let op = op::Dml::from_lua_args(op, su.original_user_id)
.map_err(traft::error::Error::other)?;
let predicate = cas::Predicate::from_lua_args(predicate.unwrap_or_default())?;
let req = crate::cas::Request::new(op, predicate, su.original_user_id)?;
let deadline = fiber::clock().saturating_add(Duration::from_secs(3));
let res = cas::compare_and_swap(&req, false, deadline)?;
let (index, _) = res.no_retries()?;
Ok(index)
},
),
);
luamod_set(
&l,
"batch_cas",
indoc! {"
pico.batch_cas(ops[, predicate])
======================
The same as pico.cas, but allows multiple dml operations.
"},
tlua::function2(
|arg: op::BatchDmlInLua,
predicate: Option<cas::PredicateInLua>|
-> traft::Result<RaftIndex> {
// su is needed here because for cas execution we need to consult with system spaces like `_raft_state`
// and the user executing cas request may not (even shouldnt) have access to these spaces
let su = session::su(ADMIN_ID)?;
let mut dmls = Vec::with_capacity(arg.ops.len());
for op in arg.ops {
dmls.push(
op::Dml::from_lua_args(op, su.original_user_id)
.map_err(traft::error::Error::other)?,
)
}
let predicate = cas::Predicate::from_lua_args(predicate.unwrap_or_default())?;
let req = crate::cas::Request::new(
Op::BatchDml { ops: dmls },
predicate,
su.original_user_id,
)?;
let deadline = fiber::clock().saturating_add(Duration::from_secs(3));
let res = cas::compare_and_swap(&req, false, deadline)?;
let (index, _) = res.no_retries()?;
Ok(index)
},
),
);
luamod_set_help_only(
&l,
"table CasRange",
indoc! {"
table CasRange
==============
CasRange is a Lua table describing a range to be used as a
compare-and-swap predicate, see pico.help('cas')
Fields:
- table (string)
- key_min (table CasBound), see pico.help('table CasBound')
- key_max (table CasBound)
Example:
local unbounded = { kind = 'unbounded' }
local including_1 = { kind = 'included', key = {1,} }
local excluding_3 = { kind = 'excluded', key = {3,} }
local range_a = {
table = 'friends_of_peppa',
key_min = unbounded,
key_max = unbounded,
}
-- [1, 3)
local range_a = {
table = 'friends_of_peppa',
key_min = including_1,
key_max = excluding_3,
}
"},
);
luamod_set_help_only(
&l,
"table CasBound",
indoc! {"
table CasBound
==============
A Lua table representing a range bound (either min or max) used in
CasRange, see pico.help('table CasRange')
Fields:
- kind (string), one of 'included' | 'excluded' | 'unbounded'
- key (optional table), mandatory for included and excluded
"},
);
luamod_set(
&l,
"abort_ddl",
indoc! {"
pico.abort_ddl([timeout])
=======================
Aborts a pending schema change.
Returns an index of the corresponding Op::DdlAbort raft entry.
Returns an error if there is no pending schema change operation.
Params:
1. timeout (optional number), in seconds, default: infinity
Returns:
(number)
or
(nil, string) in case of an error
"},
{
tlua::function1(|timeout: Option<f64>| -> traft::Result<RaftIndex> {
let timeout = if let Some(timeout) = timeout {
duration_from_secs_f64_clamped(timeout)
} else {
INFINITY
};
let deadline = fiber::clock().saturating_add(timeout);
schema::abort_ddl(deadline)
})
},
);
luamod_set(
&l,
"wait_ddl_finalize",
indoc! {"
pico.wait_ddl_finalize(index, [opts])
=======================
Waits for the ddl operation at given raft index to be finalized.
Returns raft index of the finilizing entry.
Params:
1. index (number), raft index
2. opts (optional table)
- timeout (optional number), in seconds, default: infinity
Returns:
(number) raft index
or
(nil, string) in case of an error
"},
{
#[derive(::tarantool::tlua::LuaRead)]
struct Opts {
timeout: Option<f64>,
}
tlua::Function::new(
|index: RaftIndex, opts: Option<Opts>| -> traft::Result<RaftIndex> {
let mut timeout = INFINITY;
if let Some(opts) = opts {
if let Some(t) = opts.timeout {
timeout = duration_from_secs_f64_clamped(t);
}
}
let commit_index = schema::wait_for_ddl_commit(index, timeout)?;
Ok(commit_index)
},
)
},
);
luamod_set_help_only(
&l,
"table TableField",
indoc! {"
table TableField
================
A Lua table describing a field in a table, see [1].
Fields:
- name (string)
- type (string), see [2]
- is_nullable (boolean)
Example:
{name = 'id', type = 'unsigned', is_nullable = false}
{name = 'value', type = 'unsigned', is_nullable = false}
See also:
[1]: https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/format/
[2]: https://docs.rs/tarantool/latest/tarantool/space/enum.FieldType.html
"},
);
luamod_set(
&l,
"raft_term",
indoc! {"
pico.raft_term([index])
=======================
Returns term of the raft entry at the given index or the current term if
no argument was specified.
Params:
1. index (optional number), raft index
Returns:
(number)
or
(nil, string) in case of an error
"},
{
tlua::function1(|index: Option<RaftIndex>| -> traft::Result<RaftTerm> {
let node = node::global()?;
let term = if let Some(index) = index {
raft::Storage::term(&node.raft_storage, index)?
} else {
node.status().term
};
Ok(term)
})
},
);
///////////////////////////////////////////////////////////////////////////
#[rustfmt::skip]
luamod_set(
&l,
"install_plugin",
indoc! {"
pico.install_plugin(name, version, [opts])
=================
Install a new plugin on cluster.
Params:
1. name - plugin name, manifest with same name must exists in plugin_dir
2. version - plugin version
3. opts (optional table)
- timeout (optional number), in seconds, default: 10
- if_not_exists (optional bool), in seconds, default: false
"},
{
#[derive(::tarantool::tlua::LuaRead)]
struct Opts {
timeout: Option<f64>,
if_not_exists: Option<bool>,
}
tlua::function3(|name: String, version: String, opts: Option<Opts>| -> traft::Result<()> {
let mut timeout = Duration::from_secs(10);
let mut if_not_exists = false;
if let Some(opts) = opts {
if_not_exists = opts.if_not_exists.unwrap_or_default();
if let Some(t) = opts.timeout {
timeout = duration_from_secs_f64_clamped(t);
}
}
plugin::install_plugin(PluginIdentifier::new(name, version), timeout, if_not_exists)
})
},
);
///////////////////////////////////////////////////////////////////////////
#[rustfmt::skip]
luamod_set(
&l,
"enable_plugin",
indoc! {"
pico.enable_plugin(name, version, [opts])
=================
Enable plugin in cluster.
Params:
1. name - plugin name, plugin should already be installed with `pico.install_plugin` command
2. version - plugin version
3. opts (optional table)
- on_start_timeout (optional number), in seconds, default: 5
- timeout (optional number), in seconds, default: 10
"},
{
#[derive(::tarantool::tlua::LuaRead)]
struct Opts {
timeout: Option<f64>,
on_start_timeout: Option<f64>,
}
tlua::function3(|name: String, version: String, opts: Option<Opts>| -> traft::Result<()> {
let mut on_start_timeout = Duration::from_secs(5);
let mut timeout = Duration::from_secs(10);
if let Some(opts) = opts {
if let Some(t) = opts.on_start_timeout {
on_start_timeout = duration_from_secs_f64_clamped(t);
}
if let Some(t) = opts.timeout {
timeout = duration_from_secs_f64_clamped(t);
}
}
plugin::enable_plugin(&PluginIdentifier::new(name, version), on_start_timeout, timeout)
})
},
);
///////////////////////////////////////////////////////////////////////////
#[rustfmt::skip]
luamod_set(
&l,
"service_append_tier",
indoc! {"
pico.service_append_tier(plugin_name, plugin_version, service_name, tier, [opts])
=================
Append service to a tier, this will enable service on all instances with coressponding tier.
Params:
1. plugin_name - plugin name, plugin should already be installed with `pico.install_plugin` command
2. plugin_version - plugin version
3. service_name - service name
4. tier - tier where service must be enabled
5. opts (optional table)
- timeout (optional number), in seconds, default: 10
"},
{
#[derive(::tarantool::tlua::LuaRead)]
struct Opts {
timeout: Option<f64>,
}
tlua::function5(|plugin_name: String, plugin_version: String, service_name: String, tier: String, opts: Option<Opts>| -> traft::Result<()> {
let mut timeout = Duration::from_secs(10);
if let Some(opts) = opts {
if let Some(t) = opts.timeout {
timeout = duration_from_secs_f64_clamped(t);
}
}
plugin::update_service_tiers(
&PluginIdentifier::new(plugin_name, plugin_version),
&service_name,
&tier,
TopologyUpdateOpKind::Add,
timeout
)
})
},
);
///////////////////////////////////////////////////////////////////////////
#[rustfmt::skip]
luamod_set(
&l,
"service_remove_tier",
indoc! {"
pico.service_remove_tier(plugin_name, plugin_version, service_name, tier, [opts])
=================
Remove service from tier, this will disable service on all instances with coressponding tier.
Params:
1. plugin_name - plugin name, plugin should already be installed with `pico.install_plugin` command
2. plugin_version - plugin version
3. service_name - service name
4. tier - tier where service must be disabled
5. opts (optional table)
- timeout (optional number), in seconds, default: 10
"},
{
#[derive(::tarantool::tlua::LuaRead)]
struct Opts {
timeout: Option<f64>,
}
tlua::function5(|plugin_name: String, plugin_version: String, service_name: String, tier: String, opts: Option<Opts>| -> traft::Result<()> {
let mut timeout = Duration::from_secs(10);
if let Some(opts) = opts {
if let Some(t) = opts.timeout {
timeout = duration_from_secs_f64_clamped(t);
}
}
plugin::update_service_tiers(
&PluginIdentifier::new(plugin_name, plugin_version),
&service_name,
&tier,
TopologyUpdateOpKind::Remove,
timeout
)
})
},
);
///////////////////////////////////////////////////////////////////////////
#[rustfmt::skip]
luamod_set(
&l,
"_update_plugin_config",
indoc! {"
pico._update_plugin_config(plugin_name, plugin_version, service_name, new_config, [opts])
=================
Internal API, see src/luamod.rs for the details.
Params:
1. plugin_name - plugin name
2. plugin_version - plugin version
3. service_name - service name
4. new_config - new configuration
5. opts (optional table)
- timeout (optional number), in seconds, default: 10
"},
{
use tarantool::tlua::AnyLuaString;
#[derive(::tarantool::tlua::LuaRead)]
struct Opts {
timeout: Option<f64>,
}
tlua::function5(|plugin_name: String, plugin_version: String, service_name: String, new_cfg_raw: AnyLuaString, opts: Option<Opts>| -> traft::Result<()> {
let mut timeout = Duration::from_secs(10);
if let Some(opts) = opts {
if let Some(t) = opts.timeout {
timeout = duration_from_secs_f64_clamped(t);
}
}
plugin::update_plugin_service_configuration(&PluginIdentifier::new(plugin_name, plugin_version), &service_name, new_cfg_raw.as_bytes(), timeout)
})
},
);
///////////////////////////////////////////////////////////////////////////
#[rustfmt::skip]
luamod_set(
&l,
"disable_plugin",
indoc! {"
pico.disable_plugin(name, version, [opts])
=================
Disable plugin on cluster, `on_stop` callbacks will be called.
Params:
1. name - plugin name to be disabled
2. version - plugin version to be disabled
3. opts (optional table)
- timeout (optional number), in seconds, default: 10
"},
{
#[derive(::tarantool::tlua::LuaRead)]
struct Opts {
timeout: Option<f64>,
}
tlua::function3(|name: String, version: String, opts: Option<Opts>| -> traft::Result<()> {
let mut timeout = Duration::from_secs(10);
if let Some(opts) = opts {
if let Some(t) = opts.timeout {
timeout = duration_from_secs_f64_clamped(t);
}
}
plugin::disable_plugin(&PluginIdentifier::new(name, version), timeout)
})
},
);
///////////////////////////////////////////////////////////////////////////
#[rustfmt::skip]
luamod_set(
&l,
"remove_plugin",
indoc! {"
pico.remove_plugin(name, version, [opts])
=================
Remove a plugin.
Params:
1. name - plugin name to be removed from a system
2. version - plugin version to be removed from a system
3. opts (optional table)
- timeout (optional number), in seconds, default: 10
"},
{
#[derive(::tarantool::tlua::LuaRead)]
struct Opts {
timeout: Option<f64>,
}
tlua::function3(|name: String, version: String, opts: Option<Opts>| -> traft::Result<()> {
let mut timeout = Duration::from_secs(10);
if let Some(opts) = opts {
if let Some(t) = opts.timeout {
timeout = duration_from_secs_f64_clamped(t);
}
}
plugin::remove_plugin(&PluginIdentifier::new(name, version), timeout)
})
},
);
///////////////////////////////////////////////////////////////////////////
#[rustfmt::skip]
luamod_set(
&l,
"migration_up",
indoc! {"
pico.migration_up(name, version, [opts])
=================
Up plugin migration.
Params:
1. name - plugin name, manifest with same name must exists in plugin_dir
2. version - plugin version
3. opts (optional table)
- timeout (optional number), in seconds, default: 10
- rollback_timeout (optional number), in seconds, default: 10
"},
{
#[derive(::tarantool::tlua::LuaRead)]
struct Opts {
timeout: Option<f64>,
rollback_timeout: Option<f64>,
}
tlua::function3(|name: String, version: String, opts: Option<Opts>| -> traft::Result<()> {
let mut timeout = Duration::from_secs(10);
let mut rollback_timeout = Duration::from_secs(10);
if let Some(opts) = opts {
if let Some(t) = opts.timeout {
timeout = duration_from_secs_f64_clamped(t);
}
if let Some(t) = opts.rollback_timeout {
rollback_timeout = duration_from_secs_f64_clamped(t);
}
}
plugin::migration_up(&PluginIdentifier::new(name, version), timeout, rollback_timeout)
})
},
);
///////////////////////////////////////////////////////////////////////////
#[rustfmt::skip]
luamod_set(
&l,
"migration_down",
indoc! {"
pico.migration_down(name, version, [opts])
=================
DOWN plugin migration.
Params:
1. name - plugin name, manifest with same name must exists in plugin_dir
2. version - plugin version
3. opts (optional table)
- timeout (optional number), in seconds, default: 10
"},
{
#[derive(::tarantool::tlua::LuaRead)]
struct Opts {
timeout: Option<f64>,
}
tlua::function3(|name: String, version: String, opts: Option<Opts>| -> traft::Result<()> {
let mut timeout = Duration::from_secs(10);
if let Some(opts) = opts {
if let Some(t) = opts.timeout {
timeout = duration_from_secs_f64_clamped(t);
}
}
plugin::migration_down(PluginIdentifier::new(name, version), timeout)
})
},
);
#[cfg(feature = "error_injection")]
luamod_set(
&l,
"_inject_error",
indoc! {"
pico._inject_error(error, enable)
Internal API, see src/luamod.rs for the details.
Params:
1. error (string)
2. enable (bool)
"},
{
tlog!(Info, "error injection enabled");
tlua::Function::new(|error: String, enable: bool| {
crate::error_injection::enable(&error, enable);
})
},
);
}