Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • core/tarantool-module
1 result
Show changes
Commits on Source (184)
Showing
with 1854 additions and 432 deletions
default:
image:
name: docker-public.binary.picodata.io/kaniko-project/executor:v1.14.0-debug
entrypoint: ['']
pull_policy: [if-not-present]
tags:
- docker-k8s
stages:
- build-base-image
- test
......@@ -11,35 +19,41 @@ workflow:
when: never
- if: $CI_PIPELINE_SOURCE == "push"
- if: $CI_PIPELINE_SOURCE == "web"
- if: $CI_PIPELINE_SOURCE == "trigger"
- if: $CI_PIPELINE_SOURCE == "pipeline"
variables:
GIT_DEPTH: 1
GIT_STRATEGY: fetch
DOCKER_AUTH_CONFIG: $DOCKER_AUTH_RW
CACHE_PATHS: target
CARGO_INCREMENTAL: 0
RUST_VERSION: 1.67.1
CARGO_HOME: /shared-storage/tarantool-module/.cargo
VANILLA_DOCKER_IMAGE: docker-public.binary.picodata.io/tarantool-module-build-base-vanilla
PICODATA_DOCKER_IMAGE: docker-public.binary.picodata.io/tarantool-module-build-base-picodata
BASE_IMAGE_VANILLA: docker-public.binary.picodata.io/tarantool-module-build-base-vanilla
BASE_IMAGE_FORK: docker-public.binary.picodata.io/tarantool-module-build-base-fork
# job:rules explained:
#
# - if build-base changes on master branch (compared to HEAD~1)
# * build-base-image (with tag latest) and push
# * test (on base-image:latest)
# * build-base-image-vanilla/fork (with tag latest) and push
# * test (on corresponding base-image:latest)
# - if build-base changes on development branch (compared to master)
# * build-base-image (with tag sha)
# * test (on base-image:sha)
# * build-base-image-vanilla/fork (with tag <SHA>)
# * test (on corresponding base-image:<SHA>)
# - else (if build-base doesn't change)
# * skip build-base-image
# * just test (on base-image:latest)
# * just test (on existing base-image:latest)
#
# A different workflow applies on trigger (when tarantool is tagged):
# * build-base-image-fork from specified TARANTOOL_TAG (with tag <SHA>-triggered)
# * test (on base-image-fork:<SHA>-triggered)
#
# Anchor syntax explained here:
# https://docs.gitlab.com/ee/ci/yaml/yaml_optimization.html
#
.rules:
- &if-build-base-changes-on-master-branch
if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
if: ($CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH) && ($CI_PIPELINE_SOURCE != "pipeline")
changes:
# implies compare_to HEAD~1
paths: &build-base-changes-paths
......@@ -47,64 +61,68 @@ variables:
- .gitlab-ci.yml
- &if-build-base-changes-on-dev-branch
if: $CI_COMMIT_BRANCH != $CI_DEFAULT_BRANCH
if: ($CI_COMMIT_BRANCH != $CI_DEFAULT_BRANCH) && ($CI_PIPELINE_SOURCE != "pipeline")
changes:
compare_to: master
paths: *build-base-changes-paths
- &if-build-base-on-trigger
if: $CI_PIPELINE_SOURCE == "trigger"
# Если запущено через родительский downstream trigger
- &if-triggered
if: $CI_PIPELINE_SOURCE == "pipeline"
- &else {}
build-base-image:
.docker-login: &docker-login
before_script:
- mkdir -p /kaniko/.docker
- echo "$DOCKER_AUTH_RW" > /kaniko/.docker/config.json
# Билд контейнера через kaniko
.build-base-image:
stage: build-base-image
tags:
- tmodule-shell
rules:
- <<: *if-build-base-changes-on-master-branch
variables:
BASE_IMAGE_TAG: latest
PUSH_DOCKER: ""
- <<: *if-build-base-changes-on-dev-branch
variables:
BASE_IMAGE_TAG: ${CI_COMMIT_SHA}
- <<: *if-build-base-on-trigger
variables:
BASE_IMAGE_TAG: ${CI_COMMIT_SHA}
PUSH_DOCKER: ""
- <<: *else
when: never
<<: *docker-login
script:
- docker pull ${VANILLA_DOCKER_IMAGE} || true
- docker pull ${PICODATA_DOCKER_IMAGE} || true
- >
docker build
--build-arg RUST_VERSION=1.67.1
-t ${VANILLA_DOCKER_IMAGE}:${BASE_IMAGE_TAG}
-f ./docker/vanilla.Dockerfile
--label GIT_COMMIT=${CI_COMMIT_SHA}
--cache-from ${VANILLA_DOCKER_IMAGE}
./docker
- >
docker build
--build-arg RUST_VERSION=1.67.1
--build-arg TARANTOOL_TAG=latest
-t ${PICODATA_DOCKER_IMAGE}:${BASE_IMAGE_TAG}
-f ./docker/picodata.Dockerfile
--label GIT_COMMIT=${CI_COMMIT_SHA}
--cache-from ${PICODATA_DOCKER_IMAGE}
./docker
- |
# Push image to registry
if [ "${CI_COMMIT_BRANCH}" == "${CI_DEFAULT_BRANCH}" ]; then
mkdir -p $CI_PROJECT_DIR/.docker
echo $DOCKER_AUTH_RW > $CI_PROJECT_DIR/.docker/config.json
echo "Pushing ${VANILLA_DOCKER_IMAGE}:${BASE_IMAGE_TAG}"
docker --config $CI_PROJECT_DIR/.docker/ push ${VANILLA_DOCKER_IMAGE}:${BASE_IMAGE_TAG}
echo "Pushing ${PICODATA_DOCKER_IMAGE}:${BASE_IMAGE_TAG}"
docker --config $CI_PROJECT_DIR/.docker/ push ${PICODATA_DOCKER_IMAGE}:${BASE_IMAGE_TAG}
else
echo "Skip pushing image on a non-master branch"
fi
/kaniko/executor --context $CI_PROJECT_DIR --dockerfile ${DOCKERFILE} \
--build-arg "RUST_VERSION=${RUST_VERSION}" ${EXTRA_BUILD_ARGS} ${PUSH_DOCKER} \
--cache=false --cache-run-layers=true --single-snapshot --compressed-caching=false --use-new-run --snapshot-mode=redo --cleanup \
--destination ${BASE_IMAGE_NAME}:${BASE_IMAGE_TAG}
build-base-image-vanilla:
extends: .build-base-image
variables:
BASE_IMAGE_NAME: ${BASE_IMAGE_VANILLA}
DOCKERFILE: ./docker/vanilla.Dockerfile
build-base-image-fork:
extends: .build-base-image
variables:
BASE_IMAGE_NAME: ${BASE_IMAGE_FORK}
EXTRA_BUILD_ARGS: >
--build-arg TARANTOOL_TAG=latest
DOCKERFILE: ./docker/picodata.Dockerfile
build-base-image-fork-on-trigger:
extends: .build-base-image
rules: # overrides whole section from .build-base-image
- <<: *if-triggered
variables:
BASE_IMAGE_NAME: ${BASE_IMAGE_FORK}
BASE_IMAGE_TAG: ${CI_COMMIT_SHA}-triggered
PUSH_DOCKER: ""
EXTRA_BUILD_ARGS: >
--build-arg TARANTOOL_TAG=${TARANTOOL_TAG}
DOCKERFILE: ./docker/picodata.Dockerfile
.test:
stage: test
......@@ -115,17 +133,12 @@ build-base-image:
- <<: *if-build-base-changes-on-dev-branch
variables:
BASE_IMAGE_TAG: ${CI_COMMIT_SHA}
- <<: *if-build-base-on-trigger
variables:
BASE_IMAGE_TAG: ${CI_COMMIT_SHA}
- <<: *else
variables:
BASE_IMAGE_TAG: latest
tags:
- tmodule-docker
image:
name: ${DOCKER_IMAGE}:${BASE_IMAGE_TAG}
pull_policy: if-not-present
name: ${BASE_IMAGE_NAME}:${BASE_IMAGE_TAG}
pull_policy: [if-not-present]
before_script:
- |
# Restore cache
......@@ -143,9 +156,6 @@ build-base-image:
fi
script:
- cargo -V
- cargo fmt --all --check
- cargo clippy --version
- cargo clippy --features "${CARGO_FEATURES}" --workspace --tests -- --deny warnings
- cargo build --features "${CARGO_FEATURES}" --all
- cargo test --no-default-features -p tarantool
- cargo test --features "${CARGO_FEATURES}"
......@@ -164,34 +174,66 @@ build-base-image:
echo "Skip saving cache on a non-master branch"
fi
lint-vanilla:
extends: .test
variables:
CACHE_ARCHIVE: /shared-storage/tarantool-module/vanilla-cache.tar
BASE_IMAGE_NAME: ${BASE_IMAGE_VANILLA}
CARGO_FEATURES: default
script:
- cargo fmt --all --check
- cargo clippy --version
- cargo clippy --features "${CARGO_FEATURES}" --workspace --tests -- --deny warnings
test-vanilla:
extends: .test
variables:
CACHE_ARCHIVE: /shared-storage/tarantool-module/vanilla-cache.tar
DOCKER_IMAGE: ${VANILLA_DOCKER_IMAGE}
BASE_IMAGE_NAME: ${BASE_IMAGE_VANILLA}
CARGO_FEATURES: default
bench-vanilla:
extends: .test
variables:
DOCKER_IMAGE: ${VANILLA_DOCKER_IMAGE}
BASE_IMAGE_NAME: ${BASE_IMAGE_VANILLA}
script:
- cat /proc/cpuinfo
- make bench
test-picodata:
lint-fork:
extends: .test
variables:
CACHE_ARCHIVE: /shared-storage/tarantool-module/picodata-cache.tar
BASE_IMAGE_NAME: ${BASE_IMAGE_FORK}
CARGO_FEATURES: picodata,tokio_components
script:
- cargo clippy --version
- cargo clippy --features "${CARGO_FEATURES}" --workspace --tests -- --deny warnings
test-fork:
extends: .test
variables:
CACHE_ARCHIVE: /shared-storage/tarantool-module/picodata-cache.tar
BASE_IMAGE_NAME: ${BASE_IMAGE_FORK}
CARGO_FEATURES: picodata,tokio_components
test-fork-on-trigger:
extends: .test
rules: # overrides whole section from .test
- <<: *if-triggered
variables:
CACHE_ARCHIVE: /shared-storage/tarantool-module/picodata-cache.tar
DOCKER_IMAGE: ${PICODATA_DOCKER_IMAGE}
CARGO_FEATURES: picodata
BASE_IMAGE_NAME: ${BASE_IMAGE_FORK}
BASE_IMAGE_TAG: ${CI_COMMIT_SHA}-triggered
CARGO_FEATURES: picodata,tokio_components
pages:
extends: .test
variables:
DOCKER_IMAGE: ${PICODATA_DOCKER_IMAGE}
BASE_IMAGE_NAME: ${BASE_IMAGE_FORK}
RUSTDOCFLAGS: "-Dwarnings"
script:
- cargo doc --workspace --no-deps --features "picodata"
- cargo doc --workspace --no-deps --features "picodata,tokio_components"
- rm -rf public
- mv target/doc public
artifacts:
......
This diff is collapsed.
......@@ -6,7 +6,7 @@ test:
cargo test
test-pd:
cargo build -p tarantool-module-test-runner --features=picodata
cargo build -p tarantool-module-test-runner --features=picodata,tokio_components
TARANTOOL_EXECUTABLE=tarantool-pd cargo test
bench:
......
......@@ -38,7 +38,7 @@ For deployment check out the deployment notes at the end of this file.
### Prerequisites
- Tarantool 2.2
- Tarantool 2.10+
#### Linking issues in macOS
......@@ -56,7 +56,7 @@ rustflags = [
Add the following lines to your project's Cargo.toml:
```toml
[dependencies]
tarantool = "3.0"
tarantool = "4.0"
[lib]
crate-type = ["cdylib"]
......@@ -103,7 +103,7 @@ edition = "2018"
# author, license, etc
[dependencies]
tarantool = "3.0"
tarantool = "4.0"
serde = "1.0"
[lib]
......
......@@ -3,12 +3,20 @@ FROM docker-public.binary.picodata.io/tarantool:${TARANTOOL_TAG}
ARG RUST_VERSION
RUN set -e; \
yum -y install gcc git; \
yum clean all;
rm -f /etc/yum.repos.d/pg.repo && \
yum -y install gcc git && \
yum clean all
# Install rust + cargo
ENV PATH=/root/.cargo/bin:${PATH}
RUN set -e; \
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs |\
sh -s -- -y --profile default --default-toolchain ${RUST_VERSION} -c rustfmt -c clippy;
COPY ci-log-section /usr/bin/ci-log-section
# Install glauth for LDAP testing
RUN set -e; \
cd /bin; \
curl -L -o glauth https://github.com/glauth/glauth/releases/download/v2.3.0/glauth-linux-amd64; \
chmod +x glauth;
COPY docker/ci-log-section /usr/bin/ci-log-section
......@@ -11,4 +11,4 @@ RUN set -e; \
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs |\
sh -s -- -y --profile default --default-toolchain ${RUST_VERSION} -c rustfmt -c clippy;
COPY ci-log-section /usr/bin/ci-log-section
COPY docker/ci-log-section /usr/bin/ci-log-section
use http_types::{Method, Request, Url};
use tarantool::error::Error;
use tarantool::fiber;
use tarantool::network::client::tcp::TcpStream;
use tarantool::network::client::tcp::UnsafeSendSyncTcpStream;
use tarantool::proc;
#[proc]
fn get(url: &str) -> http_types::Result<()> {
fn get(url: &str) -> Result<(), Error> {
fiber::block_on(async {
println!("Connecting...");
let url = Url::parse(url)?;
let url = Url::parse(url).map_err(Error::other)?;
let host = url
.host_str()
.ok_or(http_types::Error::from_display("host not specified"))?;
.ok_or_else(|| Error::other("host not specified"))?;
let req = Request::new(Method::Get, url.clone());
let mut res = match url.scheme() {
"http" => {
let stream = TcpStream::connect(host, 80)
.await
.map_err(http_types::Error::from_display)?;
let stream = TcpStream::connect(host, 80).map_err(Error::other)?;
let stream = UnsafeSendSyncTcpStream(stream);
println!("Sending request over http...");
async_h1::connect(stream, req).await?
async_h1::connect(stream, req).await.map_err(Error::other)?
}
#[cfg(feature = "tls")]
"https" => {
let stream = TcpStream::connect(host, 443)
let stream = TcpStream::connect(host, 443).map_err(Error::other)?;
let stream = UnsafeSendSyncTcpStream(stream);
let stream = async_native_tls::connect(host, stream)
.await
.map_err(http_types::Error::from_display)?;
let stream = async_native_tls::connect(host, stream).await?;
.map_err(Error::other)?;
println!("Sending request over https...");
async_h1::connect(stream, req).await?
async_h1::connect(stream, req).await.map_err(Error::other)?
}
_ => {
return Err(http_types::Error::from_display("scheme not supported"));
return Err(Error::other("scheme not supported"));
}
};
println!("Response Status: {}", res.status());
println!("Response Body: {}", res.body_string().await?);
println!(
"Response Body: {}",
res.body_string().await.map_err(Error::other)?
);
Ok(())
})
}
[package]
name = "perf-test"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tarantool = { path = "../tarantool" }
tarantool = { path = "../tarantool", features = [ "internal_test" ] }
serde = { version = "*", features = ["derive"] }
rmp-serde = "1"
[lib]
crate-type = ["cdylib"]
......@@ -6,7 +6,7 @@ local tmpdir = fio.tempdir()
box.cfg{
log_level = 'verbose',
listen = 3301,
listen = 0,
wal_mode = 'none',
memtx_dir = tmpdir,
}
......@@ -50,26 +50,49 @@ package.cpath = string.format(
box.schema.func.create('libperf_test.bench_netbox', {language = 'C'})
box.schema.func.create('libperf_test.bench_network_client', {language = 'C'})
box.schema.func.create('libperf_test.bench_custom_encode', {language = 'C'})
box.schema.func.create('libperf_test.bench_custom_decode', {language = 'C'})
box.schema.func.create('libperf_test.bench_serde_encode', {language = 'C'})
box.schema.func.create('libperf_test.bench_serde_decode', {language = 'C'})
box.schema.func.create('libperf_test.l_print_stats', {language = 'C'})
box.schema.func.create('libperf_test.l_n_iters', {language = 'C'})
function bench_lua_netbox()
local clock = require('clock')
local net_box = require("net.box")
local conn = net_box:connect('localhost:3301')
conn:wait_connected()
local connect_deadline = clock.monotonic() + 3 -- seconds
local conn
repeat
conn = net_box:connect(box.info.listen)
local ok = conn:wait_connected(clock.monotonic() - connect_deadline)
if clock.monotonic() > connect_deadline then
error(string.format('Failed to establish a connection to port %s', box.info.listen))
end
until ok or clock.monotonic() > connect_deadline
local samples = {}
local n = box.func['libperf_test.l_n_iters']:call()
-- benchmarking loop
for i = 1, n do
local start = clock.monotonic64()
local res = conn:call('test_stored_proc', {1, 2})
samples[i] = clock.monotonic64() - start
end
conn:close()
box.func['libperf_test.l_print_stats']:call{"lua_netbox", samples}
end
print("================ iproto_clients =================")
bench_lua_netbox()
box.func['libperf_test.bench_netbox']:call()
box.func['libperf_test.bench_network_client']:call()
print()
print("============= msgpack_serialization =============")
box.func['libperf_test.bench_custom_encode']:call()
box.func['libperf_test.bench_serde_encode']:call()
box.func['libperf_test.bench_custom_decode']:call()
box.func['libperf_test.bench_serde_decode']:call()
os.exit(0)
use std::{future::Future, time::Duration};
use tarantool::{
fiber,
net_box::{Conn, ConnOptions, Options},
network::client::{AsClient as _, Client},
proc,
time::Instant,
};
use tarantool::{fiber, proc, time::Instant};
const N_ITERS: usize = 100_000;
const N_ITERS: usize = 10_000;
const PREHEAT_ITERS: usize = 1_000;
#[proc]
fn l_n_iters() -> usize {
N_ITERS
}
#[proc]
fn bench_network_client() {
let client = fiber::block_on(Client::connect("localhost", 3301)).unwrap();
let samples = harness_iter_async(|| async {
client.call("test_stored_proc", &(1, 2)).await.unwrap();
});
print_stats("network_client", samples);
mod iproto_clients {
use super::{harness_iter, harness_iter_async, print_stats};
use tarantool::test::util::listen_port;
use tarantool::{
fiber,
net_box::{Conn, ConnOptions, Options},
network::client::{AsClient as _, Client},
proc,
};
#[proc]
fn bench_network_client() {
let client = fiber::block_on(Client::connect("localhost", listen_port())).unwrap();
let samples = harness_iter_async(|| async {
client.call("test_stored_proc", &(1, 2)).await.unwrap();
});
print_stats("network_client", samples);
}
#[proc]
fn bench_netbox() {
let conn = Conn::new(
("localhost", listen_port()),
ConnOptions {
..ConnOptions::default()
},
None,
)
.unwrap();
conn.wait_connected(None).unwrap();
let samples = harness_iter(|| {
conn.call("test_stored_proc", &(1, 2), &Options::default())
.unwrap();
});
print_stats("netbox", samples);
}
}
#[proc]
fn bench_netbox() {
let conn = Conn::new(
("localhost", 3301),
ConnOptions {
..ConnOptions::default()
},
mod msgpack_serialization {
use super::{harness_iter, print_stats};
use serde::{Deserialize, Serialize};
use tarantool::msgpack::*;
use tarantool::proc;
const HEIGHT: usize = 5;
const DEGREE: usize = 4;
#[derive(Encode, Decode, Serialize, Deserialize)]
enum Foo {
Bar(usize),
Baz(usize),
None,
)
.unwrap();
conn.wait_connected(None).unwrap();
let samples = harness_iter(|| {
conn.call("test_stored_proc", &(1, 2), &Options::default())
.unwrap();
});
print_stats("netbox", samples);
}
#[derive(Encode, Decode, Serialize, Deserialize)]
struct Node {
s: String,
n: usize,
e: Foo,
leaves: Vec<Node>,
}
fn gen_tree(height: usize, degree: usize) -> Node {
let mut node = Node {
leaves: vec![],
s: format!("height is {}", height),
n: height * degree,
e: Foo::Bar(height),
};
if height == 0 {
return node;
}
for _ in 0..degree {
// Recursion should be ok for testing purposes
node.leaves.push(gen_tree(height - 1, degree));
}
node
}
#[proc]
fn bench_custom_encode() {
let tree = gen_tree(HEIGHT, DEGREE);
let samples = harness_iter(|| {
let _bytes = encode(&tree);
});
print_stats("custom_encode", samples);
}
#[proc]
fn bench_custom_decode() {
let tree = gen_tree(HEIGHT, DEGREE);
let bytes = encode(&tree);
let samples = harness_iter(|| {
let _got_tree: Node = decode(&bytes).unwrap();
});
print_stats("custom_decode", samples);
}
#[proc]
fn bench_serde_encode() {
let tree = gen_tree(HEIGHT, DEGREE);
let samples = harness_iter(|| {
let _bytes = rmp_serde::to_vec(&tree).unwrap();
});
print_stats("serde_encode", samples);
}
#[proc]
fn bench_serde_decode() {
let tree = gen_tree(HEIGHT, DEGREE);
let bytes = rmp_serde::to_vec(&tree).unwrap();
let samples = harness_iter(|| {
let _got_tree: Node = rmp_serde::from_slice(&bytes).unwrap();
});
print_stats("serde_decode", samples);
}
}
#[proc]
......@@ -48,22 +134,36 @@ fn l_print_stats(fn_name: &str, samples: Vec<i64>) {
print_stats(fn_name, samples.iter().map(|v| *v as u128).collect())
}
#[allow(clippy::unit_arg)]
fn harness_iter(mut f: impl FnMut()) -> Vec<u128> {
let mut samples = vec![];
// Preheating
for _ in 0..PREHEAT_ITERS {
std::hint::black_box(f());
}
let mut samples = Vec::with_capacity(N_ITERS);
for _ in 0..N_ITERS {
let start = Instant::now();
f();
std::hint::black_box(f());
samples.push(start.elapsed().as_nanos());
}
samples
}
#[allow(clippy::unit_arg)]
fn harness_iter_async<F: Future>(mut f: impl FnMut() -> F) -> Vec<u128> {
let mut samples = vec![];
// Preheating
fiber::block_on(async {
for _ in 0..PREHEAT_ITERS {
std::hint::black_box(f().await);
}
});
let mut samples = Vec::with_capacity(N_ITERS);
fiber::block_on(async {
for _ in 0..N_ITERS {
let start = Instant::now();
f().await;
std::hint::black_box(f().await);
samples.push(start.elapsed().as_nanos());
}
});
......
......@@ -4,7 +4,7 @@ authors = [
]
name = "tarantool-proc"
description = "Tarantool proc macros"
version = "1.0.0"
version = "3.0.0"
edition = "2021"
license = "BSD-2-Clause"
documentation = "https://docs.rs/tarantool-proc/"
......
This diff is collapsed.
......@@ -9,7 +9,8 @@ pub fn impl_macro_attribute(attr: TS1, item: TS1) -> TS1 {
let ctx = Context::from_args(args);
let fn_name = &fn_item.sig.ident;
let test_name = fn_name.to_string();
let test_name_ident = syn::Ident::new(&test_name.to_uppercase(), fn_name.span());
let unique_name = format!("TARANTOOL_MODULE_TEST_CASE_{}", test_name.to_uppercase());
let test_name_ident = syn::Ident::new(&unique_name, fn_name.span());
let Context {
tarantool,
section,
......
[package]
name = "tarantool"
description = "Tarantool rust bindings"
version = "3.0.2"
version = "5.0.0"
authors = [
"Dmitriy Koltsov <dkoltsov@picodata.io>",
"Georgy Moshkin <gmoshkin@picodata.io>",
......@@ -20,48 +20,51 @@ rust-version = "1.67.1"
[dependencies]
base64 = "0.13"
bitflags = "1.2"
byteorder = "1.3"
clap = {version = "3", features = ["derive", "env"]}
dlopen = "0.1.8"
dec = "0.4.8"
thiserror = "1.0.30"
libc = "0.2"
libc = { version = "0.2", features = ["extra_traits"] }
log = "0.4"
nix = "0.24.1"
num-traits = "0.2"
num-derive = "0.3"
once_cell = "1.4.0"
tlua = { path = "../tlua", version = "2.0.0" }
tlua = { path = "../tlua", version = "3.0.0" }
refpool = { version = "0.4.3", optional = true }
rmp = "=0.8.11"
rmp-serde = "=1.0.0"
rmp-serde = "=1.1.0"
rmpv = { version = "=1.0.0", features = ["with-serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_bytes = "^0"
sha-1 = "0.9"
tarantool-proc = { path = "../tarantool-proc", version = "1.0.0" }
tarantool-proc = { path = "../tarantool-proc", version = "3.0" }
uuid = "0.8.2"
futures = "0.3.25"
linkme = "0.2.10"
async-trait = "0.1.64"
tester = { version = "0.7.0", optional = true }
time = ">=0.3.0, <0.3.18"
crossbeam-queue = "0.3.8"
async-std = { version = "1.12.0", optional = true, default_features = false, features = ["std"] }
crossbeam-queue = { version = "0.3.8", optional = true }
async-std = { version = "1.12.0", optional = true, default_features = false, features = [
"std",
] }
pretty_assertions = { version = "1.4", optional = true }
[target.'cfg(not(all(target_arch = "aarch64", target_os = "macos")))'.dependencies]
va_list = "0.1.3"
tempfile = { version = "3.9", optional = true }
va_list = ">=0.1.4"
tokio = { version = "=1.29.1", features = [
"sync",
"rt",
"time",
], optional = true }
[features]
default = ["net_box", "network_client"]
net_box = ["refpool"]
picodata = []
picodata = ["crossbeam-queue"]
tokio_components = ["picodata", "tokio"]
network_client = []
test = ["tester"]
all = ["default", "test"]
internal_test = ["test", "tlua/test", "pretty_assertions"]
internal_test = ["test", "tlua/internal_test", "pretty_assertions", "tempfile"]
[dev-dependencies]
time-macros = "=0.2.6"
pretty_assertions = "1.4"
#![cfg(feature = "picodata")]
use std::ffi::CString;
use crate::error;
use crate::ffi::tarantool as ffi;
use crate::space::SpaceId;
/// Privilege (access) type bit flags.
///
/// This is a direct translation of the `box_privilege_type` enum from
/// tarantool's `user_def.h`. The discriminants are bit flags, so `All`
/// is simply every bit set (`u16::MAX`).
#[repr(u16)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum PrivType {
    /// SELECT
    Read = 1,
    /// INSERT, UPDATE, UPSERT, DELETE, REPLACE
    Write = 2,
    /// CALL
    Execute = 4,
    /// SESSION
    Session = 8,
    /// USAGE
    Usage = 16,
    /// CREATE
    Create = 32,
    /// DROP
    Drop = 64,
    /// ALTER
    Alter = 128,
    /// REFERENCE - required by ANSI - not implemented
    Reference = 256,
    /// TRIGGER - required by ANSI - not implemented
    Trigger = 512,
    /// INSERT - required by ANSI - not implemented
    Insert = 1024,
    /// UPDATE - required by ANSI - not implemented
    Update = 2048,
    /// DELETE - required by ANSI - not implemented
    Delete = 4096,
    /// This is never granted, but used internally.
    Grant = 8192,
    /// Never granted, but used internally.
    Revoke = 16384,
    /// All bits set; matches any privilege.
    All = u16::MAX,
}
/// Wrapper around the identically named tarantool C function.
///
/// Runs an access check for the *current* user against the space
/// `space_id` for the given access type. The most relevant access types
/// are [`PrivType::Read`] and [`PrivType::Write`].
///
/// # Errors
///
/// Returns the last tarantool error when the access check fails.
pub fn box_access_check_space(space_id: SpaceId, user_access: PrivType) -> crate::Result<()> {
    let rc = unsafe { ffi::box_access_check_space(space_id, user_access as u16) };
    if rc != -1 {
        return Ok(());
    }
    Err(error::Error::Tarantool(error::TarantoolError::last()))
}
/// Schema object kinds recognized by tarantool's access checks.
///
/// This is a direct translation of the `box_schema_object_type` enum from
/// tarantool's `schema_def.h`.
///
/// NOTE(review): judging by `box_access_check_ddl` in this module, the
/// `Entity*` variants may only be used together with
/// `PrivType::Grant`/`PrivType::Revoke`.
#[repr(u32)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Default)]
pub enum SchemaObjectType {
    #[default]
    Unknown = 0,
    Universe = 1,
    Space = 2,
    Function = 3,
    User = 4,
    Role = 5,
    Sequence = 6,
    Collation = 7,
    // Marker value: every discriminant above this one is an `Entity*`
    // variant (see `is_entity` below).
    ObjectTypeMax = 8,
    EntitySpace = 9,
    EntityFunction = 10,
    EntityUser = 11,
    EntityRole = 12,
    EntitySequence = 13,
    EntityCollation = 14,
    EntityTypeMax = 15,
}
impl SchemaObjectType {
fn is_entity(&self) -> bool {
*self as u32 > SchemaObjectType::ObjectTypeMax as u32
}
}
/// Performs various DDL permission checks externally.
///
/// Note that no checks are performed for harmless nonsensical
/// combinations (e.g. `Execute` on a space). This should not lead to
/// anything critical like UB, it is just pointless from the application
/// perspective.
///
/// # Panicking
///
/// Not all combinations of parameters are valid:
///
/// * `Entity*` object types can only be used with [`PrivType::Grant`] or
///   [`PrivType::Revoke`]. Otherwise, because of how this is structured
///   inside tarantool, such a call leads to undefined behavior.
/// * When using `Grant` or `Revoke`, `owner_id` must be the current
///   user, because in this context the owner is the user who grants the
///   permission (the grantor). For `Grant`/`Revoke`,
///   `box_access_check_ddl` alone is not a full permission check: the
///   checks contained in the `priv_def_check` C function must be
///   performed additionally.
/// * `object_name` must not contain interior nul bytes.
///
/// Given these limitations, this function guards against the invalid
/// combinations that lead to UB by panicking instead.
pub fn box_access_check_ddl(
    object_name: &str,
    object_id: u32,
    owner_id: u32,
    object_type: SchemaObjectType,
    access: PrivType,
) -> crate::Result<()> {
    // Fixed typo in the panic message: "permissons" -> "permissions".
    assert!(
        !object_type.is_entity() || matches!(access, PrivType::Grant | PrivType::Revoke),
        "Entity scoped permissions can be checked only with Grant or Revoke"
    );
    if matches!(access, PrivType::Grant | PrivType::Revoke) {
        assert_eq!(
            owner_id,
            crate::session::uid().expect("there must be current user"),
            "This is incorrect use of the API. For grant and revoke owner_id must be current user (grantor)."
        )
    }
    let name = CString::new(object_name).expect("object name may not contain interior null bytes");
    // SAFETY: `name` outlives the call; all other arguments are plain
    // integers passed by value.
    let ret = unsafe {
        ffi::box_access_check_ddl(
            name.as_ptr(),
            object_id,
            owner_id,
            object_type as u32,
            access as u16,
        )
    };
    if ret == -1 {
        Err(error::Error::Tarantool(error::TarantoolError::last()))
    } else {
        Ok(())
    }
}
......@@ -11,7 +11,7 @@ crate::define_str_enum! {
#[cfg(feature = "picodata")]
crate::define_str_enum! {
#[derive(Default, clap::ArgEnum)]
#[derive(Default)]
pub enum AuthMethod {
#[default]
ChapSha1 = "chap-sha1",
......
......@@ -50,12 +50,14 @@
//! For implementing a consumer lock and unlock a [`crate::fiber::Cond`] is used.
pub mod oneshot;
pub mod sync;
pub mod unbounded;
use crate::ffi;
use crate::ffi::tarantool::{
cbus_endpoint_delete, cbus_endpoint_new, cbus_loop, lcpipe_delete, lcpipe_new, lcpipe_push_now,
};
use crate::fiber::Cond;
use std::ffi::CString;
use std::os::raw::c_void;
use std::ptr;
......@@ -66,6 +68,10 @@ pub enum RecvError {
Disconnected,
}
#[derive(Debug, thiserror::Error)]
#[error("receiving half of a channel is disconnected")]
pub struct SendError<T>(pub T);
#[derive(Debug, thiserror::Error)]
pub enum CbusError {
#[error("endpoint with given name already registered")]
......@@ -96,7 +102,7 @@ pub struct Message<T> {
impl<F> Message<F>
where
F: FnOnce() + 'static,
F: FnOnce() + Send + 'static,
{
unsafe fn trampoline(msg: *mut c_void) {
let msg = msg.cast::<Self>();
......@@ -175,10 +181,9 @@ pub struct LCPipe {
pipe: *mut ffi::tarantool::LCPipe,
}
// It is safe to send lcpipe to not-owning thread.
unsafe impl Send for LCPipe {}
unsafe impl Sync for LCPipe {}
impl LCPipe {
/// Create and initialize a pipe and connect it to the consumer.
/// The call returns only when the consumer, identified by endpoint name, has joined the bus.
......@@ -192,7 +197,7 @@ impl LCPipe {
}
/// Push a new message into pipe. Message will be flushed to consumer queue (but not handled) immediately.
pub fn push_message<T>(&self, msg: Message<T>) {
pub fn push_message<T>(&mut self, msg: Message<T>) {
let msg = Box::new(msg);
// leaks a message, there is no `Box::from_raw` later, because it will happen implicitly
// when [`MessageHop::f`] called
......@@ -207,22 +212,45 @@ impl Drop for LCPipe {
}
}
/// A wrapper over a [`Cond`] that allows sending it between threads.
///
/// # Safety
///
/// `UnsafeCond` must be dereferenced and dropped only in the cord
/// thread: dropping it in a thread other than the one that created the
/// underlying `Cond` will lead to an application crash.
struct UnsafeCond(Cond);

impl UnsafeCond {
    /// Returns a reference to the underlying `Cond`.
    ///
    /// # Safety
    ///
    /// It is only safe to use the `Cond` from a tarantool cord thread
    /// (the TX thread in most cases).
    unsafe fn as_ref(&self) -> &Cond {
        &self.0
    }
}

// SAFETY: see the note on `UnsafeCond` — the value must still only be
// used and dropped in the cord thread; these impls merely allow moving
// it across threads.
unsafe impl Send for UnsafeCond {}
unsafe impl Sync for UnsafeCond {}
#[cfg(feature = "internal_test")]
mod tests {
use crate::cbus;
use crate::cbus::Message;
use crate::fiber::{Cond, Fiber};
use crate::fiber;
use crate::fiber::Cond;
use std::thread;
use std::thread::ThreadId;
pub(super) fn run_cbus_endpoint(endpoint_name: &str) -> Fiber<'static, ()> {
let mut fiber = Fiber::new("cbus_fiber", &mut |_: Box<()>| {
let cbus_endpoint = cbus::Endpoint::new(endpoint_name).unwrap();
cbus_endpoint.cbus_loop();
0
});
fiber.start(());
fiber
pub(super) fn run_cbus_endpoint(endpoint_name: &'static str) -> fiber::FiberId {
fiber::Builder::new()
.name("cbus_fiber")
.func(move || {
let cbus_endpoint = cbus::Endpoint::new(endpoint_name).unwrap();
cbus_endpoint.cbus_loop();
})
.start_non_joinable()
.expect("failed to start the cbus_fiber")
}
#[crate::test(tarantool = "crate")]
......@@ -230,7 +258,7 @@ mod tests {
static mut TX_THREAD_ID: Option<ThreadId> = None;
static mut SENDER_THREAD_ID: Option<ThreadId> = None;
let mut cbus_fiber = run_cbus_endpoint("cbus_send_message_test");
let cbus_fiber_id = run_cbus_endpoint("cbus_send_message_test");
struct CondPtr(*const Cond);
unsafe impl Send for CondPtr {}
......@@ -240,7 +268,7 @@ mod tests {
let thread = thread::spawn(move || {
unsafe { SENDER_THREAD_ID = Some(thread::current().id()) };
let pipe = cbus::LCPipe::new("cbus_send_message_test");
let mut pipe = cbus::LCPipe::new("cbus_send_message_test");
let msg = Message::new(move || {
unsafe { TX_THREAD_ID = Some(thread::current().id()) };
let cond = unsafe { cond_ptr.0.as_ref().unwrap() };
......@@ -258,6 +286,6 @@ mod tests {
}
thread.join().unwrap();
cbus_fiber.cancel();
assert!(fiber::cancel(cbus_fiber_id));
}
}
use super::{LCPipe, Message};
use super::{LCPipe, Message, UnsafeCond};
use crate::cbus::RecvError;
use crate::fiber::Cond;
use std::cell::UnsafeCell;
use std::cell::{RefCell, UnsafeCell};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use std::sync::{Arc, Mutex, Weak};
/// A oneshot channel based on tarantool cbus. This a channel between any arbitrary thread and a cord.
/// Cord - a thread with `libev` event loop inside (typically tx thread).
......@@ -12,7 +12,7 @@ struct Channel<T> {
/// Condition variable for synchronize consumer (cord) and producer,
/// using an [`Arc`] instead of raw pointer cause there is a situation
/// when channel dropped before cbus endpoint receive a cond
cond: Arc<Cond>,
cond: Arc<UnsafeCond>,
/// Atomic flag, signaled that sender already have a data for receiver
ready: AtomicBool,
}
......@@ -27,7 +27,7 @@ impl<T> Channel<T> {
Self {
message: UnsafeCell::new(None),
ready: AtomicBool::new(false),
cond: Arc::new(Cond::new()),
cond: Arc::new(UnsafeCond(Cond::new())),
}
}
}
......@@ -39,65 +39,57 @@ impl<T> Channel<T> {
/// It is safe to drop sender when [`EndpointReceiver::receive`] is not calling.
pub struct Sender<T> {
channel: Weak<Channel<T>>,
pipe: Arc<LCPipe>,
pipe: RefCell<LCPipe>,
/// This mutex used for create a critical that guards an invariant - when sender upgrade
/// `Weak<Channel<T>>` reference there is two `Arc<Channel<T>>` in the same moment of time (in
/// this case `Cond` in `Channel<T>` always dropped at receiver side) or
/// `Weak<Channel<T>>::upgrade` returns `None`. Compliance with this invariant guarantees that
/// the `Cond` always dropped at receiver (TX thread) side.
arc_guard: Arc<Mutex<()>>,
}
unsafe impl<T> Send for Sender<T> {}
unsafe impl<T> Sync for Sender<T> {}
/// Receiver part of oneshot channel. Must be used in cord context.
pub struct EndpointReceiver<T> {
channel: Arc<Channel<T>>,
channel: Option<Arc<Channel<T>>>,
arc_guard: Arc<Mutex<()>>,
}
/// Creates a new oneshot channel, returning the sender/receiver halves with already created [`LCPipe`] instance.
/// This method is useful if you want to avoid any memory allocations.
/// Typically better use a [`channel`] method that create a new lcpipe instance,
/// lcpipe is pretty small structure so overhead is not big.
/// Creates a new oneshot channel, returning the sender/receiver halves. Please note that the receiver should only be used inside the cord.
///
/// # Arguments
///
/// * `pipe`: lcpipe - a cbus communication channel
/// * `cbus_endpoint`: cbus endpoint name. Note that the tx thread (or any other cord)
/// must have a fiber occupied by the endpoint cbus_loop.
///
/// # Examples
///
/// ```no_run
/// #[cfg(feature = "picodata")] {
/// use std::sync::Arc;
/// use tarantool::cbus::oneshot;
/// use tarantool::cbus::LCPipe;
///
/// let pipe = LCPipe::new("some_endpoint");
/// let (sender, receiver) = oneshot::channel_on_pipe::<u8>(Arc::new(pipe));
/// let (sender, receiver) = oneshot::channel::<u8>("some_endpoint");
/// }
/// ```
pub fn channel_on_pipe<T>(pipe: Arc<LCPipe>) -> (Sender<T>, EndpointReceiver<T>) {
pub fn channel<T>(cbus_endpoint: &str) -> (Sender<T>, EndpointReceiver<T>) {
let channel = Arc::new(Channel::new());
let arc_guard = Arc::new(Mutex::default());
(
Sender {
channel: Arc::downgrade(&channel),
pipe,
pipe: RefCell::new(LCPipe::new(cbus_endpoint)),
arc_guard: Arc::clone(&arc_guard),
},
EndpointReceiver {
channel: Some(channel),
arc_guard,
},
EndpointReceiver { channel },
)
}
/// Creates a new oneshot channel, returning the sender/receiver halves. Please note that the receiver should only be used inside the cord.
///
/// # Arguments
///
/// * `cbus_endpoint`: cbus endpoint name. Note that the tx thread (or any other cord)
/// must have a fiber occupied by the endpoint cbus_loop.
///
/// # Examples
///
/// ```no_run
/// #[cfg(feature = "picodata")] {
/// use tarantool::cbus::oneshot;
/// let (sender, receiver) = oneshot::channel::<u8>("some_endpoint");
/// }
/// ```
pub fn channel<T>(cbus_endpoint: &str) -> (Sender<T>, EndpointReceiver<T>) {
channel_on_pipe(Arc::new(LCPipe::new(cbus_endpoint)))
}
impl<T> Sender<T> {
/// Attempts to send a value on this channel.
///
......@@ -105,6 +97,10 @@ impl<T> Sender<T> {
///
/// * `message`: message to send
pub fn send(self, message: T) {
// We assume that this lock has a minimal impact on performance, in most of situations
// lock of mutex will take the fast path.
let _crit_sect = self.arc_guard.lock().unwrap();
if let Some(chan) = self.channel.upgrade() {
unsafe { *chan.message.get() = Some(message) };
chan.ready.store(true, Ordering::Release);
......@@ -117,6 +113,10 @@ impl<T> Sender<T> {
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
// We assume that this lock has a minimal impact on performance, in most of situations
// lock of mutex will take the fast path.
let _crit_sect = self.arc_guard.lock().unwrap();
let mb_chan = self.channel.upgrade();
let mb_cond = mb_chan.map(|chan| chan.cond.clone());
// at this point we are sure that there is at most one reference to a [`Channel`] - in receiver side,
......@@ -127,9 +127,11 @@ impl<T> Drop for Sender<T> {
// sender drop, because `cond` moved in callback argument of [`cbus::Message`] and decrement
// when message is handling
let msg = Message::new(move || {
cond.signal();
// SAFETY: it is ok to call as_ref() here because this callback will be invoked
// on the thread that created the channel with this cond
unsafe { (*cond).as_ref().signal() };
});
self.pipe.push_message(msg);
self.pipe.borrow_mut().push_message(msg);
}
}
}
......@@ -138,14 +140,23 @@ impl<T> EndpointReceiver<T> {
/// Attempts to wait for a value on this receiver, returns a [`RecvError`]
/// if the corresponding channel has hung up (sender was dropped).
pub fn receive(self) -> Result<T, RecvError> {
if !self.channel.ready.swap(false, Ordering::Acquire) {
let channel = self
.channel
.as_ref()
.expect("unreachable: channel must exists");
if !channel.ready.swap(false, Ordering::Acquire) {
// assume that situation when [`crate::fiber::Cond::signal()`] called before
// [`crate::fiber::Cond::wait()`] and after swap `ready` to false is never been happen,
// cause signal and wait both calling in tx thread (or any other cord) and there is now yields between it
self.channel.cond.wait();
// SAFETY: it is ok to call wait() here because we're on original thread that created the cond
unsafe {
(*channel.cond).as_ref().wait();
}
}
unsafe {
self.channel
channel
.message
.get()
.as_mut()
......@@ -156,6 +167,13 @@ impl<T> EndpointReceiver<T> {
}
}
impl<T> Drop for EndpointReceiver<T> {
fn drop(&mut self) {
let _crit_sect = self.arc_guard.lock().unwrap();
drop(self.channel.take());
}
}
impl<T> Default for Channel<T> {
fn default() -> Self {
Self::new()
......@@ -165,16 +183,15 @@ impl<T> Default for Channel<T> {
#[cfg(feature = "internal_test")]
mod tests {
use super::super::tests::run_cbus_endpoint;
use crate::cbus;
use crate::cbus::{oneshot, RecvError};
use crate::fiber;
use crate::fiber::{check_yield, YieldResult};
use std::sync::Arc;
use std::time::Duration;
use std::{mem, thread};
#[crate::test(tarantool = "crate")]
pub fn oneshot_test() {
let mut cbus_fiber = run_cbus_endpoint("oneshot_test");
let cbus_fiber_id = run_cbus_endpoint("oneshot_test");
let (sender, receiver) = oneshot::channel("oneshot_test");
let thread = thread::spawn(move || {
......@@ -199,18 +216,15 @@ mod tests {
YieldResult::DidntYield(2)
);
cbus_fiber.cancel();
assert!(fiber::cancel(cbus_fiber_id));
}
#[crate::test(tarantool = "crate")]
pub fn oneshot_multiple_channels_test() {
let mut cbus_fiber = run_cbus_endpoint("oneshot_multiple_channels_test");
let pipe = cbus::LCPipe::new("oneshot_multiple_channels_test");
let pipe = Arc::new(pipe);
let cbus_fiber_id = run_cbus_endpoint("oneshot_multiple_channels_test");
let (sender1, receiver1) = oneshot::channel_on_pipe(Arc::clone(&pipe));
let (sender2, receiver2) = oneshot::channel_on_pipe(Arc::clone(&pipe));
let (sender1, receiver1) = oneshot::channel("oneshot_multiple_channels_test");
let (sender2, receiver2) = oneshot::channel("oneshot_multiple_channels_test");
let thread1 = thread::spawn(move || {
thread::sleep(Duration::from_secs(1));
......@@ -230,12 +244,12 @@ mod tests {
thread1.join().unwrap();
thread2.join().unwrap();
cbus_fiber.cancel();
assert!(fiber::cancel(cbus_fiber_id));
}
#[crate::test(tarantool = "crate")]
pub fn oneshot_sender_drop_test() {
let mut cbus_fiber = run_cbus_endpoint("oneshot_sender_drop_test");
let cbus_fiber_id = run_cbus_endpoint("oneshot_sender_drop_test");
let (sender, receiver) = oneshot::channel::<()>("oneshot_sender_drop_test");
......@@ -248,6 +262,6 @@ mod tests {
assert!(matches!(result, Err(RecvError::Disconnected)));
thread.join().unwrap();
cbus_fiber.cancel();
assert!(fiber::cancel(cbus_fiber_id));
}
}
#![cfg(any(feature = "picodata", doc))]

//! Synchronous channels for popular runtimes.
//! A synchronous channel has an internal buffer with a user-defined capacity.
//! It differs from the unbounded channel in the semantics of the sender: if the
//! channel buffer is full, every send called by a producer blocks the runtime
//! until space in the buffer is freed.
//!
//! It is important to use a channel that suits the runtime in which the producer works.

/// Channels for messaging between an OS thread (producer) and a tarantool cord (consumer).
pub mod std;
/// Channels for messaging between a tokio task (producer) and a tarantool cord (consumer).
pub mod tokio;
use crate::cbus::{LCPipe, RecvError, SendError};
use crate::fiber::Cond;
use std::cell::RefCell;
use std::num::NonZeroUsize;
use std::sync;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Condvar as StdCondvar;
use std::sync::{Arc, Mutex, Weak};
type CordWaker = crate::cbus::unbounded::Waker;
/// A synchronization component between producers (an OS thread) and a consumer (a cord).
/// The responsibility of this component is to wake up a producer when it's blocked because
/// the channel's internal buffer is full.
struct ThreadWaker {
    /// Set to `true` once any wakeup has been delivered. Note that the flag is
    /// never reset, so after the first wakeup the waker stays permanently open
    /// and subsequent [`ThreadWaker::wait`] calls return immediately.
    lock: Mutex<bool>,
    cond: StdCondvar,
}

impl ThreadWaker {
    /// Create a waker in the "not yet woken" state.
    fn new() -> Self {
        Self {
            lock: Mutex::new(false),
            cond: StdCondvar::new(),
        }
    }

    /// Block until the waker is woken up.
    /// In the context of sync channels, returning from this function means that there's
    /// some free space in the message buffer, or the receiver is disconnected.
    fn wait(&self, disconnected: &AtomicBool) {
        let mut woken = self
            .lock
            .lock()
            .expect("unexpected panic in consumer thread");
        // Short-circuit: the receiver is already gone, nothing to wait for.
        if disconnected.load(Ordering::Acquire) {
            return;
        }
        // Loop guards against spurious wakeups of the condvar.
        while !*woken {
            woken = self
                .cond
                .wait(woken)
                .expect("unexpected panic in consumer thread");
        }
    }

    /// Deliver a wakeup signal to a single [`ThreadWaker::wait`] caller.
    fn wakeup_one(&self) {
        let mut woken = self
            .lock
            .lock()
            .expect("unexpected panic in producer thread");
        *woken = true;
        self.cond.notify_one();
    }

    /// Deliver a wakeup signal to all [`ThreadWaker::wait`] callers.
    fn wakeup_all(&self) {
        let mut woken = self
            .lock
            .lock()
            .expect("unexpected panic in producer thread");
        *woken = true;
        self.cond.notify_all();
    }
}
/// A synchronous mpsc channel based on tarantool cbus.
struct Channel<T> {
    /// Fixed-capacity lock-free message buffer.
    list: crossbeam_queue::ArrayQueue<T>,
    /// Set once either side disconnects.
    disconnected: AtomicBool,
    /// Name of the cbus endpoint the receiver's cord listens on.
    cbus_endpoint: String,
}

impl<T> Channel<T> {
    /// Create a new channel.
    ///
    /// # Arguments
    ///
    /// * `cbus_endpoint`: cbus endpoint name.
    /// * `cap`: specifies the buffer size.
    fn new(cbus_endpoint: &str, cap: NonZeroUsize) -> Self {
        let buffer = crossbeam_queue::ArrayQueue::new(cap.get());
        Self {
            list: buffer,
            disconnected: AtomicBool::new(false),
            cbus_endpoint: cbus_endpoint.to_owned(),
        }
    }
}
/// Creates a new synchronous channel, returning the sender/receiver halves.
/// Please note that the receiver should only be used inside the cord.
///
/// Like asynchronous [`channel`]s, the [`EndpointReceiver`] will block until a message becomes
/// available. Synchronous channel differs greatly in the semantics of the sender, however.
///
/// This channel has an internal buffer on which messages will be queued.
/// `cap` specifies the buffer size. When the internal buffer becomes full,
/// future sends will *block* waiting for the buffer to open up.
///
/// # Arguments
///
/// * `cbus_endpoint`: cbus endpoint name. Note that the tx thread (or any other cord)
/// must have a fiber occupied by the endpoint cbus_loop.
/// * `cap`: specifies the buffer size.
///
/// # Examples
///
/// ```no_run
/// #[cfg(feature = "picodata")] {
/// use tarantool::cbus::sync::std::channel;
/// use std::num::NonZeroUsize;
/// let (sender, receiver) = channel::<u8>("some_endpoint", NonZeroUsize::new(100).unwrap());
/// }
/// ```
pub fn channel<T>(cbus_endpoint: &str, cap: NonZeroUsize) -> (Sender<T>, EndpointReceiver<T>) {
    let channel = Arc::new(Channel::new(cbus_endpoint, cap));
    let cord_waker = Arc::new(CordWaker::new(Cond::new()));
    let thread_waker = Arc::new(ThreadWaker::new());
    let arc_guard = Arc::new(sync::Mutex::default());

    // The sender holds only a weak reference to the cord waker, so the waker
    // (and its `Cond`) is always destroyed on the receiver's side.
    let sender = Sender {
        inner: Arc::new(SenderInner {
            chan: Arc::clone(&channel),
        }),
        cord_waker: Arc::downgrade(&cord_waker),
        thread_waker: Arc::clone(&thread_waker),
        lcpipe: RefCell::new(LCPipe::new(&channel.cbus_endpoint)),
        arc_guard: Arc::clone(&arc_guard),
    };
    let receiver = EndpointReceiver {
        chan: channel,
        cord_waker: Some(cord_waker),
        thread_waker,
        arc_guard,
    };
    (sender, receiver)
}
/// The part of the sender state shared by all of its clones; when the last
/// clone is dropped, this is dropped too and the channel becomes disconnected.
struct SenderInner<T> {
    chan: Arc<Channel<T>>,
}

// NOTE(review): this asserts `Send` for any `T`, even a non-`Send` one, since
// messages of type `T` cross thread boundaries through the channel — confirm
// that callers cannot smuggle a non-`Send` `T` unsoundly.
unsafe impl<T> Send for SenderInner<T> {}

impl<T> Drop for SenderInner<T> {
    fn drop(&mut self) {
        // Runs exactly once, when the last `Sender` clone is gone: mark the
        // channel disconnected so the receiver stops waiting for messages.
        self.chan.disconnected.store(true, Ordering::Release);
    }
}
/// A sending-half of a channel. Can be used in OS thread context (because `send` may block tarantool
/// or tokio runtime).
/// Messages can be sent through this channel with [`Sender::send`].
/// Clone the sender if you need one more producer.
pub struct Sender<T> {
    /// The "singleton" part of the sender: when it drops, all senders are gone and
    /// the receiver must return [`RecvError::Disconnected`] on `recv`.
    inner: Arc<SenderInner<T>>,
    /// Synchronizes receiver and producers (wakeup messages flow from producer to receiver).
    /// A weak reference is used here because dropping the `Waker` outside of the cord
    /// thread leads to a segfault.
    cord_waker: Weak<CordWaker>,
    /// Synchronizes receiver and producers (wakeup messages flow from receiver to producer).
    thread_waker: Arc<ThreadWaker>,
    /// An LCPipe instance, unique for each sender.
    lcpipe: RefCell<LCPipe>,
    /// This mutex is used to create a critical section that guards an invariant: while a
    /// sender upgrades its `Weak<Waker>` reference, either two `Arc<Waker>` exist at the
    /// same moment of time (in this case the `Waker` is always dropped at the receiver
    /// side) or `Weak<Waker>::upgrade` returns `None`. Compliance with this invariant
    /// guarantees that the `Cond` is always dropped at the receiver (TX thread) side.
    arc_guard: Arc<sync::Mutex<()>>,
}

// NOTE(review): these assert `Send`/`Sync` for any `T`; messages of type `T`
// intentionally cross thread boundaries here — confirm soundness for non-`Send` `T`.
unsafe impl<T> Send for Sender<T> {}
unsafe impl<T> Sync for Sender<T> {}
impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        // Enter the critical section first: this makes the `upgrade` below
        // mutually exclusive with the receiver dropping its waker. We assume
        // the lock is cheap - in most situations it takes the fast path.
        let _crit_section = self.arc_guard.lock().unwrap();
        // Notify the receiver (if it is still alive) that one producer is gone.
        match self.cord_waker.upgrade() {
            Some(cord_waker) => cord_waker.wakeup(&mut self.lcpipe.borrow_mut()),
            None => {}
        }
    }
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
cord_waker: self.cord_waker.clone(),
thread_waker: self.thread_waker.clone(),
lcpipe: RefCell::new(LCPipe::new(&self.inner.chan.cbus_endpoint)),
arc_guard: self.arc_guard.clone(),
}
}
}
impl<T> Sender<T> {
    /// Attempts to send a value on this channel, returning it back if it could
    /// not be sent (in case when receiver half is closed). If channel buffer is full then
    /// current thread sleep until it's freed.
    ///
    /// Note that a return value of [`Err`] means that the data will never be
    /// received, but a return value of [`Ok`] does *not* mean that the data
    /// will be received. It is possible for the corresponding receiver to
    /// hang up immediately after this function returns [`Ok`].
    ///
    /// # Arguments
    ///
    /// * `message`: message to send
    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
        let mut msg = msg;
        // We assume that this lock has a minimal impact on performance, in most of situations
        // lock of mutex will take the fast path. Holding it makes the `upgrade` below
        // mutually exclusive with the receiver dropping its waker (see `arc_guard` docs).
        let _crit_section = self.arc_guard.lock().unwrap();

        // If the waker cannot be upgraded, the receiver has been dropped - report disconnect.
        if let Some(waker) = self.cord_waker.upgrade() {
            loop {
                let push_result = self.inner.chan.list.push(msg);
                if let Err(not_accepted_msg) = push_result {
                    // Buffer is full: block this OS thread until the receiver frees a
                    // slot or disconnects. `push` returned ownership of the message.
                    self.thread_waker.wait(&self.inner.chan.disconnected);
                    if self.inner.chan.disconnected.load(Ordering::Acquire) {
                        // Receiver disconnected while we were blocked - give the message back.
                        return Err(SendError(not_accepted_msg));
                    }
                    // Retry the push with the message rejected by the full queue.
                    msg = not_accepted_msg;
                } else {
                    break;
                }
            }
            // Wake up a possibly sleeping receiver through cbus.
            waker.wakeup(&mut self.lcpipe.borrow_mut());
            Ok(())
        } else {
            Err(SendError(msg))
        }
    }
}
/// Receiver part of synchronous channel. Must be used in cord context.
pub struct EndpointReceiver<T> {
    /// Shared channel state: message buffer, disconnect flag and endpoint name.
    chan: Arc<Channel<T>>,
    /// Waker the receiver blocks on while the buffer is empty; wrapped in `Option`
    /// so `Drop` can destroy it inside the critical section on the cord side.
    cord_waker: Option<Arc<CordWaker>>,
    /// Used to wake producers blocked on a full buffer.
    thread_waker: Arc<ThreadWaker>,
    /// Guards dropping `cord_waker` against a concurrent `Weak::upgrade` in senders
    /// (see the `arc_guard` docs on [`Sender`]).
    arc_guard: Arc<Mutex<()>>,
}

// The receiver part can be sent from place to place, so long as it
// is not used to receive non-sendable things.
unsafe impl<T> Send for EndpointReceiver<T> {}
impl<T> Drop for EndpointReceiver<T> {
    fn drop(&mut self) {
        // Mark the channel as disconnected first, so producers that wake up below
        // observe the flag and bail out of `send`.
        self.chan.disconnected.store(true, Ordering::Release);
        // Wake *all* blocked producers BEFORE entering the critical section: a
        // producer blocked inside `Sender::send` holds `arc_guard` while it waits,
        // so taking the lock first would deadlock.
        self.thread_waker.wakeup_all();
        // Critical section: no sender can be upgrading its weak waker reference
        // while we drop the last `Arc<CordWaker>` here, which guarantees the waker
        // (and its `Cond`) is destroyed on the cord (TX thread) side.
        let _crit_section = self.arc_guard.lock().unwrap();
        drop(self.cord_waker.take());
    }
}
impl<T> EndpointReceiver<T> {
    /// Attempts to wait for a value on this receiver, returns a [`RecvError::Disconnected`]
    /// when all of producers are dropped and the channel buffer is drained.
    pub fn receive(&self) -> Result<T, RecvError> {
        loop {
            if let Some(msg) = self.chan.list.pop() {
                // A buffer slot was just freed - let one blocked producer continue.
                self.thread_waker.wakeup_one();
                return Ok(msg);
            }

            if self.chan.disconnected.load(Ordering::Acquire) {
                // All senders are gone, but the last sender may have pushed a message
                // between our failed `pop` above and this check (a sender always pushes
                // before `SenderInner::drop` sets the flag). Drain the buffer once more
                // so that no in-flight message is lost; no producer wakeup is needed
                // here since every sender has already disconnected.
                return self.chan.list.pop().ok_or(RecvError::Disconnected);
            }

            // Buffer is empty and senders are alive: park the fiber until a
            // producer signals through cbus.
            self.cord_waker
                .as_ref()
                .expect("unreachable: waker must exists")
                .wait();
        }
    }

    /// Return message count in receiver buffer.
    pub fn len(&self) -> usize {
        self.chan.list.len()
    }

    /// Return true if receiver message buffer is empty.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
#[cfg(feature = "internal_test")]
#[allow(clippy::redundant_pattern_matching)]
mod tests {
    use crate::cbus::sync;
    use crate::cbus::tests::run_cbus_endpoint;
    use crate::cbus::RecvError;
    use crate::fiber;
    use crate::fiber::{check_yield, YieldResult};
    use std::num::NonZeroUsize;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::thread;
    use std::thread::JoinHandle;
    use std::time::Duration;

    // A single OS-thread producer; the receiver must yield (block the fiber,
    // not the whole cord) while waiting and still get every message in order.
    #[crate::test(tarantool = "crate")]
    pub fn single_producer() {
        let cbus_fiber_id = run_cbus_endpoint("std_single_producer");

        let cap = NonZeroUsize::new(10).unwrap();
        let (tx, rx) = sync::std::channel("std_single_producer", cap);

        let thread = thread::spawn(move || {
            for i in 0..1000 {
                _ = tx.send(i);
                // Periodic sleeps force the receiver to actually wait sometimes.
                if i % 100 == 0 {
                    thread::sleep(Duration::from_millis(100));
                }
            }
        });

        assert_eq!(
            check_yield(|| {
                let mut recv_results = vec![];
                for _ in 0..1000 {
                    recv_results.push(rx.receive().unwrap());
                }
                recv_results
            }),
            YieldResult::Yielded((0..1000).collect::<Vec<_>>())
        );

        thread.join().unwrap();
        assert!(fiber::cancel(cbus_fiber_id));
    }

    // The producer must block once the buffer (cap = 10) is full: the send
    // counter may only advance in steps of 10, paced by the receiver draining.
    #[crate::test(tarantool = "crate")]
    pub fn single_producer_lock() {
        let cbus_fiber_id = run_cbus_endpoint("std_single_producer_lock");

        static SEND_COUNTER: AtomicU64 = AtomicU64::new(0);

        let cap = NonZeroUsize::new(10).unwrap();
        let (tx, rx) = sync::std::channel("std_single_producer_lock", cap);

        let thread = thread::spawn(move || {
            for i in 0..100 {
                _ = tx.send(i);
                SEND_COUNTER.fetch_add(1, Ordering::SeqCst);
            }
        });

        fiber::sleep(Duration::from_millis(100));

        let mut recv_results = vec![];
        for i in 0..10 {
            // assert that sender write 10 messages on each iteration and sleep
            assert_eq!(SEND_COUNTER.load(Ordering::SeqCst), (i + 1) * 10);
            for _ in 0..10 {
                recv_results.push(rx.receive().unwrap());
            }
            fiber::sleep(Duration::from_millis(100));
        }

        assert_eq!((0..100).collect::<Vec<_>>(), recv_results);

        thread.join().unwrap();
        assert!(fiber::cancel(cbus_fiber_id));
    }

    #[crate::test(tarantool = "crate")]
    pub fn drop_rx_before_tx() {
        // This test check that there is no memory corruption if sender part of channel drops after
        // receiver part. Previously, when the receiver was drop after sender, [`Fiber::Cond`] release outside the tx thread
        // and segfault is occurred.
        let cbus_fiber_id = run_cbus_endpoint("std_drop_rx_before_tx");

        let cap = NonZeroUsize::new(1000).unwrap();
        let (tx, rx) = sync::std::channel("std_drop_rx_before_tx", cap);

        let thread = thread::spawn(move || {
            for i in 1..300 {
                _ = tx.send(i);
                if i % 100 == 0 {
                    thread::sleep(Duration::from_secs(1));
                }
            }
        });

        // Drop the receiver while the sender thread is still producing.
        fiber::sleep(Duration::from_secs(1));
        drop(rx);

        thread.join().unwrap();
        assert!(fiber::cancel(cbus_fiber_id));
    }

    // After all senders drop, buffered messages are still delivered, and only
    // then `receive` reports `Disconnected`.
    #[crate::test(tarantool = "crate")]
    pub fn tx_disconnect() {
        let cbus_fiber_id = run_cbus_endpoint("std_tx_disconnect");

        let cap = NonZeroUsize::new(1).unwrap();
        let (tx, rx) = sync::std::channel("std_tx_disconnect", cap);

        let thread = thread::spawn(move || {
            _ = tx.send(1);
            _ = tx.send(2);
        });

        assert!(matches!(rx.receive(), Ok(1)));
        assert!(matches!(rx.receive(), Ok(2)));
        assert!(matches!(rx.receive(), Err(RecvError::Disconnected)));

        thread.join().unwrap();
        assert!(fiber::cancel(cbus_fiber_id));
    }

    // After the receiver drops, a blocked/subsequent `send` must fail instead
    // of hanging forever.
    #[crate::test(tarantool = "crate")]
    pub fn rx_disconnect() {
        let cbus_fiber_id = run_cbus_endpoint("std_rx_disconnect");

        let cap = NonZeroUsize::new(1).unwrap();
        let (tx, rx) = sync::std::channel("std_rx_disconnect", cap);

        let thread = thread::spawn(move || {
            assert!(tx.send(1).is_ok());
            thread::sleep(Duration::from_millis(100));
            // at this point receiver must be dropped and send return an error
            assert!(tx.send(2).is_err());
        });

        assert!(matches!(rx.receive(), Ok(1)));
        drop(rx);

        thread.join().unwrap();
        assert!(fiber::cancel(cbus_fiber_id));
    }

    // Three cloned senders on separate OS threads; the receiver must observe
    // exactly 3 * MESSAGES_PER_PRODUCER messages, then `Disconnected`.
    #[crate::test(tarantool = "crate")]
    pub fn multiple_producer() {
        const MESSAGES_PER_PRODUCER: i32 = 10_000;

        let cbus_fiber_id = run_cbus_endpoint("std_multiple_producer");
        let cap = NonZeroUsize::new(10).unwrap();
        let (tx, rx) = sync::std::channel("std_multiple_producer", cap);

        // Helper: spawn an OS thread that pushes MESSAGES_PER_PRODUCER messages.
        fn create_producer(sender: sync::std::Sender<i32>) -> JoinHandle<()> {
            thread::spawn(move || {
                for i in 0..MESSAGES_PER_PRODUCER {
                    _ = sender.send(i);
                }
            })
        }

        let jh1 = create_producer(tx.clone());
        let jh2 = create_producer(tx.clone());
        let jh3 = create_producer(tx);

        for _ in 0..MESSAGES_PER_PRODUCER * 3 {
            assert!(matches!(rx.receive(), Ok(_)));
        }
        assert!(matches!(rx.receive(), Err(RecvError::Disconnected)));

        jh1.join().unwrap();
        jh2.join().unwrap();
        jh3.join().unwrap();
        assert!(fiber::cancel(cbus_fiber_id));
    }

    // Multiple producers must collectively block on the shared cap-10 buffer:
    // the global send counter advances in steps of 10 paced by the receiver.
    #[crate::test(tarantool = "crate")]
    pub fn multiple_producer_lock() {
        const MESSAGES_PER_PRODUCER: i32 = 100;

        let cbus_fiber_id = run_cbus_endpoint("std_multiple_producer_lock");
        let cap = NonZeroUsize::new(10).unwrap();
        let (tx, rx) = sync::std::channel("std_multiple_producer_lock", cap);
        static SEND_COUNTER: AtomicU64 = AtomicU64::new(0);

        // Helper: spawn an OS thread that pushes messages and bumps the counter.
        fn create_producer(sender: sync::std::Sender<i32>) -> JoinHandle<()> {
            thread::spawn(move || {
                for i in 0..MESSAGES_PER_PRODUCER {
                    _ = sender.send(i);
                    SEND_COUNTER.fetch_add(1, Ordering::SeqCst);
                }
            })
        }

        let jh1 = create_producer(tx.clone());
        let jh2 = create_producer(tx.clone());
        let jh3 = create_producer(tx);

        fiber::sleep(Duration::from_millis(100));
        for i in 0..10 * 3 {
            // assert that all threads produce 10 messages and sleep after
            assert_eq!(SEND_COUNTER.load(Ordering::SeqCst), (i + 1) * 10);
            for _ in 0..10 {
                assert!(matches!(rx.receive(), Ok(_)));
            }
            fiber::sleep(Duration::from_millis(100));
        }
        assert!(matches!(rx.receive(), Err(RecvError::Disconnected)));

        jh1.join().unwrap();
        jh2.join().unwrap();
        jh3.join().unwrap();
        assert!(fiber::cancel(cbus_fiber_id));
    }
}