From 77f2cc72a8c2a3322fe5469da91e6c0862b22caa Mon Sep 17 00:00:00 2001
From: Arseniy Volynets <vol0ncar@yandex.ru>
Date: Fri, 18 Aug 2023 01:31:53 +0300
Subject: [PATCH] fix: clear ir cache when schema changes
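
Plans on the router and prepared statements on the storages are now
cached together with the schema versions of the tables they reference.
On lookup the cached versions are compared with the current ones: if
any table's schema has changed, the entry is treated as a cache miss
and is overwritten by the subsequent `put`. Storages additionally
verify that the router's schema versions match the local ones before
executing a query and return `OutdatedStorageSchema` otherwise.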

---
 sbroad             |   2 +-
 src/sql/router.rs  | 167 ++++++++++++++++++++++++++++++++++++++++-----
 src/sql/storage.rs | 120 +++++++++++++++++++++++++++++---
 3 files changed, 263 insertions(+), 26 deletions(-)

diff --git a/sbroad b/sbroad
index 8c8c0c6168..723a43d7cd 160000
--- a/sbroad
+++ b/sbroad
@@ -1 +1 @@
-Subproject commit 8c8c0c616865e94a144466097dc19cd3927da430
+Subproject commit 723a43d7cd7341e9bb4376e3a606c4522cd66b7d
diff --git a/src/sql/router.rs b/src/sql/router.rs
index 77939513c1..4200d8bd7f 100644
--- a/src/sql/router.rs
+++ b/src/sql/router.rs
@@ -4,16 +4,14 @@
 
 use sbroad::errors::{Action, Entity, SbroadError};
 use sbroad::executor::bucket::Buckets;
-use sbroad::executor::engine::helpers::sharding_keys_from_tuple;
 use sbroad::executor::engine::helpers::vshard::{
     exec_ir_on_all_buckets, exec_ir_on_some_buckets, get_random_bucket,
 };
-use sbroad::executor::engine::helpers::{
-    dispatch, explain_format, materialize_motion, sharding_keys_from_map,
-};
+use sbroad::executor::engine::helpers::{dispatch, explain_format, materialize_motion};
+use sbroad::executor::engine::helpers::{sharding_key_from_map, sharding_key_from_tuple};
 use sbroad::executor::engine::{QueryCache, Router, Vshard};
 use sbroad::executor::ir::{ConnectionType, ExecutionPlan, QueryType};
-use sbroad::executor::lru::{Cache, LRUCache, DEFAULT_CAPACITY};
+use sbroad::executor::lru::{Cache, EvictFn, LRUCache, DEFAULT_CAPACITY};
 use sbroad::executor::protocol::Binary;
 use sbroad::frontend::sql::ast::AbstractSyntaxTree;
 use sbroad::ir::value::{MsgPackValue, Value};
@@ -41,23 +39,52 @@ use sbroad::ir::relation::{space_pk_columns, Column, ColumnRole, Table, Type};
 
 use std::borrow::Cow;
 
+use crate::traft::node;
 use tarantool::space::Space;
 use tarantool::tuple::{KeyDef, Tuple};
 use tarantool::util::Value as TarantoolValue;
 
-thread_local! (
-    static PLAN_CACHE: Rc<RefCell<LRUCache<String, Plan>>> = Rc::new(
-        RefCell::new(LRUCache::new(DEFAULT_CAPACITY, None).unwrap())
-    )
-);
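+/// Maps a table name to the schema version it had when
+/// the corresponding cache entry was created.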
+pub type VersionMap = HashMap<String, u64>;
+
+thread_local! {
+    static PLAN_CACHE: Rc<RefCell<PicoRouterCache>> = Rc::new(
+        RefCell::new(PicoRouterCache::new(DEFAULT_CAPACITY).unwrap()));
+}
 
 pub const DEFAULT_BUCKET_COLUMN: &str = "bucket_id";
 
+/// Get the schema version for the given space.
+///
+/// # Arguments:
+/// * `space_name` - name of the space. The name must not be
+/// enclosed in quotes as it would be in SQL: if the SQL query
+/// refers to `"t"`, pass `t` here.
+///
+/// # Errors:
+/// - failed to access the system space
+/// - space with the given name not found
+pub fn get_table_version(space_name: &str) -> Result<u64, SbroadError> {
+    let node = node::global()
+        .map_err(|e| SbroadError::FailedTo(Action::Get, None, format!("raft node: {}", e)))?;
+    let storage_spaces = &node.storage.spaces;
+    if let Some(space_def) = storage_spaces
+        .by_name(space_name)
+        .map_err(|e| SbroadError::FailedTo(Action::Get, None, format!("space_def: {}", e)))?
+    {
+        Ok(space_def.schema_version)
+    } else {
+        Err(SbroadError::NotFound(
+            Entity::SpaceMetadata,
+            format!("for space: {}", space_name),
+        ))
+    }
+}
+
 #[allow(clippy::module_name_repetitions)]
 pub struct RouterRuntime {
     metadata: RefCell<RouterMetadata>,
     bucket_count: u64,
-    ir_cache: Rc<RefCell<LRUCache<String, Plan>>>,
+    ir_cache: Rc<RefCell<PicoRouterCache>>,
 }
 
 impl RouterRuntime {
@@ -77,8 +104,106 @@ impl RouterRuntime {
     }
 }
 
+pub type PlanCache = LRUCache<String, (Plan, VersionMap)>;
+
+/// Wrapper around the default LRU cache that also
+/// checks the schema versions of cached plans.
+pub struct PicoRouterCache {
+    inner: PlanCache,
+}
+
+impl PicoRouterCache {
+    pub fn new(capacity: usize) -> Result<Self, SbroadError> {
+        Ok(PicoRouterCache {
+            inner: PlanCache::new(capacity, None)?,
+        })
+    }
+
+    pub fn capacity(&self) -> usize {
+        self.inner.capacity()
+    }
+}
+
+impl Cache<String, Plan> for PicoRouterCache {
+    fn new(capacity: usize, evict_fn: Option<EvictFn<Plan>>) -> Result<Self, SbroadError>
+    where
+        Self: Sized,
+    {
+        let new_fn: Option<EvictFn<(Plan, VersionMap)>> = if let Some(evict_fn) = evict_fn {
+            let new_fn = move |val: &mut (Plan, VersionMap)| -> Result<(), SbroadError> {
+                evict_fn(&mut val.0)
+            };
+            Some(Box::new(new_fn))
+        } else {
+            None
+        };
+        Ok(PicoRouterCache {
+            inner: PlanCache::new(capacity, new_fn)?,
+        })
+    }
+
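+    /// Return the cached plan only if every table it references
+    /// still has the same schema version as when the plan was cached.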
+    fn get(&mut self, key: &String) -> Result<Option<&Plan>, SbroadError> {
+        let Some((ir, version_map)) = self.inner.get(key)? else {
+            return Ok(None)
+        };
+        // Check that the plan's tables still have an up-to-date schema.
+        let node = node::global()
+            .map_err(|e| SbroadError::FailedTo(Action::Get, None, format!("raft node: {}", e)))?;
+        let storage_spaces = &node.storage.spaces;
+        for table_name in ir.relations.tables.keys() {
+            let space_name = normalize_name_for_space_api(table_name);
+            let cached_version = *version_map.get(space_name.as_str()).ok_or_else(|| {
+                SbroadError::NotFound(
+                    Entity::Table,
+                    format!("in version map with name: {}", space_name),
+                )
+            })?;
+            let Some(space_def) = storage_spaces.by_name(space_name.as_str()).map_err(|e|
+                SbroadError::FailedTo(Action::Get, None, format!("space_def: {}", e))
+            )? else {
+                return Ok(None)
+            };
+            // The outdated entry will be replaced when
+            // `put` is called (which is always called
+            // after a cache miss).
+            if cached_version != space_def.schema_version {
+                return Ok(None);
+            }
+        }
+        Ok(Some(ir))
+    }
+
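+    /// Cache the plan together with the current schema versions
+    /// of all tables it references.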
+    fn put(&mut self, key: String, value: Plan) -> Result<(), SbroadError> {
+        let node = node::global()
+            .map_err(|e| SbroadError::FailedTo(Action::Get, None, format!("raft node: {}", e)))?;
+        let storage_spaces = &node.storage.spaces;
+        let mut version_map: HashMap<String, u64> =
+            HashMap::with_capacity(value.relations.tables.len());
+        for table_name in value.relations.tables.keys() {
+            let space_name = normalize_name_for_space_api(table_name);
+            let current_version = if let Some(space_def) =
+                storage_spaces.by_name(space_name.as_str()).map_err(|e| {
+                    SbroadError::FailedTo(Action::Get, None, format!("space_def: {}", e))
+                })? {
+                space_def.schema_version
+            } else {
+                return Err(SbroadError::NotFound(
+                    Entity::SpaceMetadata,
+                    format!("for space: {}", space_name),
+                ));
+            };
+            version_map.insert(space_name, current_version);
+        }
+        self.inner.put(key, (value, version_map))
+    }
+
+    fn clear(&mut self) -> Result<(), SbroadError> {
+        self.inner.clear()
+    }
+}
+
 impl QueryCache for RouterRuntime {
-    type Cache = LRUCache<String, Plan>;
+    type Cache = PicoRouterCache;
 
     fn cache(&self) -> &RefCell<Self::Cache> {
         &self.ir_cache
@@ -95,9 +220,17 @@ impl QueryCache for RouterRuntime {
     fn clear_cache(&self) -> Result<(), SbroadError> {
         *self.ir_cache.try_borrow_mut().map_err(|e| {
             SbroadError::FailedTo(Action::Clear, Some(Entity::Cache), format!("{e:?}"))
-        })? = Self::Cache::new(self.cache_capacity()?, None)?;
+        })? = Self::Cache::new(self.cache_capacity()?)?;
         Ok(())
     }
+
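+    /// This runtime is able to report table schema versions
+    /// (see `get_table_version` below).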
+    fn provides_versions(&self) -> bool {
+        true
+    }
+
+    fn get_table_version(&self, space_name: &str) -> Result<u64, SbroadError> {
+        get_table_version(space_name)
+    }
 }
 
 impl Router for RouterRuntime {
@@ -132,7 +265,7 @@ impl Router for RouterRuntime {
         explain_format(&explain)
     }
 
-    fn extract_sharding_keys_from_map<'rec>(
+    fn extract_sharding_key_from_map<'rec>(
         &self,
         space: String,
         args: &'rec HashMap<String, Value>,
@@ -140,15 +273,15 @@ impl Router for RouterRuntime {
         let metadata = self.metadata.try_borrow().map_err(|e| {
             SbroadError::FailedTo(Action::Borrow, Some(Entity::Metadata), format!("{e:?}"))
         })?;
-        sharding_keys_from_map(&*metadata, &space, args)
+        sharding_key_from_map(&*metadata, &space, args)
     }
 
-    fn extract_sharding_keys_from_tuple<'rec>(
+    fn extract_sharding_key_from_tuple<'rec>(
         &self,
         space: String,
         args: &'rec [Value],
     ) -> Result<Vec<&'rec Value>, SbroadError> {
-        sharding_keys_from_tuple(&*self.metadata()?, &space, args)
+        sharding_key_from_tuple(&*self.metadata()?, &space, args)
     }
 }
 
diff --git a/src/sql/storage.rs b/src/sql/storage.rs
index 8e22049585..03033437ae 100644
--- a/src/sql/storage.rs
+++ b/src/sql/storage.rs
@@ -8,20 +8,23 @@ use sbroad::executor::engine::helpers::storage::meta::StorageMetadata;
 use sbroad::executor::engine::helpers::storage::runtime::unprepare;
 use sbroad::executor::engine::helpers::storage::PreparedStmt;
 use sbroad::executor::engine::helpers::vshard::get_random_bucket;
-use sbroad::executor::engine::helpers::{self};
-use sbroad::executor::engine::{QueryCache, Vshard};
+use sbroad::executor::engine::helpers::{self, normalize_name_for_space_api};
+use sbroad::executor::engine::{QueryCache, StorageCache, Vshard};
 use sbroad::executor::ir::{ConnectionType, ExecutionPlan, QueryType};
-use sbroad::executor::lru::{Cache, LRUCache, DEFAULT_CAPACITY};
-use sbroad::executor::protocol::{Binary, RequiredData};
+use sbroad::executor::lru::{Cache, EvictFn, LRUCache, DEFAULT_CAPACITY};
+use sbroad::executor::protocol::{Binary, RequiredData, SchemaInfo};
 use sbroad::ir::value::Value;
 
+use crate::sql::router::{get_table_version, VersionMap};
+use crate::traft::node;
+use std::collections::HashMap;
 use std::{any::Any, cell::RefCell, rc::Rc};
 
 use super::{router::calculate_bucket_id, DEFAULT_BUCKET_COUNT};
 
 thread_local!(
-    static STATEMENT_CACHE: Rc<RefCell<LRUCache<String, PreparedStmt>>> = Rc::new(
-        RefCell::new(LRUCache::new(DEFAULT_CAPACITY, Some(Box::new(unprepare))).unwrap())
+    static STATEMENT_CACHE: Rc<RefCell<PicoStorageCache>> = Rc::new(
+        RefCell::new(PicoStorageCache::new(DEFAULT_CAPACITY, Some(Box::new(unprepare))).unwrap())
     )
 );
 
@@ -29,11 +32,95 @@ thread_local!(
 pub struct StorageRuntime {
     pub metadata: RefCell<StorageMetadata>,
     bucket_count: u64,
-    cache: Rc<RefCell<LRUCache<String, PreparedStmt>>>,
+    cache: Rc<RefCell<PicoStorageCache>>,
+}
+
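+/// Storage-side statement cache: each prepared statement is stored
+/// together with the local schema versions of the tables it uses.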
+pub struct PicoStorageCache(LRUCache<String, (PreparedStmt, VersionMap)>);
+
+impl PicoStorageCache {
+    pub fn new(
+        capacity: usize,
+        evict_fn: Option<EvictFn<PreparedStmt>>,
+    ) -> Result<Self, SbroadError> {
+        let new_fn: Option<EvictFn<(PreparedStmt, VersionMap)>> = if let Some(evict_fn) = evict_fn {
+            let new_fn = move |val: &mut (PreparedStmt, VersionMap)| -> Result<(), SbroadError> {
+                evict_fn(&mut val.0)
+            };
+            Some(Box::new(new_fn))
+        } else {
+            None
+        };
+        Ok(PicoStorageCache(LRUCache::new(capacity, new_fn)?))
+    }
+
+    pub fn capacity(&self) -> usize {
+        self.0.capacity()
+    }
+}
+
+impl StorageCache for PicoStorageCache {
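+    /// Cache the prepared statement together with the local schema
+    /// versions of the tables listed in the router's version map.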
+    fn put(
+        &mut self,
+        plan_id: String,
+        stmt: PreparedStmt,
+        schema_info: &SchemaInfo,
+    ) -> Result<(), SbroadError> {
+        let mut version_map: HashMap<String, u64> =
+            HashMap::with_capacity(schema_info.router_version_map.len());
+        let node = node::global()
+            .map_err(|e| SbroadError::FailedTo(Action::Get, None, format!("raft node: {}", e)))?;
+        let storage_spaces = &node.storage.spaces;
+        for table_name in schema_info.router_version_map.keys() {
+            let space_name = normalize_name_for_space_api(table_name);
+            let current_version = if let Some(space_def) =
+                storage_spaces.by_name(space_name.as_str()).map_err(|e| {
+                    SbroadError::FailedTo(Action::Get, None, format!("space_def: {}", e))
+                })? {
+                space_def.schema_version
+            } else {
+                return Err(SbroadError::NotFound(
+                    Entity::SpaceMetadata,
+                    format!("for space: {}", space_name),
+                ));
+            };
+            version_map.insert(space_name, current_version);
+        }
+
+        self.0.put(plan_id, (stmt, version_map))
+    }
+
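+    /// Return the cached statement only if every table in its version
+    /// map still has the same local schema version as when it was cached.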
+    fn get(&mut self, plan_id: &String) -> Result<Option<&PreparedStmt>, SbroadError> {
+        let Some((stmt, version_map)) = self.0.get(plan_id)? else {
+            return Ok(None)
+        };
+        // Check that the statement's tables still have an up-to-date schema.
+        let node = node::global()
+            .map_err(|e| SbroadError::FailedTo(Action::Get, None, format!("raft node: {}", e)))?;
+        let storage_spaces = &node.storage.spaces;
+        for (table_name, cached_version) in version_map {
+            let space_name = normalize_name_for_space_api(table_name);
+            let Some(space_def) = storage_spaces.by_name(space_name.as_str()).map_err(|e|
+                SbroadError::FailedTo(Action::Get, None, format!("space_def: {}", e))
+            )? else {
+                return Ok(None)
+            };
+            // The outdated entry will be replaced when
+            // `put` is called (which is always called
+            // after a cache miss).
+            if *cached_version != space_def.schema_version {
+                return Ok(None);
+            }
+        }
+        Ok(Some(stmt))
+    }
+
+    fn clear(&mut self) -> Result<(), SbroadError> {
+        self.0.clear()
+    }
 }
 
 impl QueryCache for StorageRuntime {
-    type Cache = LRUCache<String, PreparedStmt>;
+    type Cache = PicoStorageCache;
 
     fn cache(&self) -> &RefCell<Self::Cache> {
         &self.cache
@@ -55,6 +142,14 @@ impl QueryCache for StorageRuntime {
         })? = Self::Cache::new(DEFAULT_CAPACITY, None)?;
         Ok(())
     }
+
+    fn provides_versions(&self) -> bool {
+        true
+    }
+
+    fn get_table_version(&self, space_name: &str) -> Result<u64, SbroadError> {
+        get_table_version(space_name)
+    }
 }
 
 impl Vshard for StorageRuntime {
@@ -120,6 +215,15 @@ impl StorageRuntime {
         required: &mut RequiredData,
         raw_optional: &mut Vec<u8>,
     ) -> Result<Box<dyn Any>, SbroadError> {
+        // Check that the router's schema versions haven't changed.
+        for (table, version) in &required.schema_info.router_version_map {
+            let normalized = normalize_name_for_space_api(table);
+            // TODO: if the storage version is older than the router's,
+            // wait until the local state catches up.
+            if *version != get_table_version(normalized.as_str())? {
+                return Err(SbroadError::OutdatedStorageSchema);
+            }
+        }
         match required.query_type {
             QueryType::DML => helpers::execute_dml(self, required, raw_optional),
             QueryType::DQL => {
-- 
GitLab