From 08cac50b895c1b70b240b582def116f35c48a01d Mon Sep 17 00:00:00 2001
From: Denis Smirnov <sd@picodata.io>
Date: Mon, 21 Nov 2022 12:25:39 +0700
Subject: [PATCH] feat: implement LuaRead and PushInto for IR plan

These traits are required to serialize the IR plan to Lua and
deserialize it back to Rust when dispatching an execution plan to the
storage nodes.
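
Most plan node types only gain #[derive(LuaRead, PushInto)]. Bare
collections are wrapped into newtypes so the traits can be attached:
Slices/Slice for the motion slices (derived), KeySet for distribution
keys and VTableIndex for the virtual table index (manual impls via
tlua::push_userdata, same as TransformationLog).

A minimal sketch of the round trip these impls enable; round_trip is a
hypothetical helper, not part of this patch, and assumes the caller
already holds some tlua handle implementing AsLua:

    use std::num::NonZeroI32;
    use tarantool::tlua::{AsLua, LuaRead, PushGuard, PushInto};

    /// Push a value (e.g. an IR Plan) into Lua, then read it back.
    fn round_trip<L, T>(lua: L, value: T) -> Option<T>
    where
        L: AsLua,
        T: PushInto<L> + LuaRead<PushGuard<L>>,
    {
        // Serialize the value onto the Lua stack via PushInto.
        let guard = value.push_into_lua(lua).ok()?;
        // Read it back from the top of the stack (index -1) via LuaRead.
        T::lua_read_at_position(guard, NonZeroI32::new(-1)?).ok()
    }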
---
 sbroad-core/src/executor.rs                   |  14 +-
 sbroad-core/src/executor/bucket.rs            |   2 +-
 sbroad-core/src/executor/tests.rs             | 170 ++++++++++++++++--
 .../src/executor/tests/empty_motion.rs        |  20 ++-
 sbroad-core/src/executor/tests/not_eq.rs      |   2 +-
 sbroad-core/src/executor/tests/subtree.rs     |  10 +-
 sbroad-core/src/executor/vtable.rs            |  72 ++++++--
 sbroad-core/src/executor/vtable/tests.rs      |   2 +-
 sbroad-core/src/ir.rs                         |  97 +++++++++-
 sbroad-core/src/ir/distribution.rs            |  80 +++++++--
 sbroad-core/src/ir/distribution/tests.rs      |  70 +++-----
 sbroad-core/src/ir/expression.rs              |   6 +-
 sbroad-core/src/ir/expression/cast.rs         |   3 +-
 sbroad-core/src/ir/operator.rs                |  17 +-
 sbroad-core/src/ir/operator/tests.rs          |   5 +-
 sbroad-core/src/ir/relation.rs                |  10 +-
 .../src/ir/transformation/redistribution.rs   |  25 ++-
 .../ir/transformation/redistribution/tests.rs | 152 +++-------------
 .../redistribution/tests/between.rs           |   5 +-
 .../redistribution/tests/except.rs            |   3 +-
 .../redistribution/tests/not_in.rs            |   3 +-
 21 files changed, 498 insertions(+), 270 deletions(-)

diff --git a/sbroad-core/src/executor.rs b/sbroad-core/src/executor.rs
index c3e8abd518..0d8513a0cf 100644
--- a/sbroad-core/src/executor.rs
+++ b/sbroad-core/src/executor.rs
@@ -23,7 +23,6 @@
 //! 5. Repeats step 3 till we are done with motion layers.
 //! 6. Executes the final IR top subtree and returns the final result to the user.
 
-use ahash::RandomState;
 use std::any::Any;
 use std::collections::{hash_map::Entry, HashMap};
 use std::rc::Rc;
@@ -157,18 +156,18 @@ where
         }
 
         let slices = self.exec_plan.get_ir_plan().clone_slices();
-        if let Some(slices) = slices {
+        if let Some(slices) = slices.slices() {
             for slice in slices {
-                for motion_id in slice {
+                for motion_id in slice.positions() {
                     // TODO: make it work in parallel
-                    let top_id = self.exec_plan.get_motion_subtree_root(motion_id)?;
+                    let top_id = self.exec_plan.get_motion_subtree_root(*motion_id)?;
                     let buckets = self.bucket_discovery(top_id)?;
                     let virtual_table = self.coordinator.materialize_motion(
                         &mut self.exec_plan,
-                        motion_id,
+                        *motion_id,
                         &buckets,
                     )?;
-                    self.add_motion_result(motion_id, virtual_table)?;
+                    self.add_motion_result(*motion_id, virtual_table)?;
                 }
             }
         }
@@ -234,8 +233,7 @@ where
     ) -> Result<(), QueryPlannerError> {
         vtable.set_motion_key(sharding_key);
 
-        let mut index: HashMap<u64, Vec<usize>, RandomState> =
-            HashMap::with_hasher(RandomState::new());
+        let mut index: HashMap<u64, Vec<usize>> = HashMap::new();
         for (pos, tuple) in vtable.get_tuples().iter().enumerate() {
             let mut shard_key_tuple: Vec<&Value> = Vec::new();
             for target in &sharding_key.targets {
diff --git a/sbroad-core/src/executor/bucket.rs b/sbroad-core/src/executor/bucket.rs
index beee43af95..827692fa1d 100644
--- a/sbroad-core/src/executor/bucket.rs
+++ b/sbroad-core/src/executor/bucket.rs
@@ -123,7 +123,7 @@ where
 
                     // The right side is a regular row with constants
                     // on the positions of the left keys (if we are lucky).
-                    for key in keys {
+                    for key in keys.iter() {
                         let mut values: Vec<&Value> = Vec::new();
                         for position in &key.positions {
                             let right_column_id =
diff --git a/sbroad-core/src/executor/tests.rs b/sbroad-core/src/executor/tests.rs
index 8b3dcd0784..9485d66c31 100644
--- a/sbroad-core/src/executor/tests.rs
+++ b/sbroad-core/src/executor/tests.rs
@@ -131,7 +131,15 @@ fn linker_test() {
     let coordinator = RouterRuntimeMock::new();
 
     let mut query = Query::new(&coordinator, sql, vec![]).unwrap();
-    let motion_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let motion_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(0)
+        .unwrap()
+        .clone();
     let mut virtual_table = virtual_table_23();
     if let MotionPolicy::Segment(key) = get_motion_policy(query.exec_plan.get_ir_plan(), motion_id)
     {
@@ -210,7 +218,15 @@ fn union_linker_test() {
     let coordinator = RouterRuntimeMock::new();
 
     let mut query = Query::new(&coordinator, sql, vec![]).unwrap();
-    let motion_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let motion_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(0)
+        .unwrap()
+        .clone();
     let mut virtual_table = virtual_table_23();
     if let MotionPolicy::Segment(key) = get_motion_policy(query.exec_plan.get_ir_plan(), motion_id)
     {
@@ -308,7 +324,15 @@ WHERE "t3"."id" = 2 AND "t8"."identification_number" = 2"#;
     let coordinator = RouterRuntimeMock::new();
 
     let mut query = Query::new(&coordinator, sql, vec![]).unwrap();
-    let motion_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let motion_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(0)
+        .unwrap()
+        .clone();
     let mut virtual_table = virtual_table_23();
     virtual_table.set_alias("\"t8\"").unwrap();
     if let MotionPolicy::Segment(key) = get_motion_policy(query.exec_plan.get_ir_plan(), motion_id)
@@ -378,7 +402,15 @@ fn join_linker2_test() {
     let coordinator = RouterRuntimeMock::new();
 
     let mut query = Query::new(&coordinator, sql, vec![]).unwrap();
-    let motion_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let motion_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(0)
+        .unwrap()
+        .clone();
 
     let mut virtual_table = VirtualTable::new();
     virtual_table.add_column(Column {
@@ -445,7 +477,15 @@ fn join_linker3_test() {
     let coordinator = RouterRuntimeMock::new();
 
     let mut query = Query::new(&coordinator, sql, vec![]).unwrap();
-    let motion_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let motion_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(0)
+        .unwrap()
+        .clone();
 
     let mut virtual_table = VirtualTable::new();
     virtual_table.add_column(Column {
@@ -517,7 +557,15 @@ fn join_linker4_test() {
 
     let mut query = Query::new(&coordinator, sql, vec![]).unwrap();
 
-    let motion_t2_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let motion_t2_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(0)
+        .unwrap()
+        .clone();
     let mut virtual_t2 = VirtualTable::new();
     virtual_t2.add_column(Column {
         name: "r_id".into(),
@@ -538,7 +586,15 @@ fn join_linker4_test() {
         .coordinator
         .add_virtual_table(motion_t2_id, virtual_t2);
 
-    let motion_sq_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][1];
+    let motion_sq_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(1)
+        .unwrap()
+        .clone();
     let mut virtual_sq = VirtualTable::new();
     virtual_sq.add_column(Column {
         name: "fn".into(),
@@ -619,7 +675,15 @@ fn anonymous_col_index_test() {
     let coordinator = RouterRuntimeMock::new();
 
     let mut query = Query::new(&coordinator, sql, vec![]).unwrap();
-    let motion1_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let motion1_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(0)
+        .unwrap()
+        .clone();
     let mut virtual_t1 = virtual_table_23();
     if let MotionPolicy::Segment(key) = get_motion_policy(query.exec_plan.get_ir_plan(), motion1_id)
     {
@@ -630,7 +694,15 @@ fn anonymous_col_index_test() {
     query
         .coordinator
         .add_virtual_table(motion1_id, virtual_table_23());
-    let motion2_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][1];
+    let motion2_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(1)
+        .unwrap()
+        .clone();
     let mut virtual_t2 = virtual_table_23();
     if let MotionPolicy::Segment(key) = get_motion_policy(query.exec_plan.get_ir_plan(), motion2_id)
     {
@@ -768,7 +840,15 @@ fn insert1_test() {
     let coordinator = RouterRuntimeMock::new();
 
     let mut query = Query::new(&coordinator, sql, vec![]).unwrap();
-    let motion_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let motion_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(0)
+        .unwrap()
+        .clone();
 
     let mut virtual_table = VirtualTable::new();
     virtual_table.add_column(Column {
@@ -838,7 +918,15 @@ fn insert2_test() {
     // the target table, we still add a motion and collect a
     // virtual table for it on coordinator to recalculate
     // a "bucket_id" field for "t".
-    let motion_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let motion_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(0)
+        .unwrap()
+        .clone();
     let mut virtual_table = VirtualTable::new();
     virtual_table.add_column(Column {
         name: "a".into(),
@@ -893,7 +981,15 @@ fn insert3_test() {
     let coordinator = RouterRuntimeMock::new();
 
     let mut query = Query::new(&coordinator, sql, vec![]).unwrap();
-    let motion_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let motion_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(0)
+        .unwrap()
+        .clone();
 
     let mut virtual_table = VirtualTable::new();
     virtual_table.add_column(Column {
@@ -973,7 +1069,15 @@ fn insert4_test() {
 
     // Though data allows to be inserted locally still gather it on the
     // coordinator to recalculate a "bucket_id" field for "t".
-    let motion_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let motion_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(0)
+        .unwrap()
+        .clone();
     let mut virtual_table = VirtualTable::new();
     virtual_table.add_column(Column {
         name: "b".into(),
@@ -1028,7 +1132,15 @@ fn insert5_test() {
     let coordinator = RouterRuntimeMock::new();
 
     let mut query = Query::new(&coordinator, sql, vec![]).unwrap();
-    let motion_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let motion_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(0)
+        .unwrap()
+        .clone();
 
     let mut virtual_table = VirtualTable::new();
     virtual_table.add_column(Column {
@@ -1092,7 +1204,15 @@ fn insert6_test() {
     let coordinator = RouterRuntimeMock::new();
 
     let mut query = Query::new(&coordinator, sql, vec![]).unwrap();
-    let motion_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let motion_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(0)
+        .unwrap()
+        .clone();
 
     let mut virtual_table = VirtualTable::new();
     virtual_table.add_column(Column {
@@ -1198,7 +1318,15 @@ fn insert8_test() {
     let coordinator = RouterRuntimeMock::new();
 
     let mut query = Query::new(&coordinator, sql, vec![]).unwrap();
-    let motion_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let motion_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(0)
+        .unwrap()
+        .clone();
 
     let mut virtual_table = VirtualTable::new();
     virtual_table.add_column(Column::new(
@@ -1270,7 +1398,15 @@ fn insert9_test() {
         vec![Value::from(1_u64), Value::from(2_u64)],
     )
     .unwrap();
-    let motion_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let motion_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(0)
+        .unwrap()
+        .clone();
 
     let mut virtual_table = VirtualTable::new();
     virtual_table.add_column(Column {
diff --git a/sbroad-core/src/executor/tests/empty_motion.rs b/sbroad-core/src/executor/tests/empty_motion.rs
index ab51d06c34..08d774f11c 100644
--- a/sbroad-core/src/executor/tests/empty_motion.rs
+++ b/sbroad-core/src/executor/tests/empty_motion.rs
@@ -23,7 +23,15 @@ fn empty_motion1_test() {
     let coordinator = RouterRuntimeMock::new();
 
     let mut query = Query::new(&coordinator, sql, vec![]).unwrap();
-    let motion1_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let motion1_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(0)
+        .unwrap()
+        .clone();
     let mut virtual_t1 = t2_empty();
     if let MotionPolicy::Segment(key) = get_motion_policy(query.exec_plan.get_ir_plan(), motion1_id)
     {
@@ -32,7 +40,15 @@ fn empty_motion1_test() {
             .unwrap();
     }
     query.coordinator.add_virtual_table(motion1_id, virtual_t1);
-    let motion2_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][1];
+    let motion2_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(1)
+        .unwrap()
+        .clone();
     let mut virtual_t2 = t2_empty();
     if let MotionPolicy::Segment(key) = get_motion_policy(query.exec_plan.get_ir_plan(), motion2_id)
     {
diff --git a/sbroad-core/src/executor/tests/not_eq.rs b/sbroad-core/src/executor/tests/not_eq.rs
index a2f7ab4565..d687d25bff 100644
--- a/sbroad-core/src/executor/tests/not_eq.rs
+++ b/sbroad-core/src/executor/tests/not_eq.rs
@@ -24,7 +24,7 @@ fn not_eq1_test() {
     let plan = query.exec_plan.get_ir_plan();
 
     // Validate the motion type.
-    assert_eq!(true, plan.clone_slices().is_none());
+    assert_eq!(true, plan.clone_slices().slices().is_none());
 
     // Execute the query.
     let result = *query
diff --git a/sbroad-core/src/executor/tests/subtree.rs b/sbroad-core/src/executor/tests/subtree.rs
index a2986bd408..926a52e7b5 100644
--- a/sbroad-core/src/executor/tests/subtree.rs
+++ b/sbroad-core/src/executor/tests/subtree.rs
@@ -14,7 +14,15 @@ fn exec_plan_subtree_test() {
     let coordinator = RouterRuntimeMock::new();
 
     let mut query = Query::new(&coordinator, sql, vec![]).unwrap();
-    let motion_id = query.exec_plan.get_ir_plan().clone_slices().unwrap()[0][0];
+    let motion_id = query
+        .exec_plan
+        .get_ir_plan()
+        .clone_slices()
+        .slice(0)
+        .unwrap()
+        .position(0)
+        .unwrap()
+        .clone();
     let mut virtual_table = virtual_table_23();
     if let MotionPolicy::Segment(key) = get_motion_policy(query.exec_plan.get_ir_plan(), motion_id)
     {
diff --git a/sbroad-core/src/executor/vtable.rs b/sbroad-core/src/executor/vtable.rs
index 5d75a5a841..788e9a4872 100644
--- a/sbroad-core/src/executor/vtable.rs
+++ b/sbroad-core/src/executor/vtable.rs
@@ -1,24 +1,66 @@
-use ahash::RandomState;
-
 use std::collections::{HashMap, HashSet};
+use std::num::NonZeroI32;
 use std::vec;
 
 use serde::{Deserialize, Serialize};
+use tarantool::tlua::{self, AsLua, LuaRead};
 
 use crate::errors::QueryPlannerError;
 use crate::ir::relation::Column;
 use crate::ir::transformation::redistribution::{MotionKey, Target};
 use crate::ir::value::Value;
 
-pub type VTableTuple = Vec<Value>;
 type ShardingKey = Vec<Value>;
+pub type VTableTuple = Vec<Value>;
+#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
+pub struct VTableIndex {
+    value: HashMap<u64, Vec<usize>>,
+}
+
+impl VTableIndex {
+    fn new() -> Self {
+        Self {
+            value: HashMap::new(),
+        }
+    }
+}
+
+impl From<HashMap<u64, Vec<usize>>> for VTableIndex {
+    fn from(value: HashMap<u64, Vec<usize>>) -> Self {
+        Self { value }
+    }
+}
+
+impl<L> tlua::LuaRead<L> for VTableIndex
+where
+    L: tlua::AsLua,
+{
+    fn lua_read_at_position(lua: L, index: NonZeroI32) -> Result<VTableIndex, L> {
+        match HashMap::lua_read_at_position(lua, index) {
+            Ok(map) => Ok(VTableIndex::from(map)),
+            Err(lua) => Err(lua),
+        }
+    }
+}
+
+impl<L> tlua::PushInto<L> for VTableIndex
+where
+    L: AsLua,
+{
+    type Err = tlua::Void;
+    fn push_into_lua(self, lua: L) -> Result<tlua::PushGuard<L>, (tlua::Void, L)> {
+        Ok(tlua::push_userdata(self.value, lua, |_| {}))
+    }
+}
+
+impl<L> tlua::PushOneInto<L> for VTableIndex where L: tlua::AsLua {}
 
 #[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
 struct ShardingRecord(ShardingKey, usize);
 
 /// Result tuple storage, created by the executor. All tuples
 /// have a distribution key.
-#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
+#[derive(LuaRead, Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
 pub struct VirtualTable {
     /// List of the columns.
     columns: Vec<Column>,
@@ -31,7 +73,17 @@ pub struct VirtualTable {
     /// Index groups tuples by the buckets:
     /// the key is a bucket id, the value is a list of positions
     /// in the `tuples` list corresponding to the bucket.
-    index: HashMap<u64, Vec<usize>, RandomState>,
+    index: VTableIndex,
+}
+
+impl<L> tlua::PushInto<L> for VirtualTable
+where
+    L: AsLua,
+{
+    type Err = tlua::Void;
+    fn push_into_lua(self, lua: L) -> Result<tlua::PushGuard<L>, (tlua::Void, L)> {
+        Ok(tlua::push_userdata(self, lua, |_| {}))
+    }
 }
 
 impl Default for VirtualTable {
@@ -48,7 +100,7 @@ impl VirtualTable {
             tuples: vec![],
             name: None,
             distribution_key: None,
-            index: HashMap::with_hasher(RandomState::new()),
+            index: VTableIndex::new(),
         }
     }
 
@@ -102,13 +154,13 @@ impl VirtualTable {
 
     /// Gets virtual table index
     #[must_use]
-    pub fn get_index(&self) -> &HashMap<u64, Vec<usize>, RandomState> {
-        &self.index
+    pub fn get_index(&self) -> &HashMap<u64, Vec<usize>> {
+        &self.index.value
     }
 
     /// Set vtable index
-    pub fn set_index(&mut self, index: HashMap<u64, Vec<usize>, RandomState>) {
-        self.index = index;
+    pub fn set_index(&mut self, index: HashMap<u64, Vec<usize>>) {
+        self.index = index.into();
     }
 
     /// Get virtual table tuples' values, participating in the distribution key.
diff --git a/sbroad-core/src/executor/vtable/tests.rs b/sbroad-core/src/executor/vtable/tests.rs
index 4c78ff0ccf..3e8e918bc6 100644
--- a/sbroad-core/src/executor/vtable/tests.rs
+++ b/sbroad-core/src/executor/vtable/tests.rs
@@ -24,7 +24,7 @@ fn virtual_table() {
         tuples: vec![vec![Value::from(1_u64)]],
         name: Some(String::from("test")),
         distribution_key: None,
-        index: HashMap::with_hasher(RandomState::new()),
+        index: VTableIndex::new(),
     };
 
     assert_eq!(expected, vtable);
diff --git a/sbroad-core/src/ir.rs b/sbroad-core/src/ir.rs
index 361f80a8df..8505873966 100644
--- a/sbroad-core/src/ir.rs
+++ b/sbroad-core/src/ir.rs
@@ -2,9 +2,10 @@
 //!
 //! Contains the logical plan tree and helpers.
 
-use std::collections::{HashMap, HashSet};
-
 use serde::{Deserialize, Serialize};
+use std::collections::{HashMap, HashSet};
+use std::num::NonZeroI32;
+use tarantool::tlua::{self, AsLua, LuaRead, PushInto};
 
 use expression::Expression;
 use operator::Relational;
@@ -66,6 +67,36 @@ impl TransformationLog {
     }
 }
 
+impl<L> tlua::LuaRead<L> for TransformationLog
+where
+    L: tlua::AsLua,
+{
+    fn lua_read_at_position(lua: L, index: NonZeroI32) -> Result<TransformationLog, L> {
+        match HashMap::lua_read_at_position(lua, index) {
+            Ok(map) => {
+                let mut log = TransformationLog::new();
+                for (new_id, old_id) in map {
+                    log.add(old_id, new_id);
+                }
+                Ok(log)
+            }
+            Err(lua) => Err(lua),
+        }
+    }
+}
+
+impl<L> tlua::PushInto<L> for TransformationLog
+where
+    L: AsLua,
+{
+    type Err = tlua::Void;
+    fn push_into_lua(self, lua: L) -> Result<tlua::PushGuard<L>, (tlua::Void, L)> {
+        Ok(tlua::push_userdata(self.0, lua, |_| {}))
+    }
+}
+
+impl<L> tlua::PushOneInto<L> for TransformationLog where L: tlua::AsLua {}
+
 /// Plan tree node.
 ///
 /// There are two kinds of node variants: expressions and relational
@@ -78,7 +109,7 @@ impl TransformationLog {
 ///
 /// Enum was chosen as we don't want to mess with dynamic
 /// dispatching and its performance penalties.
-#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
+#[derive(LuaRead, PushInto, Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
 pub enum Node {
     Expression(Expression),
     Relational(Relational),
@@ -86,7 +117,7 @@ pub enum Node {
 }
 
 /// Plan nodes storage.
-#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
+#[derive(LuaRead, PushInto, Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
 pub struct Nodes {
     /// We don't want to mess with the borrow checker and RefCell/Rc,
     /// so all nodes are stored in the single arena ("nodes" array).
@@ -131,8 +162,56 @@ impl Nodes {
     }
 }
 
+#[derive(LuaRead, PushInto, Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
+pub struct Slice(Vec<usize>);
+
+impl From<Vec<usize>> for Slice {
+    fn from(vec: Vec<usize>) -> Self {
+        Self(vec)
+    }
+}
+
+impl Slice {
+    #[must_use]
+    pub fn position(&self, index: usize) -> Option<&usize> {
+        self.0.get(index)
+    }
+
+    #[must_use]
+    pub fn positions(&self) -> &[usize] {
+        &self.0
+    }
+}
+
+#[derive(LuaRead, PushInto, Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
+pub struct Slices(Option<Vec<Slice>>);
+
+impl From<Option<Vec<Slice>>> for Slices {
+    fn from(vec: Option<Vec<Slice>>) -> Self {
+        Self(vec)
+    }
+}
+
+impl From<Option<Vec<Vec<usize>>>> for Slices {
+    fn from(vec: Option<Vec<Vec<usize>>>) -> Self {
+        Self(vec.map(|vec| vec.into_iter().map(Slice::from).collect()))
+    }
+}
+
+impl Slices {
+    #[must_use]
+    pub fn slice(&self, index: usize) -> Option<&Slice> {
+        self.0.as_ref().and_then(|slices| slices.get(index))
+    }
+
+    #[must_use]
+    pub fn slices(&self) -> Option<&Vec<Slice>> {
+        self.0.as_ref()
+    }
+}
+
 /// Logical plan tree structure.
-#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
+#[derive(LuaRead, PushInto, Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
 pub struct Plan {
     /// Append only arena for the plan nodes.
     pub(crate) nodes: Nodes,
@@ -146,7 +225,7 @@ pub struct Plan {
     /// Motions level by level in a bottom-up manner to the "slices" array
     /// of arrays. All the slices on the same level can be executed in parallel.
     /// In fact, "slices" is a prepared set of commands for the executor.
-    slices: Option<Vec<Vec<usize>>>,
+    slices: Slices,
     /// The plan top is marked as optional for tree creation convenience.
     /// We build the plan tree in a bottom-up manner, so the top would
     /// be added last. The plan without a top should be treated as invalid.
@@ -208,7 +287,7 @@ impl Plan {
         Plan {
             nodes: Nodes { arena: Vec::new() },
             relations: None,
-            slices: None,
+            slices: Slices(None),
             top: None,
             is_explain: false,
             undo: TransformationLog::new(),
@@ -255,7 +334,7 @@ impl Plan {
 
     /// Clone plan slices.
     #[must_use]
-    pub fn clone_slices(&self) -> Option<Vec<Vec<usize>>> {
+    pub fn clone_slices(&self) -> Slices {
         self.slices.clone()
     }
 
@@ -543,7 +622,7 @@ impl Plan {
 
     /// Set slices of the plan.
     pub fn set_slices(&mut self, slices: Option<Vec<Vec<usize>>>) {
-        self.slices = slices;
+        self.slices = slices.into();
     }
 }
 
diff --git a/sbroad-core/src/ir/distribution.rs b/sbroad-core/src/ir/distribution.rs
index c1eeb142e9..f4de2ffa30 100644
--- a/sbroad-core/src/ir/distribution.rs
+++ b/sbroad-core/src/ir/distribution.rs
@@ -2,8 +2,10 @@
 
 use ahash::RandomState;
 use std::collections::{HashMap, HashSet};
+use std::num::NonZeroI32;
 
 use serde::{Deserialize, Serialize};
+use tarantool::tlua::{self, AsLua, LuaRead, PushGuard, PushInto, Void};
 
 use crate::collection;
 use crate::errors::QueryPlannerError;
@@ -18,7 +20,7 @@ use super::{Node, Plan};
 /// If table T1 is segmented by columns (a, b) and produces
 /// tuples with columns (a, b, c), it means that for any T1 tuple
 /// on a segment S1: f(a, b) = S1 and (a, b) is a segmentation key.
-#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Debug, Clone)]
+#[derive(LuaRead, PushInto, Serialize, Deserialize, PartialEq, Eq, Hash, Debug, Clone)]
 pub struct Key {
     /// A list of column positions in the tuple that form a
     /// segmentation key.
@@ -32,8 +34,61 @@ impl Key {
     }
 }
 
-/// Tuple distribution in the cluster.
 #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
+pub struct KeySet(HashSet<Key>);
+
+impl KeySet {
+    pub fn iter(&self) -> impl Iterator<Item = &Key> {
+        self.0.iter()
+    }
+
+    #[must_use]
+    pub fn intersection(&self, other: &Self) -> Self {
+        KeySet(self.0.intersection(&other.0).cloned().collect())
+    }
+
+    #[must_use]
+    pub fn union(&self, other: &Self) -> Self {
+        KeySet(self.0.union(&other.0).cloned().collect())
+    }
+}
+
+impl From<HashSet<Key>> for KeySet {
+    fn from(keys: HashSet<Key>) -> Self {
+        Self(keys)
+    }
+}
+
+impl<L> LuaRead<L> for KeySet
+where
+    L: AsLua,
+{
+    fn lua_read_at_position(lua: L, index: NonZeroI32) -> Result<KeySet, L> {
+        match HashMap::<Key, ()>::lua_read_at_position(lua, index) {
+            Ok(map) => {
+                let keys = map.into_iter().map(|(k, _)| k).collect();
+                Ok(KeySet(keys))
+            }
+            Err(lua) => Err(lua),
+        }
+    }
+}
+
+impl<L> PushInto<L> for KeySet
+where
+    L: AsLua,
+{
+    type Err = Void;
+    fn push_into_lua(self, lua: L) -> Result<PushGuard<L>, (Void, L)> {
+        let map: HashMap<Key, ()> = self.0.into_iter().map(|k| (k, ())).collect();
+        Ok(tlua::push_userdata(map, lua, |_| {}))
+    }
+}
+
+impl<L> tlua::PushOneInto<L> for KeySet where L: AsLua {}
+
+/// Tuple distribution in the cluster.
+#[derive(LuaRead, PushInto, Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
 pub enum Distribution {
     /// A tuple can be located on any data node.
     /// Example: projection removes the segment key columns.
@@ -45,7 +100,7 @@ pub enum Distribution {
     /// Example: tuples from the segmented table.
     Segment {
         /// A set of distribution keys (we can have multiple keys after join)
-        keys: HashSet<Key>,
+        keys: KeySet,
     },
 }
 
@@ -66,13 +121,13 @@ impl Distribution {
                 },
             ) => {
                 let mut keys: HashSet<Key> = HashSet::new();
-                for key in keys_left.intersection(keys_right) {
+                for key in keys_left.intersection(keys_right).iter() {
                     keys.insert(Key::new(key.positions.clone()));
                 }
                 if keys.is_empty() {
                     Distribution::Any
                 } else {
-                    Distribution::Segment { keys }
+                    Distribution::Segment { keys: keys.into() }
                 }
             }
         }
@@ -101,13 +156,13 @@ impl Distribution {
                 },
             ) => {
                 let mut keys: HashSet<Key> = HashSet::new();
-                for key in keys_left.union(keys_right) {
+                for key in keys_left.union(keys_right).iter() {
                     keys.insert(Key::new(key.positions.clone()));
                 }
                 if keys.is_empty() {
                     Distribution::Any
                 } else {
-                    Distribution::Segment { keys }
+                    Distribution::Segment { keys: keys.into() }
                 }
             }
         }
@@ -362,7 +417,7 @@ impl Plan {
                     Some(Distribution::Replicated) => return Ok(Distribution::Replicated),
                     Some(Distribution::Segment { keys }) => {
                         let mut new_keys: HashSet<Key> = HashSet::new();
-                        for key in keys {
+                        for key in keys.iter() {
                             let mut new_key: Key = Key::new(Vec::new());
                             let all_found = key.positions.iter().all(|pos| {
                                 child_pos_map
@@ -381,7 +436,9 @@ impl Plan {
                         if new_keys.is_empty() {
                             return Ok(Distribution::Any);
                         }
-                        return Ok(Distribution::Segment { keys: new_keys });
+                        return Ok(Distribution::Segment {
+                            keys: new_keys.into(),
+                        });
                     }
                 }
             }
@@ -432,9 +489,8 @@ impl Plan {
                     ..
                 } = self.get_mut_expression_node(row_id)?
                 {
-                    *distribution = Some(Distribution::Segment {
-                        keys: collection! { new_key },
-                    });
+                    let keys: HashSet<_> = collection! { new_key };
+                    *distribution = Some(Distribution::Segment { keys: keys.into() });
                 }
             }
             return Ok(());
diff --git a/sbroad-core/src/ir/distribution/tests.rs b/sbroad-core/src/ir/distribution/tests.rs
index e77fc46e6a..07a201c594 100644
--- a/sbroad-core/src/ir/distribution/tests.rs
+++ b/sbroad-core/src/ir/distribution/tests.rs
@@ -34,10 +34,9 @@ fn proj_preserve_dist_key() {
     };
     plan.set_distribution(scan_output).unwrap();
     if let Node::Expression(scan_row) = plan.get_node(scan_output).unwrap() {
+        let keys: HashSet<_> = collection! { Key::new(vec![1, 0]) };
         assert_eq!(
-            &Distribution::Segment {
-                keys: collection! { Key::new(vec![1, 0]) }
-            },
+            &Distribution::Segment { keys: keys.into() },
             scan_row.distribution().unwrap()
         );
     }
@@ -49,10 +48,9 @@ fn proj_preserve_dist_key() {
     };
     plan.set_distribution(proj_output).unwrap();
     if let Node::Expression(proj_row) = plan.get_node(proj_output).unwrap() {
+        let keys: HashSet<_> = collection! { Key::new(vec![1, 0]) };
         assert_eq!(
-            &Distribution::Segment {
-                keys: collection! { Key::new(vec![1, 0]) }
-            },
+            &Distribution::Segment { keys: keys.into() },
             proj_row.distribution().unwrap()
         );
     }
@@ -76,20 +74,18 @@ fn proj_shuffle_dist_key() {
 
     plan.set_distribution(scan_output).unwrap();
     if let Node::Expression(scan_row) = plan.get_node(scan_output).unwrap() {
+        let keys: HashSet<_> = collection! { Key::new(vec![1, 0]) };
         assert_eq!(
-            &Distribution::Segment {
-                keys: collection! { Key::new(vec![1, 0]) }
-            },
+            &Distribution::Segment { keys: keys.into() },
             scan_row.distribution().unwrap()
         );
     }
 
     plan.set_distribution(proj_output).unwrap();
     if let Node::Expression(proj_row) = plan.get_node(proj_output).unwrap() {
+        let keys: HashSet<_> = collection! { Key::new(vec![0, 1]) };
         assert_eq!(
-            &Distribution::Segment {
-                keys: collection! { Key::new(vec![0, 1]) }
-            },
+            &Distribution::Segment { keys: keys.into() },
             proj_row.distribution().unwrap()
         );
     }
@@ -112,10 +108,9 @@ fn proj_shrink_dist_key_1() {
     let proj_output = 14;
 
     plan.set_distribution(scan_output).unwrap();
+    let keys: HashSet<_> = collection! { Key::new(vec![1, 0]) };
     assert_eq!(
-        &Distribution::Segment {
-            keys: collection! { Key::new(vec![1, 0]) }
-        },
+        &Distribution::Segment { keys: keys.into() },
         plan.get_distribution(scan_output).unwrap()
     );
 
@@ -143,10 +138,9 @@ fn proj_shrink_dist_key_2() {
     let proj_output = 12;
 
     plan.set_distribution(scan_output).unwrap();
+    let keys: HashSet<_> = collection! { Key::new(vec![1, 0]) };
     assert_eq!(
-        &Distribution::Segment {
-            keys: collection! { Key::new(vec![1, 0]) }
-        },
+        &Distribution::Segment { keys: keys.into() },
         plan.get_distribution(scan_output).unwrap()
     );
 
@@ -176,18 +170,16 @@ fn union_all_fallback_to_random() {
     let union_output = 16;
 
     plan.set_distribution(scan_t1_output).unwrap();
+    let keys: HashSet<_> = collection! { Key::new(vec![0]) };
     assert_eq!(
-        &Distribution::Segment {
-            keys: collection! { Key::new(vec![0]) }
-        },
+        &Distribution::Segment { keys: keys.into() },
         plan.get_distribution(scan_t1_output).unwrap()
     );
 
     plan.set_distribution(scan_t2_output).unwrap();
+    let keys: HashSet<_> = collection! { Key::new(vec![1]) };
     assert_eq!(
-        &Distribution::Segment {
-            keys: collection! { Key::new(vec![1]) }
-        },
+        &Distribution::Segment { keys: keys.into() },
         plan.get_distribution(scan_t2_output).unwrap()
     );
 
@@ -218,30 +210,27 @@ fn union_preserve_dist() {
 
     plan.set_distribution(scan_t1_output).unwrap();
     if let Node::Expression(scan_row) = plan.get_node(scan_t1_output).unwrap() {
+        let keys: HashSet<_> = collection! { Key::new(vec![0]) };
         assert_eq!(
-            &Distribution::Segment {
-                keys: collection! { Key::new(vec![0]) }
-            },
+            &Distribution::Segment { keys: keys.into() },
             scan_row.distribution().unwrap()
         );
     }
 
     plan.set_distribution(scan_t2_output).unwrap();
     if let Node::Expression(scan_row) = plan.get_node(scan_t2_output).unwrap() {
+        let keys: HashSet<_> = collection! { Key::new(vec![0]) };
         assert_eq!(
-            &Distribution::Segment {
-                keys: collection! { Key::new(vec![0]) }
-            },
+            &Distribution::Segment { keys: keys.into() },
             scan_row.distribution().unwrap()
         );
     }
 
     plan.set_distribution(union_output).unwrap();
     if let Node::Expression(scan_row) = plan.get_node(union_output).unwrap() {
+        let keys: HashSet<_> = collection! { Key::new(vec![0]) };
         assert_eq!(
-            &Distribution::Segment {
-                keys: collection! { Key::new(vec![0]) }
-            },
+            &Distribution::Segment { keys: keys.into() },
             scan_row.distribution().unwrap()
         );
     }
@@ -267,30 +256,27 @@ fn join_unite_keys() {
 
     plan.set_distribution(scan_t1_output).unwrap();
     if let Node::Expression(scan_row) = plan.get_node(scan_t1_output).unwrap() {
+        let keys: HashSet<_> = collection! { Key::new(vec![0]) };
         assert_eq!(
-            &Distribution::Segment {
-                keys: collection! { Key::new(vec![0]) }
-            },
+            &Distribution::Segment { keys: keys.into() },
             scan_row.distribution().unwrap()
         );
     }
 
     plan.set_distribution(scan_t2_output).unwrap();
     if let Node::Expression(scan_row) = plan.get_node(scan_t2_output).unwrap() {
+        let keys: HashSet<_> = collection! { Key::new(vec![1]) };
         assert_eq!(
-            &Distribution::Segment {
-                keys: collection! { Key::new(vec![1]) }
-            },
+            &Distribution::Segment { keys: keys.into() },
             scan_row.distribution().unwrap()
         );
     }
 
     plan.set_distribution(join_output).unwrap();
     if let Node::Expression(scan_row) = plan.get_node(join_output).unwrap() {
+        let keys: HashSet<_> = collection! { Key::new(vec![0]), Key::new(vec![3]) };
         assert_eq!(
-            &Distribution::Segment {
-                keys: collection! { Key::new(vec![0]), Key::new(vec![3]) }
-            },
+            &Distribution::Segment { keys: keys.into() },
             scan_row.distribution().unwrap()
         );
     }
diff --git a/sbroad-core/src/ir/expression.rs b/sbroad-core/src/ir/expression.rs
index b9cef87996..6fba4bbe06 100644
--- a/sbroad-core/src/ir/expression.rs
+++ b/sbroad-core/src/ir/expression.rs
@@ -7,9 +7,9 @@
 //! - distribution of the data in the tuple
 
 use ahash::RandomState;
-use std::collections::{HashMap, HashSet};
-
 use serde::{Deserialize, Serialize};
+use std::collections::{HashMap, HashSet};
+use tarantool::tlua::{self, LuaRead, PushInto};
 use traversal::DftPost;
 
 use crate::errors::QueryPlannerError;
@@ -33,7 +33,7 @@ pub mod concat;
 /// and should not be changed. It ensures that we always know the
 /// name of any column in the tuple and therefore simplifies AST
 /// deserialization.
-#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
+#[derive(LuaRead, PushInto, Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
 pub enum Expression {
     /// Expression name.
     ///
diff --git a/sbroad-core/src/ir/expression/cast.rs b/sbroad-core/src/ir/expression/cast.rs
index 5c3fac2c77..6a0c2c0293 100644
--- a/sbroad-core/src/ir/expression/cast.rs
+++ b/sbroad-core/src/ir/expression/cast.rs
@@ -5,8 +5,9 @@ use crate::frontend::sql::ast::Type as AstType;
 use crate::ir::expression::Expression;
 use crate::ir::{Node, Plan};
 use serde::{Deserialize, Serialize};
+use tarantool::tlua::{self, LuaRead, PushInto};
 
-#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
+#[derive(LuaRead, PushInto, Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
 pub enum Type {
     Any,
     Boolean,
diff --git a/sbroad-core/src/ir/operator.rs b/sbroad-core/src/ir/operator.rs
index 7d5ff3c8ac..02ef2a7931 100644
--- a/sbroad-core/src/ir/operator.rs
+++ b/sbroad-core/src/ir/operator.rs
@@ -3,10 +3,10 @@
 //! Contains operator nodes that transform the tuples in IR tree.
 
 use ahash::RandomState;
-use std::collections::HashMap;
-use std::fmt::{Display, Formatter};
-
 use serde::{Deserialize, Serialize};
+use std::collections::{HashMap, HashSet};
+use std::fmt::{Display, Formatter};
+use tarantool::tlua::{self, LuaRead, PushInto};
 
 use crate::errors::QueryPlannerError;
 
@@ -14,12 +14,12 @@ use super::expression::Expression;
 use super::transformation::redistribution::{DataGeneration, MotionPolicy};
 use super::{Node, Nodes, Plan};
 use crate::collection;
-use crate::ir::distribution::Distribution;
+use crate::ir::distribution::{Distribution, KeySet};
 use crate::ir::relation::ColumnRole;
 use traversal::Bft;
 
 /// Binary operator returning Bool expression.
-#[derive(Serialize, Deserialize, PartialEq, Debug, Eq, Hash, Clone)]
+#[derive(LuaRead, PushInto, Serialize, Deserialize, PartialEq, Debug, Eq, Hash, Clone)]
 pub enum Bool {
     /// `&&`
     And,
@@ -85,7 +85,7 @@ impl Display for Bool {
 }
 
 /// Unary operator returning Bool expression.
-#[derive(Serialize, Deserialize, PartialEq, Debug, Eq, Hash, Clone)]
+#[derive(LuaRead, PushInto, Serialize, Deserialize, PartialEq, Debug, Eq, Hash, Clone)]
 pub enum Unary {
     /// `is null`
     IsNull,
@@ -125,7 +125,7 @@ impl Display for Unary {
 ///
 /// Transforms input tuple(s) into the output one using the
 /// relation algebra logic.
-#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
+#[derive(LuaRead, PushInto, Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
 pub enum Relational {
     Except {
         /// Contains exactly two elements: left and right node indexes
@@ -614,8 +614,9 @@ impl Plan {
             let col_alias_id = self.nodes.add_alias(&col.name, r_id)?;
             refs.push(col_alias_id);
         }
+        let keys: HashSet<_> = collection! { rel.key.clone() };
         let dist = Distribution::Segment {
-            keys: collection! { rel.key.clone() },
+            keys: KeySet::from(keys),
         };
         let output = self.nodes.add_row_of_aliases(refs, Some(dist))?;
         let insert = Node::Relational(Relational::Insert {
diff --git a/sbroad-core/src/ir/operator/tests.rs b/sbroad-core/src/ir/operator/tests.rs
index df202e1e7c..52a3b2739b 100644
--- a/sbroad-core/src/ir/operator/tests.rs
+++ b/sbroad-core/src/ir/operator/tests.rs
@@ -38,11 +38,10 @@ fn scan_rel() {
 
     plan.set_distribution(scan_output).unwrap();
     if let Node::Expression(row) = plan.get_node(scan_output).unwrap() {
+        let keys: HashSet<_> = collection! { Key::new(vec![1, 0]) };
         assert_eq!(
             row.distribution().unwrap(),
-            &Distribution::Segment {
-                keys: collection! { Key::new(vec![1, 0]) }
-            }
+            &Distribution::Segment { keys: keys.into() }
         );
     } else {
         panic!("Wrong output node type!");
diff --git a/sbroad-core/src/ir/relation.rs b/sbroad-core/src/ir/relation.rs
index 4417003e60..65bc521d0b 100644
--- a/sbroad-core/src/ir/relation.rs
+++ b/sbroad-core/src/ir/relation.rs
@@ -7,7 +7,7 @@ use std::fmt::Formatter;
 use serde::de::{Error, MapAccess, Visitor};
 use serde::ser::{Serialize as SerSerialize, SerializeMap, Serializer};
 use serde::{Deserialize, Deserializer, Serialize};
-use tarantool::tlua::{self, LuaRead};
+use tarantool::tlua::{self, LuaRead, PushInto};
 
 use crate::errors::QueryPlannerError;
 use crate::ir::value::Value;
@@ -18,7 +18,7 @@ const DEFAULT_VALUE: Value = Value::Null;
 
 /// Supported column types, which is used in a schema only.
 /// This `Type` doesn't have any relation with `Type` from IR.
-#[derive(LuaRead, Serialize, Deserialize, PartialEq, Debug, Eq, Clone)]
+#[derive(LuaRead, PushInto, Serialize, Deserialize, PartialEq, Debug, Eq, Clone)]
 pub enum Type {
     Boolean,
     Decimal,
@@ -56,7 +56,7 @@ impl Type {
 }
 
 /// A role of the column in the relation.
-#[derive(PartialEq, Debug, Eq, Clone)]
+#[derive(LuaRead, PushInto, PartialEq, Debug, Eq, Clone)]
 pub enum ColumnRole {
     /// General purpose column available for the user.
     User,
@@ -65,7 +65,7 @@ pub enum ColumnRole {
 }
 
 /// Relation column.
-#[derive(PartialEq, Debug, Eq, Clone)]
+#[derive(LuaRead, PushInto, PartialEq, Debug, Eq, Clone)]
 pub struct Column {
     /// Column name.
     pub name: String,
@@ -176,7 +176,7 @@ impl Column {
 /// Table is a tuple storage in the cluster.
 ///
 /// Tables are tuple storages in the cluster.
-#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
+#[derive(LuaRead, PushInto, Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
 pub struct Table {
     /// List of the columns.
     pub columns: Vec<Column>,
diff --git a/sbroad-core/src/ir/transformation/redistribution.rs b/sbroad-core/src/ir/transformation/redistribution.rs
index c7cd83aa5f..1f7f2de34f 100644
--- a/sbroad-core/src/ir/transformation/redistribution.rs
+++ b/sbroad-core/src/ir/transformation/redistribution.rs
@@ -1,15 +1,15 @@
 //! Resolve distribution conflicts and insert motion nodes to IR.
 
 use ahash::RandomState;
+use serde::{Deserialize, Serialize};
 use std::cmp::Ordering;
 use std::collections::{hash_map::Entry, HashMap, HashSet};
 use std::fmt::{Display, Formatter};
-
-use serde::{Deserialize, Serialize};
+use tarantool::tlua::{self, LuaRead, PushInto};
 use traversal::{Bft, DftPost};
 
 use crate::errors::QueryPlannerError;
-use crate::ir::distribution::{Distribution, Key};
+use crate::ir::distribution::{Distribution, Key, KeySet};
 use crate::ir::expression::Expression;
 use crate::ir::operator::{Bool, Relational};
 use crate::ir::relation::Column;
@@ -19,7 +19,7 @@ use crate::otm::child_span;
 use sbroad_proc::otm_child_span;
 
 /// Redistribution key targets (columns or values of the key).
-#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Debug, Clone)]
+#[derive(LuaRead, PushInto, Serialize, Deserialize, PartialEq, Eq, Hash, Debug, Clone)]
 pub enum Target {
     /// A position of the existing column in the tuple.
     Reference(usize),
@@ -31,7 +31,7 @@ pub enum Target {
     Value(Value),
 }
 
-#[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Debug, Clone)]
+#[derive(LuaRead, PushInto, Serialize, Deserialize, PartialEq, Eq, Hash, Debug, Clone)]
 pub struct MotionKey {
     pub targets: Vec<Target>,
 }
@@ -63,7 +63,7 @@ impl From<&Key> for MotionKey {
 }
 
 /// Determinate what portion of data to move between data nodes in cluster.
-#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
+#[derive(LuaRead, PushInto, Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
 pub enum MotionPolicy {
     /// Move all data.
     Full,
@@ -74,7 +74,7 @@ pub enum MotionPolicy {
 }
 
 /// Determine what portion of data to generate during motion.
-#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
+#[derive(LuaRead, PushInto, Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
 pub enum DataGeneration {
     /// Nothing to generate.
     None,
@@ -304,7 +304,7 @@ impl Plan {
                     keys: ref keys_inner,
                 } = inner_dist
                 {
-                    if keys_outer.intersection(keys_inner).next().is_some() {
+                    if keys_outer.intersection(keys_inner).iter().next().is_some() {
                         return Ok(MotionPolicy::Local);
                     }
                 }
@@ -582,7 +582,7 @@ impl Plan {
         let right_dist = self.get_distribution(right_row_id)?;
 
         let get_policy_for_one_side_segment = |row_map: &HashMap<usize, usize>,
-                                               keys_set: &HashSet<Key>|
+                                               keys_set: &KeySet|
          -> Result<MotionPolicy, QueryPlannerError> {
             let keys = keys_set.iter().map(Clone::clone).collect::<Vec<_>>();
             let (outer_keys, _) =
@@ -910,12 +910,7 @@ impl Plan {
                             } = right_dist
                             {
                                 // Distribution key sets have common keys, no need for the data motion.
-                                if right_keys
-                                    .intersection(left_keys)
-                                    .into_iter()
-                                    .next()
-                                    .is_some()
-                                {
+                                if right_keys.intersection(left_keys).iter().next().is_some() {
                                     return Ok(map);
                                 }
                             }
diff --git a/sbroad-core/src/ir/transformation/redistribution/tests.rs b/sbroad-core/src/ir/transformation/redistribution/tests.rs
index c9941e1e03..9810f5f3cd 100644
--- a/sbroad-core/src/ir/transformation/redistribution/tests.rs
+++ b/sbroad-core/src/ir/transformation/redistribution/tests.rs
@@ -6,6 +6,7 @@ use crate::ir::operator::Relational;
 use crate::ir::relation::{Column, ColumnRole, Table, Type};
 use crate::ir::transformation::helpers::sql_to_ir;
 use crate::ir::Plan;
+use crate::ir::Slices;
 use ahash::RandomState;
 use pretty_assertions::assert_eq;
 use std::fs;
@@ -351,7 +352,7 @@ fn union_all_in_sq() {
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
     let expected: Option<Vec<Vec<usize>>> = None;
-    assert_eq!(expected, plan.slices);
+    assert_eq!(Slices::from(expected), plan.slices);
 }
 
 #[test]
@@ -363,7 +364,7 @@ fn inner_join_eq_for_keys() {
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
     let expected: Option<Vec<Vec<usize>>> = None;
-    assert_eq!(expected, plan.slices);
+    assert_eq!(Slices::from(expected), plan.slices);
 }
 
 #[test]
@@ -376,7 +377,7 @@ fn join_inner_sq_eq_for_keys() {
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
     let expected: Option<Vec<Vec<usize>>> = None;
-    assert_eq!(expected, plan.slices);
+    assert_eq!(Slices::from(expected), plan.slices);
 }
 
 #[test]
@@ -388,14 +389,7 @@ fn join_inner_eq_non_match_keys() {
 
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
-    let motion_id = *plan
-        .slices
-        .as_ref()
-        .unwrap()
-        .first()
-        .unwrap()
-        .first()
-        .unwrap();
+    let motion_id = *plan.slices.slice(0).unwrap().position(0).unwrap();
     let motion = plan.get_relation_node(motion_id).unwrap();
     if let Relational::Motion { policy, .. } = motion {
         assert_eq!(
@@ -422,7 +416,7 @@ fn join_inner_sq_eq_for_keys_with_const() {
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
     let expected: Option<Vec<Vec<usize>>> = None;
-    assert_eq!(expected, plan.slices);
+    assert_eq!(Slices::from(expected), plan.slices);
 }
 
 #[test]
@@ -434,14 +428,7 @@ fn join_inner_sq_less_for_keys() {
 
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
-    let motion_id = *plan
-        .slices
-        .as_ref()
-        .unwrap()
-        .first()
-        .unwrap()
-        .first()
-        .unwrap();
+    let motion_id = *plan.slices.slice(0).unwrap().position(0).unwrap();
     let motion = plan.get_relation_node(motion_id).unwrap();
     if let Relational::Motion { policy, .. } = motion {
         assert_eq!(*policy, MotionPolicy::Full);
@@ -459,14 +446,7 @@ fn join_inner_sq_eq_no_keys() {
 
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
-    let motion_id = *plan
-        .slices
-        .as_ref()
-        .unwrap()
-        .first()
-        .unwrap()
-        .first()
-        .unwrap();
+    let motion_id = *plan.slices.slice(0).unwrap().position(0).unwrap();
     let motion = plan.get_relation_node(motion_id).unwrap();
     if let Relational::Motion { policy, .. } = motion {
         assert_eq!(*policy, MotionPolicy::Full);
@@ -484,14 +464,7 @@ fn join_inner_sq_eq_no_outer_keys() {
 
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
-    let motion_id = *plan
-        .slices
-        .as_ref()
-        .unwrap()
-        .first()
-        .unwrap()
-        .first()
-        .unwrap();
+    let motion_id = *plan.slices.slice(0).unwrap().position(0).unwrap();
     let motion = plan.get_relation_node(motion_id).unwrap();
     if let Relational::Motion { policy, .. } = motion {
         assert_eq!(*policy, MotionPolicy::Full);
@@ -510,14 +483,7 @@ fn inner_join_full_policy_sq_in_filter() {
 
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
-    let motion_id = *plan
-        .slices
-        .as_ref()
-        .unwrap()
-        .first()
-        .unwrap()
-        .first()
-        .unwrap();
+    let motion_id = *plan.slices.slice(0).unwrap().position(0).unwrap();
     let motion = plan.get_relation_node(motion_id).unwrap();
     if let Relational::Motion { policy, .. } = motion {
         assert_eq!(*policy, MotionPolicy::Full);
@@ -537,7 +503,7 @@ fn inner_join_local_policy_sq_in_filter() {
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
     let expected: Option<Vec<Vec<usize>>> = None;
-    assert_eq!(expected, plan.slices);
+    assert_eq!(Slices::from(expected), plan.slices);
 }
 
 #[test]
@@ -553,7 +519,7 @@ fn inner_join_local_policy_sq_with_union_all_in_filter() {
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
     let expected: Option<Vec<Vec<usize>>> = None;
-    assert_eq!(expected, plan.slices);
+    assert_eq!(Slices::from(expected), plan.slices);
 }
 
 #[test]
@@ -568,14 +534,7 @@ fn inner_join_full_policy_sq_with_union_all_in_filter() {
 
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
-    let motion_id = *plan
-        .slices
-        .as_ref()
-        .unwrap()
-        .first()
-        .unwrap()
-        .first()
-        .unwrap();
+    let motion_id = *plan.slices.slice(0).unwrap().position(0).unwrap();
     let motion = plan.get_relation_node(motion_id).unwrap();
     if let Relational::Motion { policy, .. } = motion {
         assert_eq!(
@@ -603,7 +562,7 @@ fn join_inner_and_local_full_policies() {
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
     let expected: Option<Vec<Vec<usize>>> = None;
-    assert_eq!(expected, plan.slices);
+    assert_eq!(Slices::from(expected), plan.slices);
 }
 
 #[test]
@@ -616,14 +575,7 @@ fn join_inner_or_local_full_policies() {
 
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
-    let motion_id = *plan
-        .slices
-        .as_ref()
-        .unwrap()
-        .first()
-        .unwrap()
-        .first()
-        .unwrap();
+    let motion_id = *plan.slices.slice(0).unwrap().position(0).unwrap();
     let motion = plan.get_relation_node(motion_id).unwrap();
     if let Relational::Motion { policy, .. } = motion {
         assert_eq!(*policy, MotionPolicy::Full);
@@ -657,14 +609,7 @@ fn join1() {
 
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
-    let motion_id = *plan
-        .slices
-        .as_ref()
-        .unwrap()
-        .first()
-        .unwrap()
-        .first()
-        .unwrap();
+    let motion_id = *plan.slices.slice(0).unwrap().position(0).unwrap();
     let motion = plan.get_relation_node(motion_id).unwrap();
     if let Relational::Motion { policy, .. } = motion {
         assert_eq!(
@@ -687,12 +632,8 @@ fn join1() {
     }
     let join = join_node.unwrap();
     let dist = plan.get_distribution(join.output()).unwrap();
-    assert_eq!(
-        &Distribution::Segment {
-            keys: collection! { Key::new(vec![0]) },
-        },
-        dist,
-    );
+    let keys: HashSet<_> = collection! { Key::new(vec![0]) };
+    assert_eq!(&Distribution::Segment { keys: keys.into() }, dist,);
 }
 
 #[test]
@@ -701,14 +642,7 @@ fn redistribution1() {
 
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
-    let motion_id = *plan
-        .slices
-        .as_ref()
-        .unwrap()
-        .first()
-        .unwrap()
-        .first()
-        .unwrap();
+    let motion_id = *plan.slices.slice(0).unwrap().position(0).unwrap();
     let motion = plan.get_relation_node(motion_id).unwrap();
     if let Relational::Motion { policy, .. } = motion {
         assert_eq!(
@@ -733,14 +667,7 @@ fn redistribution2() {
     plan.add_motions().unwrap();
     // Though data allows to be inserted locally still gather it on the
     // coordinator to recalculate a "bucket_id" field for "t".
-    let motion_id = *plan
-        .slices
-        .as_ref()
-        .unwrap()
-        .first()
-        .unwrap()
-        .first()
-        .unwrap();
+    let motion_id = *plan.slices.slice(0).unwrap().position(0).unwrap();
     let motion = plan.get_relation_node(motion_id).unwrap();
     if let Relational::Motion { policy, .. } = motion {
         assert_eq!(
@@ -765,14 +692,7 @@ fn redistribution3() {
     plan.add_motions().unwrap();
     // Though data allows to be inserted locally still gather it on the
     // coordinator to recalculate a "bucket_id" field for "t".
-    let motion_id = *plan
-        .slices
-        .as_ref()
-        .unwrap()
-        .first()
-        .unwrap()
-        .first()
-        .unwrap();
+    let motion_id = *plan.slices.slice(0).unwrap().position(0).unwrap();
     let motion = plan.get_relation_node(motion_id).unwrap();
     if let Relational::Motion { policy, .. } = motion {
         assert_eq!(
@@ -795,14 +715,7 @@ fn redistribution4() {
 
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
-    let motion_id = *plan
-        .slices
-        .as_ref()
-        .unwrap()
-        .first()
-        .unwrap()
-        .first()
-        .unwrap();
+    let motion_id = *plan.slices.slice(0).unwrap().position(0).unwrap();
     let motion = plan.get_relation_node(motion_id).unwrap();
     if let Relational::Motion { policy, .. } = motion {
         assert_eq!(
@@ -825,14 +738,7 @@ fn redistribution5() {
 
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
-    let motion_id = *plan
-        .slices
-        .as_ref()
-        .unwrap()
-        .first()
-        .unwrap()
-        .first()
-        .unwrap();
+    let motion_id = *plan.slices.slice(0).unwrap().position(0).unwrap();
     let motion = plan.get_relation_node(motion_id).unwrap();
     if let Relational::Motion { policy, .. } = motion {
         assert_eq!(
@@ -855,14 +761,7 @@ fn redistribution6() {
 
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
-    let motion_id = *plan
-        .slices
-        .as_ref()
-        .unwrap()
-        .first()
-        .unwrap()
-        .first()
-        .unwrap();
+    let motion_id = *plan.slices.slice(0).unwrap().position(0).unwrap();
     let motion = plan.get_relation_node(motion_id).unwrap();
     if let Relational::Motion { policy, .. } = motion {
         assert_eq!(
@@ -882,8 +781,7 @@ fn redistribution6() {
 ///   Motion node does not found
 #[must_use]
 pub fn get_motion_id(plan: &Plan, slice_id: usize, motion_idx: usize) -> Option<&usize> {
-    let slice = plan.slices.as_ref().unwrap().get(slice_id).unwrap();
-    slice.get(motion_idx)
+    plan.slices.slice(slice_id).unwrap().position(motion_idx)
 }
 
 #[cfg(test)]
diff --git a/sbroad-core/src/ir/transformation/redistribution/tests/between.rs b/sbroad-core/src/ir/transformation/redistribution/tests/between.rs
index e901cff271..d5be6cc007 100644
--- a/sbroad-core/src/ir/transformation/redistribution/tests/between.rs
+++ b/sbroad-core/src/ir/transformation/redistribution/tests/between.rs
@@ -2,6 +2,7 @@ use crate::ir::operator::Relational;
 use crate::ir::transformation::helpers::sql_to_ir;
 use crate::ir::transformation::redistribution::tests::get_motion_id;
 use crate::ir::transformation::redistribution::MotionPolicy;
+use crate::ir::Slices;
 use pretty_assertions::assert_eq;
 
 #[test]
@@ -39,7 +40,7 @@ fn between2() {
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
     let expected: Option<Vec<Vec<usize>>> = None;
-    assert_eq!(expected, plan.slices);
+    assert_eq!(Slices::from(expected), plan.slices);
 }
 
 #[test]
@@ -70,5 +71,5 @@ fn between4() {
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
     let expected: Option<Vec<Vec<usize>>> = None;
-    assert_eq!(expected, plan.slices);
+    assert_eq!(Slices::from(expected), plan.slices);
 }
diff --git a/sbroad-core/src/ir/transformation/redistribution/tests/except.rs b/sbroad-core/src/ir/transformation/redistribution/tests/except.rs
index 3c8cbb8c0e..a4028ecb27 100644
--- a/sbroad-core/src/ir/transformation/redistribution/tests/except.rs
+++ b/sbroad-core/src/ir/transformation/redistribution/tests/except.rs
@@ -2,6 +2,7 @@ use crate::ir::operator::Relational;
 use crate::ir::transformation::helpers::sql_to_ir;
 use crate::ir::transformation::redistribution::tests::get_motion_id;
 use crate::ir::transformation::redistribution::{Key, MotionPolicy};
+use crate::ir::Slices;
 use pretty_assertions::assert_eq;
 
 #[test]
@@ -30,7 +31,7 @@ fn except2() {
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
     let expected: Option<Vec<Vec<usize>>> = None;
-    assert_eq!(expected, plan.slices);
+    assert_eq!(Slices::from(expected), plan.slices);
 }
 
 #[test]
diff --git a/sbroad-core/src/ir/transformation/redistribution/tests/not_in.rs b/sbroad-core/src/ir/transformation/redistribution/tests/not_in.rs
index ec8ee3a79a..4823404929 100644
--- a/sbroad-core/src/ir/transformation/redistribution/tests/not_in.rs
+++ b/sbroad-core/src/ir/transformation/redistribution/tests/not_in.rs
@@ -2,6 +2,7 @@ use crate::ir::operator::Relational;
 use crate::ir::transformation::helpers::sql_to_ir;
 use crate::ir::transformation::redistribution::tests::get_motion_id;
 use crate::ir::transformation::redistribution::{Key, MotionPolicy};
+use crate::ir::Slices;
 use pretty_assertions::assert_eq;
 
 #[test]
@@ -28,7 +29,7 @@ fn not_in2() {
     let mut plan = sql_to_ir(query, vec![]);
     plan.add_motions().unwrap();
     let expected: Option<Vec<Vec<usize>>> = None;
-    assert_eq!(expected, plan.slices);
+    assert_eq!(Slices::from(expected), plan.slices);
 }
 
 #[test]
-- 
GitLab