From ae55e9f788243f2e899210970ca02d881264b45f Mon Sep 17 00:00:00 2001
From: Valentin Syrovatskiy <v.syrovatskiy@picodata.io>
Date: Mon, 26 Sep 2022 02:38:32 +0300
Subject: [PATCH] test: benchmark raft nop

---
 src/main.rs                   |   6 ++
 test/conftest.py              |   2 +-
 test/manual/test_benchmark.py | 166 +++++++++++++++++++++++++++++++++-
 3 files changed, 172 insertions(+), 2 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 98d2018123..59d7240fda 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -73,6 +73,12 @@ fn picolib_setup(args: &args::Run) {
             traft::node::global()?.wait_for_read_state(Duration::from_secs_f64(timeout))
         }),
     );
+    luamod.set(
+        "raft_propose_nop",
+        tlua::function0(|| {
+            traft::node::global()?.propose_and_wait(traft::Op::Nop, Duration::from_secs(1))
+        }),
+    );
     luamod.set(
         "raft_propose_info",
         tlua::function1(|x: String| -> Result<(), Error> {
diff --git a/test/conftest.py b/test/conftest.py
index b4cfc6122d..9821f8aef1 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -390,7 +390,7 @@ class Instance:
         status = self._raft_status()
         assert status.is_ready
         self.raft_id = status.id
-        with self.connect(timeout=1) as conn:
+        with self.connect(timeout=2) as conn:
             self.instance_id = conn.space("raft_state").select(("instance_id",))[0][1]
         eprint(f"{self} is ready")
 
diff --git a/test/manual/test_benchmark.py b/test/manual/test_benchmark.py
index caf7174fda..dd1f068870 100644
--- a/test/manual/test_benchmark.py
+++ b/test/manual/test_benchmark.py
@@ -1,5 +1,7 @@
-from conftest import Cluster
+from itertools import count
+from conftest import Cluster, Instance, pid_alive, retrying
 from prettytable import PrettyTable  # type: ignore
+import funcy  # type: ignore
 
 
 def test_benchmark_replace(cluster: Cluster, capsys):
@@ -69,3 +71,165 @@ def test_benchmark_replace(cluster: Cluster, capsys):
     with capsys.disabled():
         print("")
         print(report(benchmark()))
+
+
+def test_benchmark_nop(binary_path, xdist_worker_number, tmpdir, capsys):
+    SIZES = [1, 2, 3, 5, 10, 20, 30, 40, 50]
+    FIBERS = [1, 5, 10, 15, 20, 25]
+
+    data_dir = tmpdir
+
+    def new_cluster():
+        n = xdist_worker_number
+        assert isinstance(n, int)
+        assert n >= 0
+
+        # Provide each worker a dedicated pool of 200 listening ports
+        base_port = 3300 + n * 200
+        max_port = base_port + 199
+        assert max_port <= 65535
+
+        cluster_ids = (f"cluster-{xdist_worker_number}-{i}" for i in count())
+
+        cluster = Cluster(
+            binary_path=binary_path,
+            id=next(cluster_ids),
+            data_dir=data_dir,
+            base_host="127.0.0.1",
+            base_port=base_port,
+            max_port=max_port,
+        )
+        return cluster
+
+    def expand_cluster(cluster: Cluster, size: int):
+        c = 0
+        while len(cluster.instances) < size:
+            cluster.add_instance(wait_ready=False).start()
+            c += 1
+            if c % 3 == 0:
+                wait_longer(cluster)
+        wait_longer(cluster)
+        return cluster
+
+    def make_cluster(size: int):
+        cluster = new_cluster()
+        expand_cluster(cluster, size)
+        return cluster
+
+    def init(i: Instance):
+        init_code = """
+        clock=require('clock')
+        fiber=require('fiber')
+        function f(n)
+            for i=1, n do
+                picolib.raft_propose_nop()
+            end
+        end
+        function benchmark_nop(n, c)
+            local fibers = {};
+            local t1 = clock.monotonic();
+            for i=1, c do
+                fibers[i] = fiber.new(f, n)
+                fibers[i]:set_joinable(true)
+            end
+            for i=1, c do
+                fibers[i]:join()
+            end
+            local t2 = clock.monotonic();
+            return {c=c, nc = n*c, time = t2-t1, rps = n*c/(t2-t1)}
+        end
+        """
+        i.eval(init_code)
+
+    def find_leader(cluster: Cluster):
+        for i in cluster.instances:
+            if is_leader(i):
+                return i
+        raise Exception("Leader not found")
+
+    def is_leader(i: Instance):
+        return state(i)["raft_state"] == "Leader"
+
+    def state(i: Instance):
+        return i.eval("return picolib.raft_status()")
+
+    def instance_to_pid(i: Instance):
+        assert i.process
+        return i.process.pid
+
+    def benchmark():
+        stats = []
+        sizes = list(set(SIZES))
+        sizes.sort()
+        for s in sizes:
+            cluster = make_cluster(s)
+            pids = list(map(instance_to_pid, cluster.instances))
+            i = find_leader(cluster)
+            init(i)
+
+            for c in FIBERS:
+                stat = [i.eval("return benchmark_nop(...)", 10, c, timeout=60)]
+                stats.append((s, c, stat))
+
+            cluster.kill()
+            cluster.remove_data()
+            retrying(lambda: assert_all_pids_down(pids))
+        return stats
+
+    def report_vertical(stats):
+        t = PrettyTable()
+        t.field_names = ["cluster size", "fibers", "rps"]
+        t.align = "r"
+        for (s, c, stat) in stats:
+            t.add_row([s, c, int(avg(rps(stat)))])
+        return t
+
+    def report_table(stats):
+        def index_exist(lis: list, i: int):
+            try:
+                lis[i]
+            except IndexError:
+                return False
+            return True
+
+        data = []
+        for (s, c, stat) in stats:
+            size_index = SIZES.index(s)
+            fiber_index = FIBERS.index(c)
+            val = int(avg(rps(stat)))
+            if not index_exist(data, size_index):
+                data.insert(size_index, [])
+            if not index_exist(data[size_index], fiber_index):
+                data[size_index].insert(fiber_index, val)
+
+        t = PrettyTable()
+        t.title = "Nop/s"
+        t.field_names = ["cluster size \\ fibers", *FIBERS]
+        t.align = "r"
+
+        index = 0
+        for d in data:
+            row = [SIZES[index], *d]
+            t.add_row(row)
+            index += 1
+
+        return t
+
+    def rps(stat):
+        return list(map(lambda s: s["rps"], stat))
+
+    def avg(x):
+        return sum(x) / len(x)
+
+    def assert_all_pids_down(pids):
+        assert all(map(lambda pid: not pid_alive(pid), pids))
+
+    with capsys.disabled():
+        stats = benchmark()
+        print(report_table(stats))
+
+
+@funcy.retry(tries=30, timeout=10)
+def wait_longer(cluster):
+    for instance in cluster.instances:
+        instance.wait_ready()
-- 
GitLab